Schema Merging
Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge the schemas of all these files.
Since schema merging is a relatively expensive operation, and is not a necessity in most cases, it is turned off by default starting from version 1.5.0. You may enable it by:
- setting the data source option mergeSchema to true when reading Parquet files (as shown in the example below), or
- setting the global SQL option spark.sql.parquet.mergeSchema to true.
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)
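Alternatively, schema merging can be enabled globally rather than per read. A minimal sketch, assuming the same sqlContext and the data/test_table directory written in the example above:
// Enable schema merging for all Parquet reads via the global SQL option,
// instead of passing the mergeSchema data source option on each read.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
// With the global option set, a plain read of the partitioned table
// also merges the schemas of all the part files.
val df4 = sqlContext.read.parquet("data/test_table")
df4.printSchema()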