Working with JSON Data

JSON (JavaScript Object Notation) is the lingua franca of web APIs and NoSQL databases (like MongoDB). Unlike flat CSVs, JSON data is often semi-structured, containing nested objects and arrays. Spark excels at traversing these structures.

Data: 2015-summary.json.

Reading and Schema Inference

Spark can automatically infer the schema of a JSON file by reading a sample. However, for production pipelines, defining a schema is recommended to avoid runtime errors.
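
For quick exploration, inference is often enough; a minimal sketch using the same file (Spark samples the data and guesses the column types, which may differ slightly from what you would declare by hand):

# Letting Spark infer the schema by sampling the file
json_df_inferred = spark.read.json("data/2015-summary.json")
json_df_inferred.printSchema()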

# Manually defining schema (Performant & Safer)
from pyspark.sql.types import StructType, StructField, StringType, LongType

user_schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), True)
])

json_df_strict = spark.read.schema(user_schema).json("data/2015-summary.json")

json_df_strict.show()
OUTPUT
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeria|      United States|    4|
|Turks and Caicos ...|      United States|  230|
|       United States|          Gibraltar|    1|
+--------------------+-------------------+-----+
only showing top 20 rows

Accessing Nested Fields

Although our dataset is flat, JSON often contains nested fields (e.g., address: {"city": "NY", "zip": 10001}). You can access these using dot notation.

# Hypothetical example
# df.select("address.city").show()
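
To make that concrete, here is a small runnable sketch; the nested address struct and its fields are made up purely for illustration.

from pyspark.sql import Row

# Hypothetical nested data: each person carries an address struct
nested_df = spark.createDataFrame([
    Row(name="Alice", address=Row(city="NY", zip="10001")),
    Row(name="Bob", address=Row(city="SF", zip="94105"))
])

# Dot notation reaches inside the struct
nested_df.select("name", "address.city").show()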

Parsing JSON Columns with from_json

A very common pattern in data engineering is receiving a CSV file or database table where one column contains a raw JSON string. You don't need to write it out and re-read it as a JSON file; you can parse the column on the fly.

from pyspark.sql.functions import from_json, col

# Let's simulate a dataframe with a JSON string column
data = [("1", '{"name": "Alice", "age": 30}'), ("2", '{"name": "Bob", "age": 25}')]
raw_df = spark.createDataFrame(data, ["id", "json_string"])

# Define the schema of the JSON inside the string
json_schema = StructType([
    StructField("name", StringType()),
    StructField("age", LongType())
])

# Parse it into a Struct column
parsed_df = raw_df.withColumn("data", from_json(col("json_string"), json_schema))
parsed_df.printSchema()
OUTPUT
root
 |-- id: string (nullable = true)
 |-- json_string: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: long (nullable = true)
parsed_df.select("data.name", "data.age").show()
OUTPUT
+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
+-----+---+
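
From here you can flatten the parsed struct into top-level columns (and drop the raw string) with a star expansion:

# "data.*" expands the struct's fields into ordinary columns
flat_df = parsed_df.select("id", "data.*")
flat_df.show()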

Complex Types: SQL to JSON

Conversely, you might want to bundle columns into a single JSON string to send to an API. Use to_json.

from pyspark.sql.functions import to_json, struct

# Bundles all columns into one JSON string
json_output = json_df_strict.select(to_json(struct("*")).alias("json_payload"))
json_output.show(3, truncate=False)
OUTPUT
+---------------------------------------------------------------------------------+
|json_payload                                                                     |
+---------------------------------------------------------------------------------+
|{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":15} |
|{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Croatia","count":1}  |
|{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Ireland","count":344}|
+---------------------------------------------------------------------------------+
only showing top 3 rows

Scenario: Flattening Lists (Explode)

A common JSON pattern is a list of items: {"order": 1, "items": ["apple", "banana"]}. To analyze the items individually, we must "explode" the list, creating a row for "apple" and a row for "banana" (both with order 1).

from pyspark.sql.functions import explode

# Simulating a dataframe with an array column
data = [("Order1", ["Apple", "Banana"]), ("Order2", ["Orange"])]
df_arrays = spark.createDataFrame(data, ["id", "items"])

# Explode creates a new row for EACH element in the array
df_exploded = df_arrays.select("id", explode("items").alias("item"))
df_exploded.show()
OUTPUT
+------+------+
|    id|  item|
+------+------+
|Order1| Apple|
|Order1|Banana|
|Order2|Orange|
+------+------+
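
Note that explode drops rows whose array is empty or null. If you need to keep such orders (with a null item), explode_outer is the variant to use; a quick sketch:

from pyspark.sql.functions import explode_outer

# Hypothetical order with no items: explode() would drop it, explode_outer() keeps it
data_with_empty = [("Order1", ["Apple", "Banana"]), ("Order3", [])]
df_outer = spark.createDataFrame(data_with_empty, ["id", "items"]) \
    .select("id", explode_outer("items").alias("item"))
df_outer.show()  # Order3 appears with item = null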

Concept: Handling Malformed JSON

What if your JSON file has a bad line? By default, Spark does not fail; it nulls out what it cannot parse. You can control this behavior with the mode read option:

  • PERMISSIVE (Default): Sets malformed fields to null and captures the raw string in _corrupt_record (see the sketch at the end of this section).
  • DROPMALFORMED: Ignores the bad row entirely.
  • FAILFAST: Throws an exception immediately.

# Example: Fail immediately if bad JSON is found
df = spark.read.option("mode", "FAILFAST").json("data/input.json")
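
And a minimal sketch of the PERMISSIVE route (the file name data/maybe_bad.json is hypothetical): declare a _corrupt_record column in your schema so bad lines are captured for inspection instead of silently disappearing.

# Keep malformed lines in a dedicated column for later inspection
corrupt_schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), True),
    StructField("_corrupt_record", StringType(), True)  # raw text of any bad line
])

df_permissive = spark.read.schema(corrupt_schema) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("data/maybe_bad.json")

# Malformed rows show nulls in the data fields and the raw line in _corrupt_record
df_permissive.show(truncate=False)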