block by daniarleagk 984908da40f45c6ea0ff74bcdf3e41a7

Export and import data using Apache Spark from and into Apache HBase

This gist shows examples of exporting data into HBase tables and importing data from them with Apache Spark. The examples use the Apache Spark 2.x and HBase 1.x APIs.

Export Data into HBase

To load data into HBase we will use either the foreachPartition or the saveAsNewAPIHadoopDataset action.

ForeachPartition

We create a connection object inside a foreachPartition action and push the data either with a Table object or through the BufferedMutator interface. The latter is preferred for large amounts of data, since the BufferedMutator sends records in batches.

For the Scala example I also used the try-with-resources pattern from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d

Scala Example

/**
  * Option one: writes every record of the RDD into HBase from inside a foreachPartition action.
  *
  * One HBase connection and one BufferedMutator are opened per partition. Both are handed to
  * `withResources` (pattern from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d),
  * which closes them once the partition has been processed.
  *
  * @param sparkSession the Spark session (not used inside this method)
  * @param tableName    name of the HBase table that receives the puts
  * @param rdd          records to store; each record becomes one Put on column "c":"v"
  */
def optionOne(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
  rdd.foreachPartition { records =>
    withResources(ConnectionFactory.createConnection(HBaseConfiguration.create())) { hbase =>
      withResources(hbase.getBufferedMutator(TableName.valueOf(tableName))) { writer =>
        for (entry <- records) {
          // row key = record key, single cell in family "c", qualifier "v"
          val row = new Put(Bytes.toBytes(entry.key))
          row.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(entry.myValue))
          writer.mutate(row)
        }
      }
    }
  }
}

Java Example

/**
 * Option one: writes every record of the RDD into HBase from inside a foreachPartition action.
 *
 * @param sparkSession the Spark session (not used inside this method)
 * @param tableName    name of the HBase table that receives the puts
 * @param rdd          records to store; each record becomes one Put on column "c":"v"
 */
public static void optionOne(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) {
    rdd.foreachPartition(records -> {
        // One connection and one mutator per partition; try-with-resources closes the
        // mutator first and the connection second.
        // Option 1.1 would use a Table instead:
        //   Table table = hbase.getTable(TableName.valueOf(tableName));
        try (Connection hbase = ConnectionFactory.createConnection(HBaseConfiguration.create());
             BufferedMutator writer = hbase.getBufferedMutator(TableName.valueOf(tableName))) {
            while (records.hasNext()) {
                final MyRecord entry = records.next();
                final Put row = new Put(Bytes.toBytes(entry.getKey()));
                row.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(entry.getMyValue()));
                writer.mutate(row);
                // with option 1.1 this would be: table.put(row);
            }
        }
    });
}

saveAsNewAPIHadoopDataset

In this option Spark uses a TableOutputFormat object to load data into HBase. This is my preferred option, since connection handling and data loading are hidden. The first step is to create a PairRDD[ImmutableBytesWritable, Put]; the second is to write it with a saveAsNewAPIHadoopDataset call.

Scala

/**
  * Option two: writes the records through TableOutputFormat with saveAsNewAPIHadoopDataset.
  *
  * @param sparkSession the Spark session (not used inside this method)
  * @param tableName    name of the HBase table, passed on as TableOutputFormat.OUTPUT_TABLE
  * @param rdd          records to store; each record becomes one Put on column "c":"v"
  */
def optionTwo(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
  // describe the output side: target table and output format
  val hadoopConf = new Configuration
  hadoopConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val job = Job.getInstance(hadoopConf)
  job.setOutputFormatClass(classOf[TableOutputFormat[_]])

  // build the (row key, Put) pairs that the output format expects
  val keyedPuts = rdd.map { entry =>
    val row = new Put(Bytes.toBytes(entry.key))
    row.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(entry.myValue))
    (new ImmutableBytesWritable(row.getRow()), row)
  }
  keyedPuts.saveAsNewAPIHadoopDataset(job.getConfiguration)
}

Java

/**
 * Option two: writes the records through TableOutputFormat with saveAsNewAPIHadoopDataset.
 *
 * The RDD is first mapped to (row key, Put) pairs; Spark then hands the pairs to the
 * output format configured on the job, so no connection handling is needed here.
 *
 * @param sparkSession the Spark session (not used inside this method)
 * @param tableName    name of the HBase table, passed on as TableOutputFormat.OUTPUT_TABLE
 * @param rdd          records to store; each record becomes one Put on column "c":"v"
 * @throws IOException if the Hadoop job object cannot be created
 */
