Aggregations and Grouping
Aggregations turn raw records into insights. Simple counts are easy, but Spark also supports multi-column statistical summaries and multi-dimensional grouping such as rollups, cubes, and pivots.
Data: online-retail-dataset.csv.
The agg() Method
While groupBy(...).count() handles simple tallies, the agg() method lets you compute several aggregations in a single pass and name each result with alias().
# Note: importing sum and round shadows the Python built-ins in this script
from pyspark.sql.functions import sum, expr, avg, count, col, stddev_pop, round
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/online-retail-dataset.csv")
# Total quantity, average price, transaction count, and quantity spread per country
summary = df.groupBy("Country").agg(
    sum("Quantity").alias("Total_Stock"),
    round(avg("UnitPrice"), 2).alias("Avg_Price"),
    count("InvoiceNo").alias("Tx_Count"),
    round(stddev_pop("Quantity"), 2).alias("Qty_StdDev")
)
summary.orderBy(col("Total_Stock").desc()).show(5)
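For comparison, when all you need is the number of rows per group, the count() shortcut is enough. A minimal sketch using the same df:
# Plain row count per country
df.groupBy("Country").count().orderBy(col("count").desc()).show(5)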
Grouping with Expressions
You aren't limited to grouping by columns. You can group by expressions.
# Group by whether the purchase was large (> 10 items)
df.groupBy(expr("Quantity > 10").alias("Is_Bulk_Order")) \
.agg(count("*").alias("count")) \
.show()
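Grouping by an expression is equivalent to materializing the flag with withColumn() first and then grouping by the new column; the expression form just saves a step. A sketch of the explicit version:
# Equivalent: create the boolean column first, then group by it
df.withColumn("Is_Bulk_Order", col("Quantity") > 10) \
    .groupBy("Is_Bulk_Order") \
    .agg(count("*").alias("count")) \
    .show()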
Advanced Grouping: Rollup and Cube
For reporting, you often need subtotals at different levels (e.g., a total per Country, then a total per Country and CustomerID). rollup() computes these hierarchical subtotals in a single pass.
# Note: Removing null customers for cleaner output
df.where("CustomerID IS NOT NULL") \
.rollup("Country", "CustomerID") \
.agg(sum("Quantity")) \
.orderBy("Country") \
.show(10)
The rows where CustomerID is NULL hold the subtotal for that Country, and the single row where both Country and CustomerID are NULL is the grand total across the whole dataset.
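cube() goes a step further than rollup(): instead of only the hierarchical subtotals (Country, then Country + CustomerID), it computes totals for every combination of the grouping columns, including a per-customer total across all countries. A minimal sketch on the same DataFrame:
# cube() adds totals for every combination of Country and CustomerID
df.where("CustomerID IS NOT NULL") \
    .cube("Country", "CustomerID") \
    .agg(sum("Quantity").alias("Total_Qty")) \
    .orderBy("Country") \
    .show(10)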
Pivot Tables
Pivoting rotates data from rows to columns. It is computationally expensive but useful for final report formatting.
# Pivot: sum Quantity by Country for a few popular StockCodes
top_products = ["22423", "85123A", "85099B"] # Filter for speed
df.filter(col("StockCode").isin(top_products)) \
.groupBy("Country") \
.pivot("StockCode") \
.sum("Quantity") \
.na.fill(0) \
.show(10)
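If you already know which values you want as columns, you can pass them to pivot() directly; Spark then skips the extra job it would otherwise run to discover the distinct values. A sketch reusing the top_products list above:
# Supplying explicit pivot values avoids a separate pass over the data
df.groupBy("Country") \
    .pivot("StockCode", top_products) \
    .sum("Quantity") \
    .na.fill(0) \
    .show(10)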
Scenario: Collecting Values into Lists
Sometimes you don't want a numeric summary at all; you want to see the actual list of items each customer bought.
from pyspark.sql.functions import collect_set, size
# What different products did each customer buy?
df.groupBy("CustomerID") \
.agg(collect_set("StockCode").alias("purchased_items")) \
.withColumn("unique_item_count", size("purchased_items")) \
.show(5, truncate=50)
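collect_set() drops duplicates. If you also want to see repeat purchases, collect_list() keeps every occurrence; a quick sketch:
from pyspark.sql.functions import collect_list
# collect_list keeps repeated purchases of the same StockCode
df.groupBy("CustomerID") \
    .agg(collect_list("StockCode").alias("all_purchases")) \
    .withColumn("total_line_items", size("all_purchases")) \
    .show(5, truncate=50)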
Concept: Approximate Aggregations
When dealing with massive datasets (terabytes or petabytes), counting distinct items precisely is extremely expensive because every distinct value has to be shuffled to establish uniqueness. If you can tolerate a small error (e.g., 5%), use approx_count_distinct(), which is backed by the HyperLogLog++ sketch.
from pyspark.sql.functions import approx_count_distinct
# Count unique invoices with potential 5% error
df.agg(approx_count_distinct("InvoiceNo", 0.05).alias("approx_txns")).show()
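For comparison, the exact version below uses countDistinct(); it returns the precise number but must shuffle every distinct InvoiceNo:
from pyspark.sql.functions import countDistinct
# Exact distinct count: precise, but more expensive at scale
df.agg(countDistinct("InvoiceNo").alias("exact_txns")).show()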