Spark SQL and DataFrames
Spark SQL allows you to run SQL-like queries on your DataFrames. This is one of the most powerful features of Spark, facilitating a smooth transition for data analysts and engineers coming from a traditional relational database background. It allows you to express complex logic using standard SQL syntax while benefitting from Spark's distributed processing engine.
In this tutorial, we will explore:
- Registering DataFrames as Views
- Executing SQL Queries
- Mixing SQL and DataFrame APIs
- Using selectExpr
First, ensure you have the tips.csv file available at data/tips.csv, the path used in the examples below.
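The examples also assume an active SparkSession named spark. If one is not already provided (for example, outside a PySpark shell or notebook), a minimal session can be created like this; the app name is arbitrary:
from pyspark.sql import SparkSession
# Create (or reuse) a SparkSession for the examples below
spark = SparkSession.builder.appName("spark-sql-tutorial").getOrCreate()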
Registering a Temp View
To run SQL queries against a DataFrame, we must first register it as a temporary view. This creates a reference in the SparkSession's catalog. The view is session-scoped and will disappear when the session ends.
# Load the dataframe
tips = spark.read.option("header","true").option("inferSchema","true").csv("data/tips.csv")
# Inspect the inferred schema
tips.printSchema()
# Register as a view
tips.createOrReplaceTempView("tips_view")
The tips DataFrame is now registered as the temporary view tips_view, and we can run SQL against it as if it were a table.
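As a quick check, spark.table() looks a registered view up by name in the catalog and returns it as a DataFrame (a minimal sketch):
# Retrieve the registered view back as a DataFrame
registered = spark.table("tips_view")
registered.show(3)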
Running Standard SQL Queries
Once registered, we can use spark.sql() to run queries against the view. The result comes back as a regular DataFrame.
# Select specific columns with a filter
sql_result = spark.sql("""
SELECT
sex,
smoker,
round(avg(tip), 2) as avg_tip,
count(*) as count
FROM tips_view
WHERE total_bill > 10
GROUP BY 1, 2
ORDER BY avg_tip DESC
""")
# show the output
sql_result.show()
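Because spark.sql() returns an ordinary DataFrame, the result can be refined further with the DataFrame API. A small illustration (the threshold here is arbitrary):
# Keep only groups with at least 10 rows, using a SQL expression string
sql_result.filter("`count` >= 10").show()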
The Power of selectExpr
Sometimes you don't want to register a full view just to write a small piece of SQL logic. The selectExpr method accepts SQL expressions directly, so you can project and transform columns on a DataFrame without creating a view first.
# Using SQL expressions without a temp view
# converting strings to boolean or calculating ratios
tips.selectExpr(
"if(smoker = 'No', true, false) as is_non_smoker",
"total_bill",
"tip",
"round(tip / total_bill, 2) as tip_ratio"
).show(5)
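A closely related option is pyspark.sql.functions.expr, which lets you mix a SQL fragment with plain column references inside select. Here is a sketch of the same tip_ratio calculation:
from pyspark.sql import functions as F
# expr() embeds a SQL expression inside the DataFrame API
tips.select(
    "total_bill",
    "tip",
    F.expr("round(tip / total_bill, 2)").alias("tip_ratio")
).show(5)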
Interoperability
A common pattern in PySpark development is to use SQL for complex filtering or aggregations (where SQL logic is more readable) and then switch back to the PySpark API for procedural operations or saving data.
# Step 1: Complex SQL aggregations
top_days = spark.sql("SELECT day FROM tips_view GROUP BY day HAVING avg(tip) > 2.5")
# Step 2: Join back using PySpark API
# Note: In a real scenario, this might be a more complex transformation
filtered_tips = tips.join(top_days, on="day", how="inner")
filtered_tips.show()
filtered_tips.explain()
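The interoperability also works in the other direction: a DataFrame produced with the PySpark API can be registered as a view and queried with SQL. A brief sketch using the filtered_tips DataFrame from above:
# Register the API-produced DataFrame so SQL can see it
filtered_tips.createOrReplaceTempView("filtered_tips_view")
spark.sql("SELECT day, count(*) AS n FROM filtered_tips_view GROUP BY day").show()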
Using Common Table Expressions (CTEs)
Complex queries often become unreadable with multiple nested subqueries. Spark SQL supports CTEs using the WITH clause, which lets you name an intermediate result and reference it later in the query.
Scenario: Find the average total bill for Male and Female customers, but ONLY for days where the average party size was greater than 2.5.
spark.sql("""
WITH BusyDays AS (
SELECT day
FROM tips_view
GROUP BY day
HAVING avg(size) > 2.5
)
SELECT
sex,
round(avg(total_bill), 2) as avg_bill
FROM tips_view
WHERE day IN (SELECT day FROM BusyDays)
GROUP BY sex
""").show()
Conditional Logic with CASE WHEN
Data is rarely binary; most real categorizations need several buckets, which is exactly what CASE WHEN expresses.
Scenario: We want to create a "Tipper Profile" based on the ratio of tip to bill.
spark.sql("""
SELECT
total_bill,
tip,
CASE
WHEN (tip/total_bill) > 0.2 THEN 'Generous'
WHEN (tip/total_bill) < 0.1 THEN 'Low'
ELSE 'Standard'
END as tipper_profile
FROM tips_view
LIMIT 5
""").show()
Exploring Metadata
You can use the Spark Catalog to programmatically explore what tables and databases exist. This is useful when building generic data validation tools.
# List available tables in the current session
print(spark.catalog.listTables())
# List columns in our view
print(spark.catalog.listColumns("tips_view"))
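When a view is no longer needed, it can be removed from the session catalog. A brief sketch:
# Drop the temporary view and confirm it is gone
spark.catalog.dropTempView("tips_view")
print(spark.catalog.listTables())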