83.Spark基础
这部分以最低最简单的级别来讨论Spark和HBase的集成。所有其他的交互点都是建立在这个地方讨论的概念之上。
所有Spark和HBase集成的根是HBaseContext
。HBaseContext携带着HBase的配置信息,并将其推送到Spark的执行器上。这就使得我们在每个Spark执行器上有一个HBase的连接。
为了引用,Spark执行器可以作为Region Servers
在相同的节点上,没有协同定位的依赖时,可以在不同的节点上。考虑每个Spark执行器作为一个多线程客户应用程序。这允许任何Spark任务运行在执行器上来访问共享的Connection对象。
例子 47. HBaseContext
使用的例子
这个例子显示了HBaseContext
能够被用Scala在一个RDD上做foreachPartition
:
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
})
bufferedMutator.flush()
bufferedMutator.close()
})
这里是用Java来实现相同的例子:
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
...
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.foreachPartition(rdd,
new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
public void call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2().getTable(TableName.valueOf(tableName));
BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
while (t._1().hasNext()) {
byte[] b = t._1().next();
Result r = table.get(new Get(b));
if (r.getExists()) {
mutator.mutate(new Put(b));
}
}
mutator.flush();
mutator.close();
table.close();
}
});
} finally {
jsc.stop();
}
在Spark和HBase中的所有功能都支持Scala和Java,除了SparkSQL,也支持Spark支持的任意语言。这个文档的剩下部分,现在我们将集中于Scala例子
以上例子描述了如何连接,及做foreachPartition。许多其他的Spark hbase支持的函数没有在这里面。
bulkPut
用于大规模并行发送puts
到HBase
bulkDelete
用于大规模并行发送deletes
到HBase
bulkGet
用于大规模并行发送gets
到HBase来创建一个新的RDD
mapPartition
用连接对象做一个Spark Map函数来全局访问HBase。
hBaseRDD
简化一个分布式scan
来创建一个RDD
对于所有这些功能有的例子,请查阅HBase-Spark Module。