In Chapter 1, we explored how Spark DataFrames execute on a cluster. In this chapter, we’ll provide you with an overview of DataFrames and Spark SQL programming, starting with the advantages.
The Spark SQL and DataFrame APIs provide ease of use, space efficiency, and performance gains through Spark SQL's optimized execution engine.
Spark SQL caches DataFrames (when you call dataFrame.cache) using an in-memory columnar format that is optimized to scan only the required columns, automatically tune compression, minimize memory usage, and minimize JVM garbage collection.
Spark SQL's vectorized Parquet and ORC readers decompress and decode data in column batches, which makes reading roughly nine times faster than row-at-a-time processing.
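The vectorized readers are enabled by default; if you need to confirm or toggle them, the corresponding settings are spark.sql.parquet.enableVectorizedReader and spark.sql.orc.enableVectorizedReader, for example:
// the vectorized readers are on by default; these settings control them
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")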
Spark SQL’s Catalyst Optimizer handles logical optimization and physical planning, supporting both rule-based and cost-based optimization. When possible, Spark SQL Whole-Stage Java Code Generation optimizes CPU usage by generating a single optimized function in bytecode for the set of operators in an SQL query.
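You can see the result of whole-stage code generation by printing a query's physical plan; operators prefixed with an asterisk (*) are compiled into a single generated function. A minimal example:
// print the physical plan; operators marked with * run inside one
// generated whole-stage-codegen function
spark.range(1000).selectExpr("sum(id)").explain()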
Data preparation and exploration take up 60 to 80 percent of the analytical pipeline in a typical machine learning (ML) or deep learning (DL) project. To build an ML model, you have to clean, extract, explore, and test your dataset to find the features of interest that contribute most to the model’s accurate predictions. For illustrative purposes, we’ll use Spark SQL to explore the Taxi dataset and analyze which features might help predict taxi fare amounts.
The following code shows how we loaded the data from a CSV file into a Spark DataFrame, specifying the datasource format and schema, as discussed in Chapter 1. After we register the DataFrame as a SQL temporary view, we can use the sql function on the SparkSession to run SQL queries, which return the results as DataFrames. We cache the DataFrame so that Spark does not have to reload it for each query; Spark can cache DataFrames and tables in an in-memory columnar format, which improves memory usage and performance.
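The schema value used below comes from Chapter 1. As a reference point, here is a minimal sketch of what it might look like for the columns queried in this chapter; the actual column names and types in the dataset may differ:
import org.apache.spark.sql.types._

// hypothetical schema covering only the columns used in this chapter's examples;
// the real schema is defined in Chapter 1
val schema = StructType(Array(
  StructField("fare_amount", DoubleType),
  StructField("trip_distance", DoubleType),
  StructField("rate_code", DoubleType),
  StructField("hour", DoubleType),
  StructField("day_of_week", DoubleType),
  StructField("year", IntegerType)
))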
// load the data as in Chapter 1
val file = "/data/taxi_small.csv"
val df = spark.read
  .option("inferSchema", "false")
  .option("header", "true")
  .schema(schema)
  .csv(file)
// cache the DataFrame in columnar format in memory
df.cache()
// create a table view of the DataFrame for Spark SQL
df.createOrReplaceTempView("taxi")
// cache the taxi table in columnar format in memory
spark.catalog.cacheTable("taxi")
Now we can use Spark SQL to explore what might affect the taxi fare amount, with questions like: What is the average fare amount by hour of the day?
%sql
select hour, avg(fare_amount)
from taxi
group by hour order by hour
With a notebook like Zeppelin or Jupyter, we can display the SQL results in graph formats.
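The same query can also be run programmatically through the SparkSession's sql method, which returns the result as a DataFrame:
// run the SQL query via the SparkSession; the result is a DataFrame
val avgFareByHour = spark.sql(
  "select hour, avg(fare_amount) from taxi group by hour order by hour")
avgFareByHour.show(5)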
Following is the same query with the DataFrame API:
df.groupBy("hour").avg("fare_amount")
.orderBy("hour").show(5)
result:
+----+------------------+
|hour| avg(fare_amount)|
+----+------------------+
| 0.0|11.083333333333334|
| 1.0|22.581632653061224|
| 2.0|11.370820668693009|
| 3.0|13.873989218328841|
| 4.0| 14.57204433497537|
+----+------------------+
What is the average fare amount compared to the average trip distance?
%sql
select trip_distance, avg(trip_distance), avg(fare_amount)
from taxi
group by trip_distance order by avg(trip_distance) desc
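This query can also be written with the DataFrame API; here is one possible formulation, with column aliases added for readability:
import org.apache.spark.sql.functions.{avg, desc}

// group by trip_distance and compute the averages, ordered by average trip distance
df.groupBy("trip_distance")
  .agg(avg("trip_distance").alias("avg_trip_distance"),
       avg("fare_amount").alias("avg_fare_amount"))
  .orderBy(desc("avg_trip_distance"))
  .show(5)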
What is the average fare amount and average trip distance by hour of the day?
%sql
select hour, avg(fare_amount), avg(trip_distance)
from taxi
group by hour order by hour
What is the average fare amount and average trip distance by rate code?
%sql
select rate_code, avg(fare_amount), avg(trip_distance)
from taxi
group by rate_code order by rate_code
What is the average fare amount and average trip distance by day of the week?
%sql
select day_of_week, avg(fare_amount), avg(trip_distance)
from taxi
group by day_of_week order by day_of_week
You can use the SQL tab in the Spark web UI to view query execution information, such as the query plan details and SQL metrics. Clicking the query link displays the DAG of the job.
Clicking +details in the DAG displays details for that stage.
Clicking the Details link at the bottom displays the logical plans and the physical plan in text format.
In the query plan details, you can see:
Use the Spark SQL ANALYZE TABLE tablename COMPUTE STATISTICS command to take advantage of cost-based optimization in the Catalyst Planner.
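For example, assuming taxi refers to a table in the catalog (such as the persistent table saved later in this chapter) rather than a temporary view:
// collect table-level statistics for the cost-based optimizer
spark.sql("ANALYZE TABLE taxi COMPUTE STATISTICS")
// optionally collect column-level statistics as well
spark.sql("ANALYZE TABLE taxi COMPUTE STATISTICS FOR COLUMNS fare_amount, trip_distance")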
The Jobs tab summary page shows high-level job information, such as the status, duration, and progress of all jobs and the overall event timeline. Here are some metrics to check:
The Stages tab displays summary metrics for all tasks. You can use these metrics to identify problems with an executor or with task distribution. Here are some things to look for:
The Storage tab displays DataFrames that are cached or persisted to disk with size in memory and size on disk information. You can use the storage tab to see if cached DataFrames are fitting into memory. If a DataFrame will be reused, and if it fits into memory, caching it will make execution faster.
The Executors tab displays summary memory, disk, and task usage information by the executors that were created for the application. You can use this tab to confirm that your application has the amount of resources needed, using the following:
File partitioning and bucketing are common optimization techniques in Spark SQL. They can help reduce data skew and data shuffling by pre-organizing data into files and directories. DataFrames can be sorted, partitioned, and/or bucketed when saved as persistent tables. Partitioning optimizes reads by storing files in a hierarchy of directories based on the given columns. For example, when we partition a DataFrame by year:
df.write.format("parquet")
.partitionBy("year")
.option("path", "/data ")
.saveAsTable("taxi")
The directory would then have a structure like the following (the year values and file names shown are illustrative):
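/data/
  year=2018/
    part-...parquet
  year=2019/
    part-...parquet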
After partitioning the data, when queries are made with filter operators on the partition column, the Spark SQL Catalyst optimizer pushes down the partition filter to the datasource. The scan reads only the directories that match the partition filters, reducing disk I/O and data loaded into memory. For example, the following query reads only the files in the year = '2019' directory.
df.filter("year = '2019')
.groupBy("year").avg("fareamount")
When visualizing the physical plan for this query, you will see Scan PrunedInMemoryFileIndex[/data/year=2019], PartitionFilters: [(year = 2019)].
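One way to inspect this plan is to read the partitioned Parquet data back and call explain; a sketch, assuming the data written to /data above:
// read the partitioned Parquet data and print the physical plan to verify
// that the partition filter on year is pushed down to the scan
val taxiParquet = spark.read.parquet("/data")
taxiParquet.filter("year = '2019'")
  .groupBy("year").avg("fare_amount")
  .explain()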
Similar to partitioning, bucketing splits data by a value. However, bucketing distributes data across a fixed number of buckets by a hash of the bucket column value, whereas partitioning creates a directory for each partition column value. Tables can be bucketed on more than one column, and bucketing can be used with or without partitioning. If we add bucketing to the previous example, the directory structure is the same as before, with the data files in the year directories grouped across four buckets by hour.
df.write.format("parquet")
.partitionBy("year")
.bucketBy(4,"hour")
.option("path", "/data ")
.saveAsTable("taxi")
After bucketing the data, aggregations and joins (wide transformations) on the bucketed value do not have to shuffle data between partitions, reducing network and disk I/O. Also, bucket filter pruning is pushed down to the datasource, reducing disk I/O and the data loaded into memory. The following query pushes down the partition filter on year to the datasource and avoids the shuffle to aggregate on hour.
df.filter("year = '2019')
.groupBy("hour")
.avg("hour")
Partitioning should be used only with columns that are frequently used for filtering in queries and that have a limited number of distinct values, with enough corresponding data to distribute the files across the directories. Too many small files are inefficient because of excessive parallelism, and too few large files can hurt parallelism. Bucketing works well when the number of unique values in the bucketing column is large and the bucketing column is used often in queries.
In this chapter, we explored how to use tabular data with Spark SQL. These code examples can be reused as the foundation for processing data with Spark SQL. In another chapter, we use the same data with DataFrames for predicting taxi fares.