public static void optionTwo(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) throws IOException {
    // describe the output side: target table and output format
    Configuration config = new Configuration();
    config.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    Job jobConfig = Job.getInstance(config);
    jobConfig.setOutputFormatClass(TableOutputFormat.class);
    rdd.mapToPair(record -> {
        // row key = record key, single cell in family "c", qualifier "v"
        Put put = new Put(Bytes.toBytes(record.getKey()));
        put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(put.getRow()), put);
    }).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration());
}

Here is a complete example with a main method. In this example we put records into an HBase table using both options.

Scala:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.util.control.NonFatal

/**
  * Demonstrates two simple patterns for writing the records of a Spark RDD into an HBase table.
  */
object ExportIntoHBase {

  /**
    * Applies `f` to a resource and closes the resource afterwards, whether `f` succeeded or not.
    *
    * Copied from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
    *
    * @param r by-name expression that produces the resource; it is evaluated exactly once
    * @param f function to run with the resource
    * @tparam T type of the resource, which must be AutoCloseable
    * @tparam V result type of `f`
    * @return whatever `f` returns
    */
  def withResources[T <: AutoCloseable, V](r: => T)(f: T => V): V = {
    val acquired: T = r
    require(acquired != null, "resource is null")
    // remembers a non-fatal failure of f so that a failing close() can be attached to it
    var failure: Throwable = null
    try f(acquired)
    catch {
      case NonFatal(cause) =>
        failure = cause
        throw cause
    } finally closeAndAddSuppressed(failure, acquired)
  }

  /**
    * Closes the resource. When an earlier failure `e` is given, a non-fatal exception thrown by
    * close() is added to `e` as a suppressed exception; otherwise close() failures propagate.
    *
    * Copied from https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
    *
    * @param e        failure that occurred while using the resource, or null if there was none
    * @param resource resource to close
    */
  def closeAndAddSuppressed(e: Throwable,
                            resource: AutoCloseable): Unit =
    if (e == null) {
      resource.close()
    } else {
      try resource.close()
      catch {
        case NonFatal(closeFailure) => e.addSuppressed(closeFailure)
      }
    }

  /**
    * Example record that is stored as one HBase row.
    *
    * @param key     used as the row key
    * @param myValue stored in column family "c", qualifier "v"
    */
  case class MyRecord(key: String, myValue: String)

  /** Builds the Put for one record: row key = record key, one cell "c":"v" holding the value. */
  private def toPut(record: MyRecord): Put = {
    val row = new Put(Bytes.toBytes(record.key))
    row.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.myValue))
    row
  }

  /**
    * Option one: writes the records from inside a foreachPartition action.
    *
    * Resource handling follows
    * https://medium.com/@dkomanov/scala-try-with-resources-735baad0fd7d
    *
    * @param sparkSession the Spark session (not used inside this method)
    * @param tableName    name of the HBase table that receives the puts
    * @param rdd          records to store
    */
  def optionOne(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
    rdd.foreachPartition { records =>
      // The bare pattern would be: create a connection, get a Table (or BufferedMutator) from it
      // and put every record. Connection and writer must be closed properly, hence withResources.
      withResources(ConnectionFactory.createConnection(HBaseConfiguration.create())) { hbase =>
        // Option 1.1, a plain Table, would look like this:
        //   withResources(hbase.getTable(TableName.valueOf(tableName))) { table =>
        //     records.foreach(record => table.put(toPut(record)))
        //   }
        // Option 1.2, the BufferedMutator, is the preferred one:
        withResources(hbase.getBufferedMutator(TableName.valueOf(tableName))) { writer =>
          for (entry <- records) {
            writer.mutate(toPut(entry))
          }
        }
      }
    }
  }

  /**
    * Option two (the preferred one): writes the records through TableOutputFormat with
    * saveAsNewAPIHadoopDataset.
    *
    * @param sparkSession the Spark session (not used inside this method)
    * @param tableName    name of the HBase table, passed on as TableOutputFormat.OUTPUT_TABLE
    * @param rdd          records to store
    */
  def optionTwo(sparkSession: SparkSession, tableName: String, rdd: RDD[MyRecord]): Unit = {
    // describe the output side: target table and output format
    val hadoopConf = new Configuration
    hadoopConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val job = Job.getInstance(hadoopConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[_]])

    // build the (row key, Put) pairs and let Spark write them
    val keyedPuts = rdd.map { entry =>
      val row = toPut(entry)
      (new ImmutableBytesWritable(row.getRow()), row)
    }
    keyedPuts.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }

  /**
    * Writes two records with option one and two more with option two into table "testTable".
    *
    * @param args command line arguments (ignored)
    */
  def main(args: Array[String]): Unit = {
    // init spark session
    val spark = SparkSession
      .builder()
      .appName("Spark HBase export")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.ui.showConsoleProgress", "false")
      .getOrCreate()

    // ingesting data into hbase: one single-partition RDD per option
    val tableName = "testTable"
    val sc = spark.sparkContext
    val firstBatch = sc.parallelize(Array(MyRecord("key_1", "value_1"), MyRecord("key_2", "value_2")), 1)
    val secondBatch = sc.parallelize(Array(MyRecord("key_3", "value_3"), MyRecord("key_4", "value_4")), 1)
    optionOne(spark, tableName, firstBatch)
    optionTwo(spark, tableName, secondBatch)
  }
}

