Handling Missing Data
Real-world data is rarely clean. Missing values (nulls) can break pipelines or skew analysis. PySpark provides a
comprehensive set of tools in the DataFrameNaFunctions class (accessed via
df.na) to handle these scenarios.
We will use the titanic.csv dataset, which is known for missing age and deck
information.
Inspecting Nulls
First, let's load the data, peek at a few of the affected columns, and then quantify the missing values.
titanic = spark.read.option("header","true").option("inferSchema","true").csv("data/titanic.csv")
# Quick viewing of potentially null columns
titanic.select("survived", "age", "deck", "embark_town").show(5)
Dropping Data
The na.drop() method is versatile. You can specify whether to drop if any column
is null, or only if all columns are null. You can also restrict this to a subset of columns.
# Default: drop row if ANY column is null
print("Count after default drop:", titanic.na.drop().count())
# Drop only if specific columns are null
print("Count after dropping null Age:", titanic.na.drop(subset=["age"]).count())
# Drop only if ALL specified columns are null (rare but useful)
print("Count after dropping if both Age and Deck are null:", titanic.na.drop(how="all", subset=["age", "deck"]).count())
Filling Data (Imputation)
Dropping data results in information loss, so a better strategy is often to fill (impute) the missing values.
PySpark allows filling with a single constant or with a dictionary mapping column names to fill values.
Fill with Fixed Value
# Fill all null strings with "Unknown" and all null numerics with 0
titanic.na.fill("Unknown").na.fill(0).select("age", "deck").show(5)
Fill with Column-Specific Values
A dictionary approach is cleaner when dealing with multiple columns.
fill_dict = {
    "deck": "U",                  # U for Unknown
    "embark_town": "Southampton"  # Mode (most common) value
}
titanic.na.fill(fill_dict).select("deck", "embark_town").show(10)
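Hardcoding "Southampton" works for a dataset you know well, but the mode can also be derived at runtime. A small sketch (the variable name mode_town is just illustrative):
from pyspark.sql.functions import col
# Find the most common non-null embark_town, then use it as the fill value
mode_town = (titanic.where(col("embark_town").isNotNull())
                    .groupBy("embark_town").count()
                    .orderBy("count", ascending=False)
                    .first()["embark_town"])
titanic.na.fill({"embark_town": mode_town}).select("embark_town").show(5)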
Imputation: Mean/Median
To fill with the mean, you must first calculate it and then pass it to the fill function. PySpark's
Imputer class (from pyspark.ml.feature) can do this for you, but here is how to do it manually with the DataFrame API.
from pyspark.sql.functions import mean, round
# 1. Calculate the mean age - rounding for demonstration only
mean_age = titanic.select(round(mean("age"), 1)).first()[0]
print("Mean age:", mean_age)
# 2. Fill
titanic.na.fill(mean_age, subset=["age"]).select("age").show(10)
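For a median fill, or to impute several numeric columns in one pass, the Imputer mentioned above is usually simpler than the manual route. A sketch, assuming age is a double column (the output name age_imputed is just a choice for illustration):
from pyspark.ml.feature import Imputer
# Median imputation on age; the result is written to a new age_imputed column
imputer = Imputer(inputCols=["age"], outputCols=["age_imputed"], strategy="median")
imputer.fit(titanic).transform(titanic).select("age", "age_imputed").show(10)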
Replacing Values
Sometimes data isn't null, but contains placeholders like "n/a" or -999. The replace
method helps here.
# Example: Replace "Southampton" with "S" and "Cherbourg" with "C" to save space
titanic.replace(["Southampton", "Cherbourg"], ["S", "C"], subset=["embark_town"]).show(5)
Data Quality Flagging
In production pipelines, simply dropping bad data might hide issues. A better pattern is often to flag the
data instead of deleting it. This allows analysts to review the "bad" data later.
from pyspark.sql.functions import when, col
# Create a 'dq_issue' column that labels rows with a missing Age or Deck
df_flagged = titanic.withColumn(
    "dq_issue",
    when(col("age").isNull(), "Missing Age")
    .when(col("deck").isNull(), "Missing Deck")
    .otherwise("Clean")
)
df_flagged.show(10)
df_flagged.groupBy("dq_issue").count().show()
Concept: NaN vs Null
Internally, Spark treats NaN (Not a Number, usually produced by floating-point
operations) and null (a missing value) as distinct.
from pyspark.sql.functions import nanvl, lit
# nanvl returns the first column's value when it is not NaN, otherwise it falls back to the second
titanic.select(nanvl("age", lit(0)).alias("age")).show()
Note: na.drop() treats NaN like null and drops both, but filter(col("x").isNull())
will not match NaN values in float columns. Use isnan()
to check strictly for NaN.
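To catch both cases explicitly, combine the two predicates. A short sketch:
from pyspark.sql.functions import col, isnan
# Rows where age is either null or NaN
titanic.filter(col("age").isNull() | isnan("age")).select("age").show(5)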
Scenario: Conditional Cleaning
Strategies often differ by data type. We can use a loop to apply logic dynamically.
# Impute 0 for all numeric columns, "Unknown" for all string columns
for col_name, col_type in titanic.dtypes:
    if col_type == "string":
        titanic = titanic.na.fill("Unknown", subset=[col_name])
    elif col_type in ("int", "double"):
        titanic = titanic.na.fill(0, subset=[col_name])

titanic.show(5)