程序指定Schema

当样本类无法提前定义时(例如,记录的结构是编码成一个字符串,或一个文本数据集将会被解析且对于不同的用户,会投影成不同的字段),DataFrame能够用以下三步来创建。

  1. 从原始的RDD创建一个行的RDD(an RDD of Rows)
  2. 创建schema,schema由一个StructType匹配由步骤1创建的行的RDD的结构呈现
  3. 通过由SQLContext提供的createDataFrame方法,应用行的RDD的schema

例如:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)

results matching ""

    No results matching ""