Java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;

/**
 * Demonstrates two simple patterns for writing the records of a Spark RDD into an HBase table.
 */
public class ExportIntoHBaseJava {

    /**
     * Example record (POJO) that is stored as one HBase row. It implements Serializable,
     * presumably so that Spark can serialize the records.
     */
    public static class MyRecord implements Serializable {

        /** Used as the HBase row key. */
        private String key;

        /** Stored in column family "c", qualifier "v". */
        private String myValue;

        public MyRecord(String key, String myValue) {
            this.key = key;
            this.myValue = myValue;
        }

        public String getKey() {
            return this.key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public String getMyValue() {
            return this.myValue;
        }

        public void setMyValue(String myValue) {
            this.myValue = myValue;
        }
    }

    /** Builds the Put for one record: row key = record key, one cell "c":"v" holding the value. */
    private static Put toPut(MyRecord record) {
        final Put row = new Put(Bytes.toBytes(record.getKey()));
        row.addColumn(Bytes.toBytes("c"), Bytes.toBytes("v"), Bytes.toBytes(record.getMyValue()));
        return row;
    }

    /** Pairs the Put of a record with its row key, the shape TableOutputFormat is fed with. */
    private static Tuple2<ImmutableBytesWritable, Put> toKeyedPut(MyRecord record) {
        final Put row = toPut(record);
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(row.getRow()), row);
    }

    /**
     * Option one: writes the records from inside a foreachPartition action.
     *
     * @param sparkSession the Spark session (not used inside this method)
     * @param tableName    name of the HBase table that receives the puts
     * @param rdd          records to store
     */
    public static void optionOne(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) {
        rdd.foreachPartition(records -> {
            // One connection and one mutator per partition; try-with-resources closes the
            // mutator first and the connection second.
            // Option 1.1 would use a Table instead:
            //   Table table = hbase.getTable(TableName.valueOf(tableName));
            try (Connection hbase = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 BufferedMutator writer = hbase.getBufferedMutator(TableName.valueOf(tableName))) {
                while (records.hasNext()) {
                    writer.mutate(toPut(records.next()));
                    // with option 1.1 this would be: table.put(toPut(...));
                }
            }
        });
    }

    /**
     * Option two: writes the records through TableOutputFormat with saveAsNewAPIHadoopDataset.
     *
     * @param sparkSession the Spark session (not used inside this method)
     * @param tableName    name of the HBase table, passed on as TableOutputFormat.OUTPUT_TABLE
     * @param rdd          records to store
     * @throws IOException if the Hadoop job object cannot be created
     */
    public static void optionTwo(SparkSession sparkSession, String tableName, JavaRDD<MyRecord> rdd) throws IOException {
        // describe the output side: target table and output format
        final Configuration hadoopConf = new Configuration();
        hadoopConf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        final Job job = Job.getInstance(hadoopConf);
        job.setOutputFormatClass(TableOutputFormat.class);

        // build the (row key, Put) pairs and let Spark write them
        rdd.mapToPair(ExportIntoHBaseJava::toKeyedPut)
                .saveAsNewAPIHadoopDataset(job.getConfiguration());
    }

