Spark SQL通用Load/Save函数
在最简单的形式中,默认数据源parquet(除非spark.sql.sources.default另有配置)将用于所有操作。在中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
Spark SQL通用加载/保存函数
在最简单的形式中,默认数据源parquet(除非spark.sql.sources.default另有配置)将用于所有操作。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
一、手动指定选项
你还可以手动指定将要使用的数据源以及要传递给数据源的任何额外选项。数据源由其完全限定的名称(即org.apache.spark.sql.parquet)指定,但对于内置源,你也可以使用它们的短名称(json, parquet, jdbc, orc, libsvm, csv, text)。从任何数据源类型加载的DataFrames都可以使用此语法转换为其他类型。
有关内置源的可用选项,请参阅API文档,例如org.apache.spark.sql.DataFrameReader 和 org.apache.spark.sql.DataFrameWriter。那里记录的选项也适用于非Scala Spark API(例如PySpark)。有关其他格式,请参阅特定格式的API文档。
要加载JSON文件,可以使用:
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
要加载CSV文件,可以使用:
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
在Spark repo的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
额外的选项也用于写入操作。例如,可以控制ORC数据源的布隆过滤器和字典编码。以下ORC示例将创建bloom过滤器,并仅对favorite_color使用字典编码。对于Parquet,也有parquet.bloom.filter.enabled和parquet.enable.dictionary。要查找有关额外ORC/Parquet选项的更多详细信息,请访问Apache ORC / Parquet官方网站。
ORC数据源:
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
在Spark repo的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
Parquet数据源:
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
二、直接在文件上运行SQL
除了使用read API将文件加载到DataFrame中并进行查询之外,还可以直接使用SQL查询该文件。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
三、保存模式
保存操作可以选择使用SaveMode,它指定如何处理现有数据(如果存在)。重要的是要认识到,这些保存模式不利用任何锁定,也不是原子的。此外,在执行Overwrite时,将在写入新数据之前删除数据。
Scala/Java | Any Language | 含义 |
---|---|---|
SaveMode.ErrorIfExists (default) | “error” or “errorifexists” (default) | 将DataFrame保存到数据源时,如果数据已经存在,则会引发异常。 |
SaveMode.Append | “append” | 在将DataFrame保存到数据源时,如果数据/表已经存在,则需要将DataFrame的内容附加到现有数据中。 |
SaveMode.Overwrite | “overwrite” | 覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预计现有数据将被DataFrame的内容覆盖。 |
SaveMode.Ignore | “ignore” | 忽略模式意味着在将DataFrame保存到数据源时,如果数据已经存在,则保存操作不会保存DataFrame的内容,也不会更改现有数据。这类似于SQL中的CREATE TABLE IF NOT EXISTS。 |
四、保存到持久表
也可以使用saveAsTable命令将DataFrames作为持久表保存到Hive metastore中。请注意,使用此功能不需要现有的Hive部署。Spark将为你创建一个默认的本地Hive metastore(使用Derby)。与createOrReplaceTempView命令不同,saveAsTable将物化DataFrame的内容,并创建一个指向Hive metastore中数据的指针。即使在Spark程序重新启动后,只要保持与同一metastore的连接,持久表仍然存在。通过使用表的名称在SparkSession上调用table方法,可以创建持久表的DataFrame。
对于基于文件的数据源,例如text、parquet、json等,你可以通过路径选项指定自定义表路径,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表时,不会删除自定义表路径,并且表数据仍在那里。如果没有指定自定义表路径,Spark会将数据写入仓库目录下的默认表路径。当表被删除时,默认的表路径也将被删除。
从Spark 2.1开始,持久数据源表的每个分区metadata都存储在Hive metastore中。这带来了几个好处:
- 由于metastore只为查询返回必要的分区,因此不再需要在表的第一个查询中发现所有分区。
- Hive DDL,如ALTER TABLE PARTITION。。。SET LOCATION现在可用于使用Datasource API创建的表。
请注意,在创建外部数据源表(带有路径选项的表)时,默认情况下不会收集分区信息。要同步metastore中的分区信息,可以调用MSCK REPAIR TABLE。
五、分桶Bucketing,排序和分区
对于基于文件的数据源,也可以对输出进行分桶、排序或分区。bucket和排序只适用于持久化表:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
而在使用Dataset api时,分区可以与save和saveAsTable一起使用。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
可以对单个表同时使用分区和bucket:
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
在Spark repo的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
partitionBy创建一个目录结构,如Partition Discovery部分所述。因此,它对具有高基数(cardinality)的列的适用性有限。相比之下,bucketBy将数据分布在固定数量的buckets中,并且可以在唯一值的数量不受限制时使用。
更多推荐
所有评论(0)