檔案分割和分組是 Spark SQL 中常見的最佳化技術。這些技術可預先彙總檔案或目錄中的資料,協助減少資料扭曲和資料隨機置換。DataFrame 可在儲存為永久性表格時排序、分割和/或分組。分割依照指定欄將檔案儲存在目錄階層中,以將讀取最佳化。例如,當我們依照年份分割 DataFrame:
df.write.format("parquet")
.partitionBy("year")
.option("path", "/data ")
.saveAsTable("taxi")
目錄會有以下結構:
After partitioning the data, when queries are made with filter operators on the partition column, the Spark SQL catalyst optimizer pushes down the partition filter to the datasource. The scan reads only the directories that match the partition filters, reducing disk I/O and data loaded into memory. For example, the following query reads only the files in the year ='2019' directory.
df.filter("year ='2019')
.groupBy("year").avg("fareamount")
When visualizing the physical plan for this query, you will see Scan PrunedInMemoryFileIndex[ /data/year=2019], PartitionFilters: [ (year =2019)] .
與分割相似,分組按照值來分隔資料。但是,分組會依照分組值上的雜湊值在固定數量的分組間分配資料,分割則是為每個分割欄值建立目錄。表格可以根據多個值來分組,而且可以選擇是否要分割。如果我們將分組加入上一個範例,目錄結構與之前的相同,年目錄中的資料檔案依照小時分為四組。
df.write.format("parquet")
.partitionBy("year")
.bucketBy(4,"hour")
.option("path", "/data ")
.saveAsTable("taxi")
將資料分組後,分組值上的彙總和聯結 (廣泛轉換) 不須在分割區之間隨機置換資料,所以也不會因而減少網路和磁碟 I/O。此外,分組篩選條件剪除 (pruning) 會推至資料來源,可減少磁碟 I/O 和載入到記憶體內的資料。下方的查詢將年份的分割篩選器下推至資料來源,避免以小時彙總的隨機置換。
df.filter("year ='2019')
.groupBy("hour")
.avg("hour")
分割僅應使用在經常用於篩選查詢的欄,且這些欄的欄值有限,具有足夠的對應資料來分配目錄中的檔案。小型檔案效率低,平行處理過於龐大,而大型檔案過少則會對平行處理有不良影響。當唯一的分組欄值數量很多,而且分組欄經常用於查詢時,分組的效果良好。