    /**
     * Writes two records with option one and two more with option two into table "testTable".
     *
     * @param args command line arguments (ignored)
     * @throws IOException if option two cannot create its Hadoop job object
     */
    public static void main(String[] args) throws IOException {
        // init spark session
        final SparkSession spark = SparkSession.builder()
                .appName("Spark HBase export")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.ui.showConsoleProgress", "false")
                .getOrCreate();
        final String tableName = "testTable";
        final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // one single-partition RDD per option
        final JavaRDD<MyRecord> firstBatch = jsc.parallelize(
                Arrays.asList(new MyRecord("key_1", "value_1"), new MyRecord("key_2", "value_2")), 1);
        final JavaRDD<MyRecord> secondBatch = jsc.parallelize(
                Arrays.asList(new MyRecord("key_3", "value_3"), new MyRecord("key_4", "value_4")), 1);
        optionOne(spark, tableName, firstBatch);
        optionTwo(spark, tableName, secondBatch);
    }
}

Map Apache HBase scans to Apache Spark RDDs.

In the next examples we will load data from HBase with Apache Spark.

To query HBase you can use these two Java classes:

import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}

MultiTableInputFormat can be used for reading data from multiple tables or for running several different scans against a single table (e.g. if an HBase table uses salting):

/**
  * Creates one RDD that contains the rows of several scans, read through MultiTableInputFormat.
  *
  * @param sc    Spark context used to create the RDD
  * @param scans scans to run; each one is serialized with convertScanToString
  * @return RDD of the HBase results, with the row-key half of each pair dropped
  */
