Spark SQL and DataFrames

Spark SQL lets you run SQL queries directly against your DataFrames. This is one of Spark's most powerful features, and it makes the transition smooth for data analysts and engineers coming from a traditional relational database background: you can express complex logic in standard SQL syntax while still benefiting from Spark's distributed processing engine.

In this tutorial, we will explore:

  • Registering DataFrames as Views
  • Executing SQL Queries
  • Mixing SQL and DataFrame APIs
  • Using selectExpr

First, ensure you have the tips.csv dataset available; the examples below read it from data/tips.csv.
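All of the examples assume an active SparkSession bound to the name spark, as you get automatically in a notebook or the pyspark shell. If you are following along in a standalone script, a minimal session can be created first (the application name here is just a placeholder):

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the app name is arbitrary
spark = SparkSession.builder.appName("spark-sql-tutorial").getOrCreate()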

Registering a Temp View

To run SQL queries against a DataFrame, we must first register it as a temporary view. This creates a reference in the SparkSession's catalog. The view is session-scoped and will disappear when the session ends.

# Load the dataframe
tips = spark.read.option("header","true").option("inferSchema","true").csv("data/tips.csv")

# Inspect the inferred schema via the DataFrame repr
tips
OUTPUT
DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: int]

# Register as a view
tips.createOrReplaceTempView("tips_view")

The DataFrame is now registered as the temporary view tips_view, and we can query it with SQL exactly as we would a table.
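Because the view lives in the session catalog, it can also be removed explicitly when you no longer need it. A small sketch using the same view name as above:

# Drop the temporary view from the session catalog
spark.catalog.dropTempView("tips_view")

# Re-register it so the remaining examples can still reference it
tips.createOrReplaceTempView("tips_view")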

Running Standard SQL Queries

Once registered, we can use spark.sql() to execute standard SQL queries. This returns a new DataFrame.

# Select specific columns with a filter
sql_result = spark.sql("""
    SELECT 
        sex, 
        smoker, 
        round(avg(tip), 2) as avg_tip,
        count(*) as count
    FROM tips_view 
    WHERE total_bill > 10 
    GROUP BY 1, 2
    ORDER BY avg_tip DESC
""")

# show the output
sql_result.show()
OUTPUT
+------+------+-------+-----+
|   sex|smoker|avg_tip|count|
+------+------+-------+-----+
|  Male|    No|   3.25|   89|
|  Male|   Yes|   3.06|   57|
|Female|   Yes|   3.02|   30|
|Female|    No|   2.86|   51|
+------+------+-------+-----+
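Since spark.sql() returns an ordinary DataFrame, the result can be refined further with DataFrame methods before any action runs. A minimal sketch:

from pyspark.sql.functions import col

# Keep only the larger groups, then display the refined result
sql_result.where(col("count") >= 50).orderBy("avg_tip").show()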

The Power of selectExpr

Sometimes you don't want to register a full view just to write a small piece of SQL logic. The selectExpr method allows you to use SQL expressions directly on a DataFrame, acting as a bridge between the two APIs. This is extremely useful for casting types or performing arithmetic inline.

# Using SQL expressions without a temp view
# converting strings to boolean or calculating ratios
tips.selectExpr(
    "if(smoker = 'No', true, false) as is_non_smoker",
    "total_bill",
    "tip",
    "round(tip / total_bill, 2) as tip_ratio"
).show(5)
OUTPUT
+-------------+----------+----+---------+
|is_non_smoker|total_bill| tip|tip_ratio|
+-------------+----------+----+---------+
|         true|     16.99|1.01|     0.06|
|         true|     10.34|1.66|     0.16|
|         true|     21.01| 3.5|     0.17|
|         true|     23.68|3.31|     0.14|
|         true|     24.59|3.61|     0.15|
+-------------+----------+----+---------+
only showing top 5 rows
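A closely related helper is expr() from pyspark.sql.functions, which lets you embed a single SQL expression inside regular DataFrame methods such as withColumn. A small sketch that adds the same tip ratio as a new column:

from pyspark.sql.functions import expr

# Same ratio as above, expressed as a new column via the DataFrame API
tips.withColumn("tip_ratio", expr("round(tip / total_bill, 2)")).show(5)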

Interoperability

A common pattern in PySpark development is to use SQL for complex filtering or aggregations (where SQL logic is more readable) and then switch back to the PySpark API for procedural operations or saving data.

# Step 1: Complex SQL aggregations
top_days = spark.sql("SELECT day FROM tips_view GROUP BY day HAVING avg(tip) > 2.5")

# Step 2: Join back using PySpark API
# Note: In a real scenario, this might be a more complex transformation
filtered_tips = tips.join(top_days, on="day", how="inner")
filtered_tips.show()
OUTPUT
+---+----------+----+------+------+------+----+
|day|total_bill| tip|   sex|smoker|  time|size|
+---+----------+----+------+------+------+----+
|Sun|     16.99|1.01|Female|    No|Dinner|   2|
|Sun|     10.34|1.66|  Male|    No|Dinner|   3|
|Sun|     21.01| 3.5|  Male|    No|Dinner|   3|
|Sun|     23.68|3.31|  Male|    No|Dinner|   2|
|Sun|     24.59|3.61|Female|    No|Dinner|   4|
|Sun|     25.29|4.71|  Male|    No|Dinner|   4|
|Sun|      8.77| 2.0|  Male|    No|Dinner|   2|
|Sun|     26.88|3.12|  Male|    No|Dinner|   4|
|Sun|     15.04|1.96|  Male|    No|Dinner|   2|
|Sun|     14.78|3.23|  Male|    No|Dinner|   2|
|Sun|     10.27|1.71|  Male|    No|Dinner|   2|
|Sun|     35.26| 5.0|Female|    No|Dinner|   4|
|Sun|     15.42|1.57|  Male|    No|Dinner|   2|
|Sun|     18.43| 3.0|  Male|    No|Dinner|   4|
|Sun|     14.83|3.02|Female|    No|Dinner|   2|
|Sun|     21.58|3.92|  Male|    No|Dinner|   2|
|Sun|     10.33|1.67|Female|    No|Dinner|   3|
|Sun|     16.29|3.71|  Male|    No|Dinner|   3|
|Sun|     16.97| 3.5|Female|    No|Dinner|   3|
|Sat|     20.65|3.35|  Male|    No|Dinner|   3|
+---+----------+----+------+------+------+----+
only showing top 20 rows
filtered_tips.explain()
OUTPUT
== Physical Plan ==
*(2) Project [day#14, total_bill#10, tip#11, sex#12, smoker#13, time#15, size#16]
+- *(2) BroadcastHashJoin [day#14], [day#14], Inner, BuildRight, false
   :- *(2) Filter isnotnull(day#14)
   :  +- FileScan csv ...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#95]
      +- *(1) Filter (isnotnull(avg(tip)#35) AND (avg(tip)#35 > 2.5))
         +- *(1) HashAggregate(keys=[day#14], functions=[avg(tip#11)])
            +- Exchange hashpartitioning(day#14, 200), ENSURE_REQUIREMENTS, [id=#91]
               +- *(1) HashAggregate(keys=[day#14], functions=[partial_avg(tip#11)])
                  +- FileScan csv ...

Using explain() confirms that Spark optimizes the mixed query plan into a single execution graph.
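To round off the pattern, the joined result can be handed back to the PySpark API for persistence. The output path below is only a placeholder:

# Persist the joined result as Parquet (path is a placeholder)
filtered_tips.write.mode("overwrite").parquet("output/filtered_tips")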

Common Table Expressions (CTEs)

Complex queries often become unreadable when they rely on multiple nested subqueries. Spark SQL supports CTEs via the WITH clause, which is cleaner and lets you break the logic into named steps.

Scenario: Find the average total bill for Male and Female customers, but only for days where the average party size was greater than 2.5 people.

spark.sql("""
WITH BusyDays AS (
    SELECT day 
    FROM tips_view 
    GROUP BY day 
    HAVING avg(size) > 2.5
)
SELECT 
    sex, 
    round(avg(total_bill), 2) as avg_bill
FROM tips_view
WHERE day IN (SELECT day FROM BusyDays)
GROUP BY sex
""").show()
OUTPUT
+------+--------+
|   sex|avg_bill|
+------+--------+
|Female|   19.76|
|  Male|   21.34|
+------+--------+
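For comparison, the same logic can be written purely with the DataFrame API. The sketch below uses an inner join in place of the IN subquery, which is equivalent here because each day appears at most once after the aggregation; whether this or the CTE reads better is largely a matter of taste:

from pyspark.sql.functions import avg, round as sql_round

# Days whose average party size exceeds 2.5
busy_days = (
    tips.groupBy("day")
        .agg(avg("size").alias("avg_size"))
        .filter("avg_size > 2.5")
        .select("day")
)

# Average bill per sex, restricted to those days
(
    tips.join(busy_days, on="day", how="inner")
        .groupBy("sex")
        .agg(sql_round(avg("total_bill"), 2).alias("avg_bill"))
        .show()
)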

Conditional Logic with CASE WHEN

Data is rarely binary. CASE WHEN statements allow you to bin data into custom categories directly in SQL.

Scenario: We want to create a "Tipper Profile" based on the ratio of tip to bill.

spark.sql("""
SELECT 
    total_bill, 
    tip, 
    CASE 
        WHEN (tip/total_bill) > 0.2 THEN 'Generous'
        WHEN (tip/total_bill) < 0.1 THEN 'Low'
        ELSE 'Standard'
    END as tipper_profile
FROM tips_view
LIMIT 5
""").show()
OUTPUT
+----------+----+--------------+
|total_bill| tip|tipper_profile|
+----------+----+--------------+
|     16.99|1.01|           Low|
|     10.34|1.66|      Standard|
|     21.01| 3.5|      Standard|
|     23.68|3.31|      Standard|
|     24.59|3.61|      Standard|
+----------+----+--------------+
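The DataFrame-side equivalent of CASE WHEN is the when/otherwise construct from pyspark.sql.functions. A minimal sketch of the same binning logic:

from pyspark.sql.functions import when, col

ratio = col("tip") / col("total_bill")

# Same profile buckets, built with when/otherwise instead of CASE WHEN
tips.select(
    "total_bill",
    "tip",
    when(ratio > 0.2, "Generous")
        .when(ratio < 0.1, "Low")
        .otherwise("Standard")
        .alias("tipper_profile")
).show(5)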

Exploring Metadata

You can use the Spark Catalog to programmatically explore what tables and databases exist. This is useful when building generic data validation tools.

# List available tables in the current session
print(spark.catalog.listTables())
OUTPUT
[Table(name='tips_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
# List columns in our view
print(spark.catalog.listColumns("tips_view"))
OUTPUT
[Column(name='total_bill', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='tip', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='sex', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='smoker', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='day', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='time', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False, isCluster=False),
 Column(name='size', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False, isCluster=False)]
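Building on the listTables() output above, a simple existence check for a validation tool might look like the sketch below; newer PySpark releases also expose spark.catalog.tableExists for the same purpose.

# Check that the view is registered before querying it
view_names = [t.name for t in spark.catalog.listTables()]
if "tips_view" in view_names:
    print("tips_view is registered in this session")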