85.批量加载(Bulk Load)

使用Spark指加载数据到HBase有两种选择。有基本的批量加载功能,适用于,当你的行有数以百万的列,以及,在Spark批量加载过程的map端之前,你的列没有固定和分区的情况下。

Spark还有一个“微型”的记录批量加载(thin record bulk load)选择,这个选择的设计,是对于那些每行少于10k个列的表格。这个选择的优势在于,具有更高的生产能力,及在Spark混洗(shuffle)操作中,更少的覆盖所有的加载。

两上实现的工作方式差不多,就像MapReduce的批量加载过程,在这个过程,partitioner会根据region的分割对rowkeys进行分区,并且,row keys会被排好序,发送到reducers,以至于HFiles能够直接从reduce阶段写输出。

用Spark术语,批量加载将被一个Spark repartitionAndSortWithinPartitions和随后的一个Spark foreachPartition来实现。

首先让我们来看一个使用基本的批量加载功能的例子:

例子 49. 批量加载例子

以下例子显示了在Spark中批量加载:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

hbaseBulkLoad函数带了3个必选参数:

  • 表名,即我们要批量加载到哪个表。

  • 一个函数,这个函数将转换一个RDD记录成一个key-value鍵值对元组。在这个元组里,key是一个KeyFamilyQualifer对象,value是一个cell的值。KeyFamilyQualifer对象将保存RowKey, Column Family(列簇), Column Qualifier(列修饰符)。混洗将对RowKey分区,而不是对所有的三个值进行排序。

  • 临时目录,HFile要被写到的输出目录。

接下来的Spark批量加载命令是,使用HBase的LoadIncrementalHFiles对象来加载新创建的HFiles到HBase。

对于用Spark批量加载的额外参数

你可以在hbaseBulkLoad时,用额外的参数来设置以下属性:

  • 最大的HFiles文件大小

  • 一个标记,是否采用HFiles压缩

  • 对于压缩,布隆类型(bloomType),块大小和数据块编码的列簇设置。

现在,让我一起来看一下,如何调用“微型”的记录批量加载实现。

例子 51. 使用“微型”记录批量加载

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      ("1",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      ("3",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1

        val familyQualifiersValues = new FamiliesQualifiersValues
        t._2.foreach(f => {
          val family:Array[Byte] = f._1
          val qualifier = f._2
          val value:Array[Byte] = f._3

          familyQualifiersValues +=(family, qualifier, value)
        })
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
      },
      stagingFolder.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,
      20)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

注意:在使用“微型”批量加载时,最大的区别是,函数返回了一个元组,元组的第一个值是row key,第二个值是一个FamiliesQualifiersValues对象,这个对象包含了这行所有列簇对应的所有的值。

results matching ""

    No results matching ""