def createResultRDD(sc: SparkContext, scans: Array[Scan]): RDD[Result] = {
  val hbaseConf = createConfig()
  // scan definitions travel as String properties of the configuration
  val encodedScans = scans.map(scan => convertScanToString(scan))
  hbaseConf.setStrings(MultiTableInputFormat.SCANS, encodedScans: _*)
  sc.newAPIHadoopRDD(hbaseConf, classOf[MultiTableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    .map { case (_, result) => result }
}

TableInputFormat could be used for reading data from a single table:

/**
  * Creates an RDD with the rows of one scan on a single table, read through TableInputFormat.
  *
  * @param sc              Spark context used to create the RDD
  * @param tableNameString name of the table to read, set as TableInputFormat.INPUT_TABLE
  * @param scan            scan to run; it is serialized with convertScanToString
  * @return RDD of the HBase results, with the row-key half of each pair dropped
  */
def createResultRDD(sc: SparkContext, tableNameString: String, scan: Scan): RDD[Result] = {
  val hbaseConf = createConfig()
  val encodedScan = convertScanToString(scan)
  hbaseConf.set(TableInputFormat.INPUT_TABLE, tableNameString)
  hbaseConf.set(TableInputFormat.SCAN, encodedScan)
  sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    .map { case (_, result) => result }
}

Since scan definitions are passed as a String property, scan objects have to be converted to strings, as shown below:

/**
  * Serializes a scan into a String: the scan is converted to its protobuf form and the
  * protobuf bytes are Base64-encoded.
  *
  * @param scan scan to serialize
  * @return Base64 text of the protobuf-encoded scan
  */
def convertScanToString(scan: Scan): String =
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray())

Below are the complete examples for Scala and Java, each with a main method:

Scala:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

/**
  * Demonstrates a simple pattern for running HBase scans with Apache Spark and getting the
  * rows back as RDDs of Result objects.
  */
object ExportFromHBase {
  val logger = LoggerFactory.getLogger(this.getClass.getName)

  /** Timeout in milliseconds used for the scanner and RPC settings: 15 minutes. */
  val SCAN_DURATION: Long = 1000 * 60 * 15

  /**
    * Creates one RDD that contains the rows of several scans, read through MultiTableInputFormat.
    *
    * @param sc    Spark context used to create the RDD
    * @param scans scans to run; main sets Scan.SCAN_ATTRIBUTES_TABLE_NAME on each of them
    * @return RDD of the HBase results, with the row-key half of each pair dropped
    */
  def createResultRDD(sc: SparkContext, scans: Array[Scan]): RDD[Result] = {
    val hbaseConf = createConfig()
    // scan definitions travel as String properties of the configuration
    val encodedScans = scans.map(scan => convertScanToString(scan))
    hbaseConf.setStrings(MultiTableInputFormat.SCANS, encodedScans: _*)
    sc.newAPIHadoopRDD(hbaseConf, classOf[MultiTableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { case (_, result) => result }
  }

  /**
    * Creates an RDD with the rows of one scan on a single table, read through TableInputFormat.
    *
    * @param sc              Spark context used to create the RDD
    * @param tableNameString name of the table to read, set as TableInputFormat.INPUT_TABLE
    * @param scan            scan to run
    * @return RDD of the HBase results, with the row-key half of each pair dropped
    */
  def createResultRDD(sc: SparkContext, tableNameString: String, scan: Scan): RDD[Result] = {
    val hbaseConf = createConfig()
    val encodedScan = convertScanToString(scan)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableNameString)
    hbaseConf.set(TableInputFormat.SCAN, encodedScan)
    sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { case (_, result) => result }
  }

  /**
    * Creates the HBase configuration used for the queries, with both the scanner timeout and
    * the RPC timeout raised to SCAN_DURATION.
    *
    * @return the HBase configuration with the two timeouts set
    */
  def createConfig(): Configuration = {
    val hbaseConf = HBaseConfiguration.create()
    // Scans issued from Spark are usually large; with the default timeout of one minute
    // they run into timeout exceptions.
    val timeout = SCAN_DURATION.toString
    hbaseConf.set("hbase.client.scanner.timeout.period", timeout)
    hbaseConf.set("hbase.rpc.timeout", timeout)
    hbaseConf
  }

  /**
    * Serializes a scan into a String: the scan is converted to its protobuf form and the
    * protobuf bytes are Base64-encoded.
    *
    * @param scan scan to serialize
    * @return Base64 text of the protobuf-encoded scan
    */
  def convertScanToString(scan: Scan): String =
    Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray())

  /**
    * Runs a two-scan query and a single-table query against "testTable" and logs the row counts.
    *
    * @param args command line arguments (ignored)
    */
  def main(args: Array[String]): Unit = {
    // init spark session
    val spark = SparkSession
      .builder()
      .appName("Spark HBase export")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.ui.showConsoleProgress", "false")
      .getOrCreate()
    // not used below; presumably kept for mapping the RDDs to Datasets later on
    import spark.implicits._
    val firstTable = "testTable"
    val secondTable = "testTable"

    // case 1: several scans at once; stop rows are exclusive
    val firstStart = Bytes.toBytes("key_1")
    val firstStop = Bytes.toBytes("key_3")
    val secondStart = Bytes.toBytes("key_3")
    val secondStop = Bytes.toBytes("key_5")
    val family = Bytes.toBytes("c")

    val firstScan = new Scan(firstStart, firstStop)
    firstScan.addFamily(family)
    firstScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(firstTable).getName)
    val secondScan = new Scan(secondStart, secondStop)
    secondScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(secondTable).getName)

    // map to RDD[Result]
    val multiScanRows = createResultRDD(spark.sparkContext, Array(firstScan, secondScan))
    val multiScanCount = multiScanRows.count()
    logger.info(s"Scan 1 $multiScanCount")

    // case 2: one scan on one table; the RDD can be processed further, e.g. mapped to a Dataset
    val singleTableRows = createResultRDD(spark.sparkContext, firstTable, firstScan)
    val singleTableCount = singleTableRows.count()
    logger.info(s"Scan 2 $singleTableCount")
    // ...
  }
}

Java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

import java.io.IOException;

/**
 * Demonstrates a simple pattern for running HBase scans with Apache Spark and getting the
 * rows back as RDDs of Result objects.
 */
public class ExportFromHBaseJava {

    public static Logger logger = LoggerFactory.getLogger(ExportFromHBaseJava.class);

    /** Timeout in milliseconds used for the scanner and RPC settings: 15 minutes. */
    public static long SCAN_DURATION = 1000 * 60 * 15;

