外部集合
Spark可以从任何Hadoop支持的存储源中来创建分布式数据集,包挌本地文件系统,HDFS,Cassandra,HBase,Amazon S3等等.Spark支持文本文件,序列文件及任何其他的hadoop输入格式。
文本文件RDDs可以使用SparkContext的textFile方法被创建。这个方法的参数是文件的URI(可以是一个本地路径,或者hdfs://,s3n://,或者URI),并且读取成行的集合。以下是一个例子:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt
MapPartitionsRDD[10] at textFile at <console>:26
一旦被创建,distFile可用于数据集操作。例如,我们可以使用以下的map和reduce操作来累加所有行的大小:
distFile.map(s => s.length).reduce((a, b) => a + b).
使用Spark来读取文件需要注意:
- 如果正在使用的是本地文件系统的路径,文件必须在worker节点上的相同路径上都能够被访问。可以拷贝文件到所有的worker节点,也可以使用网络挂载来共享文件系统。
- 所有Spark的基于文件的输入方法,包括textFile,支持运行在目录上,压缩文件,以及通配符。例如你可以使用textFile("/my/directory"),textFile("/my/directory/.txt")和textFile("/my/directory/.gz")。
- textFile方法也可以使用第二个参数来控制文件分区的数量。默认情况下,Spark为文件的每个块创建一个分区(在HDFS中,块的默认大小为64MB),但是,你也可以通过使用一个更大的值,来要求一个更大的分区数量。注意:你不能指定一个比块数量还小的分区数量。
除了文本文件,Spark的Scala API也支持一些其他的数据格式:
SparkContext.wholeTextFiles可以帮你读取一个包含多个小文本文件的目录,并且以(文件名,内容)的形式返回目录下的每一个文件。与texFile相比,它将返回每个文件的每一行作为一个记录。
对于SequenceFiles文件,使用SparkContext的sequenceFile[K, V]方法,其中,K和V是文件中的key和value的类型。这些应该是Hadoop的Writable接口的子类,如IntWritable和Text。另外,Spark允许你指定少量的常用Writables的原生类型,例如sequenceFile[Int, String]将自动读取IntWritables和Texts。
- 对于其他的Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它要指定一个JobConf和输入格式类,key类和value类。对于一个使用你自己的输入源的Hadoop作业,用相同的方式设置他们。你也可以使用基于新的MapReduce API(org.apache.hadoop.mapreduce)的SparkContext.newAPIHadoopRDD。
- RDD.saveAsObjectFile和SparkContext.objectFile支持以一个包含序列化的Java对象的简单格式来保存一个RDD。然而,这并不是一个有效的方式,当指定格式为Avro的时候,它提供了一个简单的方式来保存任何RDD。