85.批量加载(Bulk Load)
使用Spark指加载数据到HBase有两种选择。有基本的批量加载功能,适用于,当你的行有数以百万的列,以及,在Spark批量加载过程的map端之前,你的列没有固定和分区的情况下。
Spark还有一个“微型”的记录批量加载(thin record bulk load)选择,这个选择的设计,是对于那些每行少于10k个列的表格。这个选择的优势在于,具有更高的生产能力,及在Spark混洗(shuffle)操作中,更少的覆盖所有的加载。
两上实现的工作方式差不多,就像MapReduce的批量加载过程,在这个过程,partitioner会根据region的分割对rowkeys进行分区,并且,row keys会被排好序,发送到reducers,以至于HFiles能够直接从reduce阶段写输出。
用Spark术语,批量加载将被一个Spark repartitionAndSortWithinPartitions
和随后的一个Spark foreachPartition
来实现。
首先让我们来看一个使用基本的批量加载功能的例子:
例子 49. 批量加载例子
以下例子显示了在Spark中批量加载:
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family:Array[Byte] = t._2(0)._1
val qualifier = t._2(0)._2
val value = t._2(0)._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder.getPath)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
hbaseBulkLoad函数带了3个必选参数:
表名,即我们要批量加载到哪个表。
一个函数,这个函数将转换一个RDD记录成一个key-value鍵值对元组。在这个元组里,key是一个KeyFamilyQualifer对象,value是一个cell的值。KeyFamilyQualifer对象将保存RowKey, Column Family(列簇), Column Qualifier(列修饰符)。混洗将对RowKey分区,而不是对所有的三个值进行排序。
临时目录,HFile要被写到的输出目录。
接下来的Spark批量加载命令是,使用HBase的LoadIncrementalHFiles
对象来加载新创建的HFiles到HBase。
对于用Spark批量加载的额外参数
你可以在hbaseBulkLoad
时,用额外的参数来设置以下属性:
最大的HFiles文件大小
一个标记,是否采用HFiles压缩
对于压缩,布隆类型(bloomType),块大小和数据块编码的列簇设置。
现在,让我一起来看一下,如何调用“微型”的记录批量加载实现。
例子 51. 使用“微型”记录批量加载
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoadThinRows(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val familyQualifiersValues = new FamiliesQualifiersValues
t._2.foreach(f => {
val family:Array[Byte] = f._1
val qualifier = f._2
val value:Array[Byte] = f._3
familyQualifiersValues +=(family, qualifier, value)
})
(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
},
stagingFolder.getPath,
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude = false,
20)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
注意:在使用“微型”批量加载时,最大的区别是,函数返回了一个元组,元组的第一个值是row key,第二个值是一个FamiliesQualifiersValues对象,这个对象包含了这行所有列簇对应的所有的值。