86.SparkSQL/DataFrames
HBase-Spark Connector(在HBase-Spark 模块中)手段DataSource API(SPARK-3247)是在Spark-1.2.0引入的,在简单的HBase KV存储和复杂的关系型SQL查询之间架起了桥梁,使得用户可以在HBase上使用Spark执行复杂的数据分析工作。HBase Dataframe是一个标准的Spark Dataframe,能够与任何其他的数据源进行交互,比如Hive,Orc,Parquet,JSON等。HBase-Spark Connector应用了关键技术,如分区剪枝(partition pruning),列剪枝(column pruning),谓詞下推(predicate pushdown)和数据局部性(data locality)。
要使用HBase-Spark Connector,用户需要定义在HBase和Spark表之间的映射关系的schema目录,准备数据,并且填充到HBase表中,然后加载HBase Dataframe。之后,用户可以使用SQL查询做集成查询和访问记录HBase的表。以下描述了这个的基本步骤:
86.1. 定义目录(Define catalog)
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
目录定义了HBase和Spark表之间的一个映射。这个目录有两个重要的部分。一个是rowkey的定义,另一个是在Spark的表列与HBase的列簇和列修饰符的映射。以上定义了一个命名为table1,row key为key,和一些列(col1-col8)的HBase表的schema。注意,rowkey也必须作为一列(col0)定义在详细中,这一列有一个特定的列簇(rowkey)。
86.2. 保存DataFrame
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord
{
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark ")
.save()
用户准备的数据是一个本地的Scala集合,里面有256个HBaseRecord对象。sc.parallelize(data)
函数分发数据形成一个RDD。toDF
返回一个DataFrame。write
函数返回一个DataFrameWriter,它用于写DataFrame到外部存储系统(例如这里的HBase)。用指定的一个schema目录给出一个DataFrame,save
函数将创建一个有5个分区的HBase表,并且保存DataFrame在里面。
86.3. 加载 DataFrame
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val df = withCatalog(catalog)
在withCatalog函数里面,sqlContext是一个SQLContext变量,它是在Spark里使用结构化数据(行和列)的入口。read返回一个DataFrameReader,它用用于把数据写入一个DataFrame。option函数用于对隐式数据源增加输入参数到DataFrameReader,format函数对DataFrameReader指定输入数据源格式。通过withCatalog函数返回的dataFrame df,常被用于访问HBase表,例如4.4和4.5。
86.4. 语言集成查询
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005" ||
$"col0" <= "row005")
.select("col0", "col1", "col4")
s.show
DataFrame能够做各种操作,例如join,sort,select,filter,orderBy等等。上面的df.filter使用给出的SQL表达式过滤行。select选取一个列集合:col0,col1和col4。
86.5. SQL 查询
sqlContext.sql("select count(col1) from table1").show
registerTempTable注册df DataFrame为一个名为table1的临时表。这个临时表的生存期与创建df的SQLContext相关。sqlContext.sql函数允许用户执行SQL查询。
86.6. 其他
Example 52. 使用时间戳查询
在HBaseSparkConf中,有4个与时间戳相关的参数可以被设置。他们是分别是TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP和MAX_VERSIONS。用户使用不同的时间戳或用MIN_TIMESTAMP和MAX_TIMESTAMP时间范围来查询记录。同时,用户使用具体的值来代替以下例子中的tsSpecified和oldMs。
以下例子显示了如何使用时间戳来加载DataFrame df。tsSpecified由用户指定。HBaseTableCatalog定义了HBase和Relation的关系schema。writeCatalog定义了对于schema映射的目录。
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
以下例子显示了如何使用时间范围加载DataFrame df。oldMs由用户指定。
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
加载DataFrame df之后,用户可以查询数据:
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
例子 53. 原生的Avro支持
HBase-Spark Connector支持不同的数据格式,如Avro,Jason等。以下的使用情况显示了Spark是如何支持Avro的。用户可以直接持久化Avro记录到HBase。在内部,Avro schema是被自动地转换成一个原生的Spark Catalyst数据类型。注意,HBase表的key-value部分都能够被定义成Avro格式。
1)定义schema映射的目录:
def catalog = s"""{
|"table":{"namespace":"default", "name":"Avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
|}
|}""".stripMargin
catalog是一个名为Avrotable的HBase表的schema。row key是作为一个key,也必须定义为一列(col0),这列有一个指定的列簇,即rowkey。
2)准备数据:
object AvroHBaseRecord {
val schemaString =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]},
| {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
| {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
| ] }""".stripMargin
val avroSchema: Schema = {
val p = new Schema.Parser
p.parse(schemaString)
}
def apply(i: Int): AvroHBaseRecord = {
val user = new GenericData.Record(avroSchema);
user.put("name", s"name${"%03d".format(i)}")
user.put("favorite_number", i)
user.put("favorite_color", s"color${"%03d".format(i)}")
val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
favoriteArray.add(s"number${i}")
favoriteArray.add(s"number${i+1}")
user.put("favorite_array", favoriteArray)
import collection.JavaConverters._
val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
user.put("favorite_map", favoriteMap)
val avroByte = AvroSedes.serialize(user, avroSchema)
AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
}
}
val data = (0 to 255).map { i =>
AvroHBaseRecord(i)
}
首先定义schemaString,然后它被解析得到avroSchema。avroSchema是被用于产生AvroHBaseRecord。用户准备的数据是一个本地的Scala集合,里面有256个AvroHBaseRecord对象。
3) 保存 DataFrame:
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
用指定的schema catalog给出一个DataFrame,以上将创建一个有5个regions的HBase表,并且保存在DataFrame里面。
4) 加载 DataFrame
def avroCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
在withCatalog函数里面,read函数返回一个DataFrameReader,DataFrameReader可以被用于读取数据从一个DataFrame。option函数对隐式的数据源增加输入参数给DataFrameReader。有两个配置项(options):一个是用AvroHBaseRecord.schemaString设置avroSchema,另外一个是用avroCatalog设置HBaseTableCatalog.tableCatalog。load()函数加载输入到一个DataFrame。通过withCatalog函数返回的dataFrame df,常被用于访问HBase表。
5) SQL查询
val c = sqlContext.sql("select count(1) from avrotable").
加载DataFrame df之后,用户可以查询数据。registerTempTable注册DataFrame df为一个名为avrotable的临时表。sqlContext.sql函数允许用户执行SQL查询。