83.Spark基础

这部分以最低最简单的级别来讨论Spark和HBase的集成。所有其他的交互点都是建立在这个地方讨论的概念之上。

所有Spark和HBase集成的根是HBaseContext。HBaseContext携带着HBase的配置信息,并将其推送到Spark的执行器上。这就使得我们在每个Spark执行器上有一个HBase的连接。

为了引用,Spark执行器可以作为Region Servers在相同的节点上,没有协同定位的依赖时,可以在不同的节点上。考虑每个Spark执行器作为一个多线程客户应用程序。这允许任何Spark任务运行在执行器上来访问共享的Connection对象。

例子 47. HBaseContext 使用的例子

这个例子显示了HBaseContext能够被用Scala在一个RDD上做foreachPartition:

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

...

val hbaseContext = new HBaseContext(sc, config)

rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
 val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
 it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
 })
 bufferedMutator.flush()
 bufferedMutator.close()
})

这里是用Java来实现相同的例子:

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
   public void call(Tuple2<Iterator<byte[]>, Connection> t)
        throws Exception {
    Table table = t._2().getTable(TableName.valueOf(tableName));
    BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
    while (t._1().hasNext()) {
      byte[] b = t._1().next();
      Result r = table.get(new Get(b));
      if (r.getExists()) {
       mutator.mutate(new Put(b));
      }
    }

    mutator.flush();
    mutator.close();
    table.close();
   }
  });
} finally {
  jsc.stop();
}

在Spark和HBase中的所有功能都支持Scala和Java,除了SparkSQL,也支持Spark支持的任意语言。这个文档的剩下部分,现在我们将集中于Scala例子

以上例子描述了如何连接,及做foreachPartition。许多其他的Spark hbase支持的函数没有在这里面。

bulkPut 用于大规模并行发送puts到HBase

bulkDelete 用于大规模并行发送deletes到HBase

bulkGet 用于大规模并行发送gets到HBase来创建一个新的RDD

mapPartition 用连接对象做一个Spark Map函数来全局访问HBase。

hBaseRDD 简化一个分布式scan来创建一个RDD

对于所有这些功能有的例子,请查阅HBase-Spark Module。

results matching ""

    No results matching ""