Handling Missing Data

Real-world data is rarely clean. Missing values (nulls) can break pipelines or skew analysis. PySpark provides a comprehensive set of tools in the DataFrameNaFunctions class (accessed via df.na) to handle these scenarios.

We will use the titanic.csv dataset, which is known for having missing age and deck information.

Inspecting Nulls

First, let's load the data and quantify the missing values.

titanic = spark.read.option("header","true").option("inferSchema","true").csv("data/titanic.csv")

# Quick look at columns that commonly contain nulls
titanic.select("survived", "age", "deck", "embark_town").show(5)
OUTPUT
+--------+----+----+-----------+
|survived| age|deck|embark_town|
+--------+----+----+-----------+
|       0|22.0|NULL|Southampton|
|       1|38.0|   C|  Cherbourg|
|       1|26.0|NULL|Southampton|
|       1|35.0|   C|Southampton|
|       0|35.0|NULL|Southampton|
+--------+----+----+-----------+
only showing top 5 rows
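To actually quantify the gaps rather than just eyeballing a few rows, we can count the nulls in every column. A minimal sketch using a conditional count per column:

from pyspark.sql.functions import col, count, when

# count() ignores nulls, so counting a when() that is null for non-missing rows
# yields the number of missing values in each column
titanic.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in titanic.columns]
).show()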

Dropping Data

The na.drop() method is versatile. You can drop a row if any column is null, or only if all columns are null, and you can restrict the check to a subset of columns. It also accepts a thresh argument, shown after the output below.

# Default: drop row if ANY column is null
print("Count after default drop:", titanic.na.drop().count())

# Drop only if specific columns are null
print("Count after dropping null Age:", titanic.na.drop(subset=["age"]).count())

# Drop only if ALL specified columns are null (rare but useful)
print("Count after dropping if both Age and Deck are null:", titanic.na.drop(how="all", subset=["age", "deck"]).count())
OUTPUT
Count after default drop: 182
Count after dropping null Age: 714
Count after dropping if both Age and Deck are null: 733
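The thresh argument keeps a row only if it contains at least that many non-null values. A quick sketch (the threshold of 12 is arbitrary and only for illustration):

# Keep rows with at least 12 non-null values across the 15 columns
print("Count with thresh=12:", titanic.na.drop(thresh=12).count())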

Filling Data (Imputation)

Dropping data results in information loss. A better strategy is often to fill (impute) the missing values instead. PySpark allows filling with a constant or with a column-to-value dictionary.

Fill with Fixed Value

# Fill nulls in all string columns with "Unknown" and in all numeric columns with 0
# (the type of the fill value determines which columns it applies to)
titanic.na.fill("Unknown").na.fill(0).select("age", "deck").show(10)
OUTPUT
+----+-------+
| age|   deck|
+----+-------+
|22.0|Unknown|
|38.0|      C|
|26.0|Unknown|
|35.0|      C|
|35.0|Unknown|
| 0.0|Unknown|
|54.0|      E|
| 2.0|Unknown|
|27.0|Unknown|
|14.0|Unknown|
+----+-------+
only showing top 10 rows

Fill with Column-Specific Values

A dictionary approach is cleaner when dealing with multiple columns.

fill_dict = {
    "deck": "U",          # U for Unknown
    "embark_town": "Southampton" # Mode (most common) value
}
titanic.na.fill(fill_dict).select("deck", "embark_town").show(10)
OUTPUT
+----+-----------+
|deck|embark_town|
+----+-----------+
|   U|Southampton|
|   C|  Cherbourg|
|   U|Southampton|
|   C|Southampton|
|   U|Southampton|
|   U| Queenstown|
|   E|Southampton|
|   U|Southampton|
|   U|Southampton|
|   U|  Cherbourg|
+----+-----------+
only showing top 10 rows

Imputation: Mean/Median

To fill with the mean, you must first calculate it and then pass it to the fill function. PySpark's Imputer class (from pyspark.ml.feature) can do this for you, but here is how to do it manually with DataFrames; a sketch of the Imputer approach follows the manual example.

from pyspark.sql.functions import mean, round

# 1. Calculate the mean age - rounding for demonstration only
mean_age = titanic.select(round(mean("age"), 1)).first()[0]
mean_age
OUTPUT
29.7

# 2. Fill the missing ages with the computed mean
titanic.na.fill(mean_age, subset=["age"]).select("age").show(10)
OUTPUT
+----+
| age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|29.7|
|54.0|
| 2.0|
|27.0|
|14.0|
+----+
only showing top 10 rows
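For completeness, here is what the Imputer route mentioned above looks like. This is a minimal sketch; the output column name age_imputed is just an illustrative choice:

from pyspark.ml.feature import Imputer

# Imputer computes the statistic itself (mean by default; median/mode also supported)
# and writes the result to a new column, leaving the original untouched
imputer = Imputer(inputCols=["age"], outputCols=["age_imputed"], strategy="mean")
imputer.fit(titanic).transform(titanic).select("age", "age_imputed").show(10)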

Replacing Values

Sometimes data isn't null, but contains placeholders like "n/a" or -999. The replace method helps here.

# Example: Replace "Southampton" with "S" and "Cherbourg" with "C" to save space
titanic.replace(["Southampton", "Cherbourg"], ["S", "C"], subset=["embark_town"]).show(5)
OUTPUT
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+
|survived|pclass|   sex| age|sibsp|parch|   fare|embarked| class|  who|adult_male|deck|embark_town|alive|alone|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S| Third|  man|      true|NULL|          S|   no|false|
|       1|     1|female|38.0|    1|    0|71.2833|       C| First|woman|     false|   C|          C|  yes|false|
|       1|     3|female|26.0|    0|    0|  7.925|       S| Third|woman|     false|NULL|          S|  yes| true|
|       1|     1|female|35.0|    1|    0|   53.1|       S| First|woman|     false|   C|          S|  yes|false|
|       0|     3|  male|35.0|    0|    0|   8.05|       S| Third|  man|      true|NULL|          S|   no| true|
|       0|     3|  male|NULL|    0|    0| 8.4583|       Q| Third|  man|      true|NULL| Queenstown|   no| true|
|       0|     1|  male|54.0|    0|    0|51.8625|       S| First|  man|      true|   E|          S|   no| true|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S| Third|child|     false|NULL|          S|   no|false|
|       1|     3|female|27.0|    0|    2|11.1333|       S| Third|woman|     false|NULL|          S|  yes|false|
|       1|     2|female|14.0|    1|    0|30.0708|       C|Second|child|     false|NULL|          C|  yes|false|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+
only showing top 10 rows
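The same method also converts numeric placeholders into real nulls so that na.fill() and na.drop() can take over. The titanic data does not contain -999, so the tiny DataFrame below is purely illustrative:

# Hypothetical readings that use -999.0 as a "missing" marker
readings = spark.createDataFrame([(1, 36.6), (2, -999.0), (3, 37.1)], ["id", "temp"])

# Turn the placeholder into a real null
readings.replace(-999.0, None, subset=["temp"]).show()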

Data Quality Flagging

In production pipelines, simply dropping bad data might hide issues. A better pattern is often to flag the data instead of deleting it. This allows analysts to review the "bad" data later.

from pyspark.sql.functions import when, col

# Create a 'dq_issue' column labelling the first data quality problem found in each row
# (when() conditions are checked in order, so a row missing both Age and Deck gets "Missing Age")
df_flagged = titanic.withColumn("dq_issue",
    when(col("age").isNull(), "Missing Age")
    .when(col("deck").isNull(), "Missing Deck")
    .otherwise("Clean")
)

df_flagged.show(10)
OUTPUT
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+------------+
|survived|pclass|   sex| age|sibsp|parch|   fare|embarked| class|  who|adult_male|deck|embark_town|alive|alone|    dq_issue|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S| Third|  man|      true|NULL|Southampton|   no|false|Missing Deck|
|       1|     1|female|38.0|    1|    0|71.2833|       C| First|woman|     false|   C|  Cherbourg|  yes|false|       Clean|
|       1|     3|female|26.0|    0|    0|  7.925|       S| Third|woman|     false|NULL|Southampton|  yes| true|Missing Deck|
|       1|     1|female|35.0|    1|    0|   53.1|       S| First|woman|     false|   C|Southampton|  yes|false|       Clean|
|       0|     3|  male|35.0|    0|    0|   8.05|       S| Third|  man|      true|NULL|Southampton|   no| true|Missing Deck|
|       0|     3|  male|NULL|    0|    0| 8.4583|       Q| Third|  man|      true|NULL| Queenstown|   no| true| Missing Age|
|       0|     1|  male|54.0|    0|    0|51.8625|       S| First|  man|      true|   E|Southampton|   no| true|       Clean|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S| Third|child|     false|NULL|Southampton|   no|false|Missing Deck|
|       1|     3|female|27.0|    0|    2|11.1333|       S| Third|woman|     false|NULL|Southampton|  yes|false|Missing Deck|
|       1|     2|female|14.0|    1|    0|30.0708|       C|Second|child|     false|NULL|  Cherbourg|  yes|false|Missing Deck|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+----+-----------+-----+-----+------------+
only showing top 10 rows
df_flagged.groupBy("dq_issue").count().show()
OUTPUT
+------------+-----+
|    dq_issue|count|
+------------+-----+
|       Clean|  184|
|Missing Deck|  530|
| Missing Age|  177|
+------------+-----+
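With the flag in place, questionable rows can be routed somewhere for review instead of silently disappearing. A sketch of that split; the parquet path is just a placeholder:

# Clean rows continue downstream; flagged rows are parked for inspection
clean_rows = df_flagged.filter(col("dq_issue") == "Clean")
flagged_rows = df_flagged.filter(col("dq_issue") != "Clean")

# Persist the flagged rows for analysts to review later (placeholder path)
flagged_rows.write.mode("overwrite").parquet("output/titanic_dq_review")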

Concept: NaN vs Null

It is important to note that, internally, Spark treats NaN (Not a Number, usually produced by floating-point operations) and null (a missing value) as distinct.

from pyspark.sql.functions import nanvl, lit

# nanvl returns the first column if it is not NaN, otherwise the second
# (nulls are not NaN, so they pass through unchanged - note the NULLs below)
titanic.select(nanvl("age", lit(0))).show()
OUTPUT
+-------------+
|nanvl(age, 0)|
+-------------+
|         22.0|
|         38.0|
|         26.0|
|         35.0|
|         35.0|
|         NULL|
|         54.0|
|          2.0|
|         27.0|
|         14.0|
|          4.0|
|         58.0|
|         20.0|
|         39.0|
|         14.0|
|         55.0|
|          2.0|
|         NULL|
|         31.0|
|         NULL|
+-------------+
only showing top 20 rows

Note: na.drop() treats NaN like null and drops both by default, but filter(col("x").isNull()) will not match NaN values in float columns. Use isnan() when you specifically need to check for NaN, as in the sketch below.
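To see the distinction in action, the toy DataFrame below (purely illustrative) holds one ordinary value, one NaN, and one null:

from pyspark.sql.functions import col, isnan

vals = spark.createDataFrame([(1.0,), (float("nan"),), (None,)], "x double")

# isNull() matches only the missing value; isnan() matches only the NaN
vals.select(
    "x",
    col("x").isNull().alias("is_null"),
    isnan("x").alias("is_nan")
).show()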

Scenario: Conditional Cleaning

Strategies often differ by data type. We can loop over the DataFrame's schema (via dtypes) and apply type-specific logic dynamically.

# Impute 0 for all numeric columns, "Unknown" for all string columns
for col_name, col_type in titanic.dtypes:
    if col_type == "string":
        titanic = titanic.na.fill("Unknown", subset=[col_name])
    elif col_type in ("double", "int"):
        titanic = titanic.na.fill(0, subset=[col_name])

titanic.show(10)
OUTPUT
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+-------+-----------+-----+-----+
|survived|pclass|   sex| age|sibsp|parch|   fare|embarked| class|  who|adult_male|   deck|embark_town|alive|alone|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+-------+-----------+-----+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S| Third|  man|      true|Unknown|Southampton|   no|false|
|       1|     1|female|38.0|    1|    0|71.2833|       C| First|woman|     false|      C|  Cherbourg|  yes|false|
|       1|     3|female|26.0|    0|    0|  7.925|       S| Third|woman|     false|Unknown|Southampton|  yes| true|
|       1|     1|female|35.0|    1|    0|   53.1|       S| First|woman|     false|      C|Southampton|  yes|false|
|       0|     3|  male|35.0|    0|    0|   8.05|       S| Third|  man|      true|Unknown|Southampton|   no| true|
|       0|     3|  male| 0.0|    0|    0| 8.4583|       Q| Third|  man|      true|Unknown| Queenstown|   no| true|
|       0|     1|  male|54.0|    0|    0|51.8625|       S| First|  man|      true|      E|Southampton|   no| true|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S| Third|child|     false|Unknown|Southampton|   no|false|
|       1|     3|female|27.0|    0|    2|11.1333|       S| Third|woman|     false|Unknown|Southampton|  yes|false|
|       1|     2|female|14.0|    1|    0|30.0708|       C|Second|child|     false|Unknown|  Cherbourg|  yes|false|
+--------+------+------+----+-----+-----+-------+--------+------+-----+----------+-------+-----------+-----+-----+
only showing top 10 rows
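The loop above calls na.fill() once per column. An alternative is to build a single fill map from the schema and apply it in one call; a sketch that assumes it runs on the original (unfilled) DataFrame:

# Build {column: replacement} from the schema, then fill everything in one pass
fill_map = {c: ("Unknown" if t == "string" else 0)
            for c, t in titanic.dtypes
            if t in ("string", "double", "int")}
titanic_filled = titanic.na.fill(fill_map)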