    /**
     * Creates one RDD that contains the rows of several scans, read through MultiTableInputFormat.
     *
     * @param sc    Spark context used to create the RDD
     * @param scans scans to run; main sets Scan.SCAN_ATTRIBUTES_TABLE_NAME on each of them
     * @return RDD of the HBase results, with the row-key half of each pair dropped
     * @throws IOException if a scan cannot be serialized
     */
    public static JavaRDD<Result> createResultRDD(JavaSparkContext sc, Scan[] scans) throws IOException {
        final Configuration hbaseConf = createConfig();
        // scan definitions travel as String properties of the configuration
        final String[] encodedScans = new String[scans.length];
        int slot = 0;
        for (Scan scan : scans) {
            encodedScans[slot++] = convertScanToString(scan);
        }
        hbaseConf.setStrings(MultiTableInputFormat.SCANS, encodedScans);
        final JavaPairRDD<ImmutableBytesWritable, Result> keyedRows = sc.newAPIHadoopRDD(
                hbaseConf, MultiTableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        return keyedRows.map(entry -> entry._2());
    }

    /**
     * Creates an RDD with the rows of one scan on a single table, read through TableInputFormat.
     *
     * @param sc              Spark context used to create the RDD
     * @param tableNameString name of the table to read, set as TableInputFormat.INPUT_TABLE
     * @param scan            scan to run
     * @return RDD of the HBase results, with the row-key half of each pair dropped
     * @throws IOException if the scan cannot be serialized
     */
    public static JavaRDD<Result> createResultRDD(JavaSparkContext sc, String tableNameString, Scan scan) throws IOException {
        final Configuration hbaseConf = createConfig();
        final String encodedScan = convertScanToString(scan);
        hbaseConf.set(TableInputFormat.INPUT_TABLE, tableNameString);
        hbaseConf.set(TableInputFormat.SCAN, encodedScan);
        final JavaPairRDD<ImmutableBytesWritable, Result> keyedRows = sc.newAPIHadoopRDD(
                hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        return keyedRows.map(entry -> entry._2());
    }

    /**
     * Creates the HBase configuration used for the queries, with both the scanner timeout and
     * the RPC timeout raised to SCAN_DURATION.
     *
     * @return the HBase configuration with the two timeouts set
     */
    public static Configuration createConfig() {
        final Configuration hbaseConf = HBaseConfiguration.create();
        // Scans issued from Spark are usually large; with the default timeout of one minute
        // they run into timeout exceptions.
        final String timeout = String.valueOf(SCAN_DURATION);
        hbaseConf.set("hbase.client.scanner.timeout.period", timeout);
        hbaseConf.set("hbase.rpc.timeout", timeout);
        return hbaseConf;
    }

    /**
     * Serializes a scan into a String: the scan is converted to its protobuf form and the
     * protobuf bytes are Base64-encoded.
     *
     * @param scan scan to serialize
     * @return Base64 text of the protobuf-encoded scan
     * @throws IOException if the scan cannot be converted to its protobuf form
     */
    private static String convertScanToString(Scan scan) throws IOException {
        final ClientProtos.Scan encoded = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(encoded.toByteArray());
    }

    /**
     * Runs a two-scan query and a single-table query against "testTable" and logs the row counts.
     *
     * @param args command line arguments (ignored)
     * @throws IOException if a scan cannot be serialized
     */
    public static void main(String[] args) throws IOException {
        // init spark session
        final SparkSession spark = SparkSession.builder()
                .appName("Spark HBase export")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.ui.showConsoleProgress", "false")
                .getOrCreate();
        final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        final String firstTable = "testTable";
        final String secondTable = "testTable";

        // case 1: several scans at once; stop rows are exclusive
        final byte[] firstStart = Bytes.toBytes("key_1");
        final byte[] firstStop = Bytes.toBytes("key_3");
        final byte[] secondStart = Bytes.toBytes("key_3");
        final byte[] secondStop = Bytes.toBytes("key_5");
        final byte[] family = Bytes.toBytes("c");

        final Scan firstScan = new Scan(firstStart, firstStop);
        firstScan.addFamily(family);
        firstScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(firstTable).getName());
        final Scan secondScan = new Scan(secondStart, secondStop);
        secondScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, TableName.valueOf(secondTable).getName());

        // map to RDD of results
        final JavaRDD<Result> multiScanRows = createResultRDD(jsc, new Scan[]{firstScan, secondScan});
        final long multiScanCount = multiScanRows.count();
        logger.info("Scan 1: " + multiScanCount);

        // case 2: one scan on one table; the RDD can be processed further, e.g. mapped to a dataset
        final JavaRDD<Result> singleTableRows = createResultRDD(jsc, firstTable, firstScan);
        final long singleTableCount = singleTableRows.count();
        logger.info("Scan 2: " + singleTableCount);
        // ...
    }
}