Implementing Joins and Relations
Joining datasets is a fundamental operation in data engineering. It allows you to enrich facts (like flights) with dimensions (like country details).
Data: 2015-summary.csv and country_codes.csv.
The Datasets
The following code reads the 2015-summary and country_codes CSV files into DataFrames for manipulation.
flights = spark.read.option("header","true").option("inferSchema","true").csv("data/2015-summary.csv")
countries = spark.read.option("header","true").option("inferSchema","true").csv("data/country_codes.csv")
# Quick look to identify the key
flights.printSchema()
countries.printSchema()
We see that the flights data carries the destination country name (DEST_COUNTRY_NAME), while the countries data carries name and alpha-3. DEST_COUNTRY_NAME and name will serve as our join keys.
Inner Join
The inner join is the default join type: it keeps only rows where the key matches on both sides, so flights whose destination is missing from the country list are dropped.
# condition: flights.DEST_COUNTRY_NAME == countries.name
joined_inner = flights.join(countries, flights.DEST_COUNTRY_NAME == countries.name, "inner")
# Select relevant columns. Use countries['alpha-3'] to resolve ambiguity if needed
joined_inner.select("DEST_COUNTRY_NAME", "alpha-3", "count").show(5)
Left Outer Join
Keeps all rows from the left dataset (flights) and the matching rows from the right (countries). If no match is found, the right-side columns are null. This is crucial for data quality checks (identifying unknown codes).
joined_left = flights.join(countries, flights.DEST_COUNTRY_NAME == countries.name, "left")
# Let's filter for rows where the join failed (null country code)
joined_left.where("name IS NULL").select("DEST_COUNTRY_NAME", "count").show(5)
Note: "United States" failed here because the country file likely lists it as "United States of America". This highlights a real-world data cleaning need!
Handling Duplicate Column Names
If both dataframes have a column with the same name, the joined result contains both copies, and any later reference to that name raises an ambiguity error. Two common strategies avoid this:
Strategy 1: Rename before joining
countries_renamed = countries.withColumnRenamed("name", "country_name")
flights.join(countries_renamed, flights.DEST_COUNTRY_NAME == countries_renamed.country_name)
Strategy 2: Join on string expression (if names match)
# Only works if column names are IDENTICAL
# flights.join(countries, "country_code")
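In our dataset the key names differ, so a minimal sketch (assuming the schemas shown earlier) is to rename countries.name to match first; joining on the shared name also collapses the key into a single column:
# Give both sides the same key name, then join on the string form.
# Spark keeps only one DEST_COUNTRY_NAME column in the result.
countries_keyed = countries.withColumnRenamed("name", "DEST_COUNTRY_NAME")
joined_shared = flights.join(countries_keyed, "DEST_COUNTRY_NAME", "inner")
joined_shared.select("DEST_COUNTRY_NAME", "alpha-3", "count").show(5)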
Other Join Types
- Right Outer: Inverse of Left. Keeps all rows from Countries (see the sketch below).
- Full Outer: Keeps rows from both sides, filling nulls where matches are missing (see the sketch below).
- Cross Join: Cartesian product (all rows x all rows). Expensive! Avoid unless intentional.
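A minimal sketch of the right and full outer variants, reusing the same join condition as before (the cross join gets its own scenario further down):
join_cond = flights.DEST_COUNTRY_NAME == countries.name
# Right outer: every country row is kept, even if no flight went there
joined_right = flights.join(countries, join_cond, "right")
# Full outer: unmatched rows from BOTH sides are kept, padded with nulls
joined_full = flights.join(countries, join_cond, "outer")
joined_full.select("DEST_COUNTRY_NAME", "name", "count").show(5)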
Scenario: Finding Unmatched Records (Anti Join)
A "Left Anti Join" is a powerful tool for negation. It answers the question: "Which flights went to countries NOT in our country list?". It only returns columns from the Left dataframe, efficiently filtering out matches.
# Returns flights where DEST_COUNTRY_NAME is NOT found in countries dataset
missing_metadata = flights.join(countries, flights.DEST_COUNTRY_NAME == countries.name, "left_anti")
missing_metadata.select("DEST_COUNTRY_NAME", "count").distinct().show(5)
This is typically more efficient, and certainly cleaner, than doing a Left Join and then filtering for rows where the right side's key is null.
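For comparison, a sketch of the Left Join equivalent; it produces the same destination names but carries the country columns through the join before discarding them:
# Same result via left join + null filter (usually less efficient than left_anti)
missing_via_left = (
    flights.join(countries, flights.DEST_COUNTRY_NAME == countries["name"], "left")
           .where(countries["name"].isNull())
           .select("DEST_COUNTRY_NAME", "count")
)
missing_via_left.distinct().show(5)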
Scenario: Filtering Existence (Semi Join)
A "Left Semi Join" is the opposite. It answers: "Filter my flights to only show those that DO have a valid country code". Crucially, it does not duplicate rows if the right side has duplicates, and it does not add columns from the right side.
valid_flights = flights.join(countries, flights.DEST_COUNTRY_NAME == countries.name, "left_semi")
# Note: 'alpha-3' from countries is NOT available here
valid_flights.printSchema()
Concept: Cross Join with Filters
Sometimes you need to compare every row with every other row. This is a Cross Join. It is dangerous on large data. Always try to pair it with a filter immediately.
# Scenario: Compare every country's flight count with every other country's flight count
# to find pairs with similar traffic.
df1 = flights.select("DEST_COUNTRY_NAME", "count") \
    .withColumnRenamed("DEST_COUNTRY_NAME", "country1").withColumnRenamed("count", "c1")
df2 = flights.select("DEST_COUNTRY_NAME", "count") \
    .withColumnRenamed("DEST_COUNTRY_NAME", "country2").withColumnRenamed("count", "c2")
# Generate all pairs, but filter immediately for similar traffic (within 5%)
similar_traffic = df1.crossJoin(df2) \
    .filter("abs(c1 - c2) < (c1 * 0.05)") \
    .filter("country1 != country2")  # Exclude self-matches; renaming both sides avoids ambiguous column references