Practical Introduction to PySpark

In this practical introduction to PySpark, we will closely follow the implementation of functions from the textbook Spark: The Definitive Guide. The idea is simply to access the facilities of PySpark using a simple, readily available dataset. In the coming sections, we will explore more advanced yet practical ways to use PySpark.

To begin, download the data file 2015-summary.csv. It can also be found in the GitHub repository at https://github.com/databricks/Spark-The-Definitive-Guide/blob/master/data/flight-data/csv/2015-summary.csv

Reading CSV into Spark

Once you have downloaded the CSV file, we can read it into a DataFrame using spark.read, the reader attached to the SparkSession object. This reader provides various options to configure how the data is read. In the example below we use two options:

  • options("inferSchema", "true"): This infers the meta data directly from the CSV file
  • options("header", "true"): This sets the CSV first row as column names
# reading csv to dataframe object
flight_data = spark.read.option("header", "true").option("inferSchema", "true").csv("Downloads/2015-summary.csv")
flight_data
DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

When we evaluate the object flight_data, we notice that it returns the object information: this is a DataFrame, displayed together with its schema.
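Note that these examples assume an active SparkSession bound to the name spark, as provided automatically by the pyspark shell or a notebook environment. In a standalone script, one can be created along these lines (the application name here is an arbitrary choice):

from pyspark.sql import SparkSession

# create a new SparkSession, or reuse an existing one
spark = SparkSession.builder.appName("practical-pyspark-intro").getOrCreate()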

printSchema()

Another useful method is printSchema(), which prints the schema of our DataFrame in a tree format that is more closely linked to the schema representation in SQL. To use it, we call the method on the DataFrame object.

flight_data.printSchema()
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Retrieving Data with head()

If you have used tools such as pandas, the command line, or R packages like the tidyverse, you may be used to calling head or tail to get a quick glimpse of the data. PySpark offers the same facilities, albeit a little differently. For example, we can use the head() method to return a list of Row objects sampled from the top of the DataFrame.

flight_data.head(3)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]
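Each element of the returned list is a Row object whose fields can be accessed by attribute or by key. A minimal sketch, with values taken from the first row above:

first_row = flight_data.head()
# Row fields can be read like attributes or dictionary keys
print(first_row.DEST_COUNTRY_NAME)  # United States
print(first_row["count"])           # 15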

Retrieving Data with take()

Much like the head() method, take() returns a list with the n rows specified as its argument; for DataFrames the two methods are equivalent.

flight_data.take(3)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

Sample Data with show()

While the take() and head() methods are useful for returning lists of rows that can be processed further, it is often better to visualize the data as a table. The show() method gives us this output format.

flight_data.show()
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeria|      United States|    4|
|Turks and Caicos ...|      United States|  230|
|       United States|          Gibraltar|    1|
+--------------------+-------------------+-----+
only showing top 20 rows

By default, the show() method displays the top 20 rows of the DataFrame. Passing in an n value displays that number of rows instead, as shown below.
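For example, displaying only the first five rows:

flight_data.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows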

Vertical Display Format

Of course, there are other ways to display the data. By default, the vertical parameter of show() is set to False. Setting it to True achieves the record-oriented output below:

flight_data.show(n=5, vertical=True)
-RECORD 0----------------------------
 DEST_COUNTRY_NAME   | United States
 ORIGIN_COUNTRY_NAME | Romania
 count               | 15
-RECORD 1----------------------------
 DEST_COUNTRY_NAME   | United States
 ORIGIN_COUNTRY_NAME | Croatia
 count               | 1
-RECORD 2----------------------------
 DEST_COUNTRY_NAME   | United States
 ORIGIN_COUNTRY_NAME | Ireland
 count               | 344
-RECORD 3----------------------------
 DEST_COUNTRY_NAME   | Egypt
 ORIGIN_COUNTRY_NAME | United States
 count               | 15
-RECORD 4----------------------------
 DEST_COUNTRY_NAME   | United States
 ORIGIN_COUNTRY_NAME | India
 count               | 62
only showing top 5 rows

Truncate Display Format

Alternatively, we can truncate the values in each row to a specified character length.

flight_data.show(10, truncate=10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       United ...|            Romania|   15|
|       United ...|            Croatia|    1|
|       United ...|            Ireland|  344|
|            Egypt|         United ...|   15|
|       United ...|              India|   62|
|       United ...|          Singapore|    1|
|       United ...|            Grenada|   62|
|       Costa Rica|         United ...|  588|
|          Senegal|         United ...|   40|
|          Moldova|         United ...|    1|
+-----------------+-------------------+-----+
only showing top 10 rows

While in our case this makes the output less readable, we can see how it may be useful when columns contain long values.

Count

Another fundamental action is simply counting the number of observations in the dataset.

flight_data.count()
256

Sort

Much as in pandas, we can chain transformation and action methods together. In the example below, we sort the data and display a sample of the sorted frame using the sort() and show() methods.

flight_data.sort("count", ascending=False).show()
+------------------+-------------------+------+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+------------------+-------------------+------+
|     United States|      United States|370002|
|     United States|             Canada|  8483|
|            Canada|      United States|  8399|
|     United States|             Mexico|  7187|
|            Mexico|      United States|  7140|
|    United Kingdom|      United States|  2025|
|     United States|     United Kingdom|  1970|
|             Japan|      United States|  1548|
|     United States|              Japan|  1496|
|           Germany|      United States|  1468|
|     United States| Dominican Republic|  1420|
|Dominican Republic|      United States|  1353|
|     United States|            Germany|  1336|
|       South Korea|      United States|  1048|
|     United States|        The Bahamas|   986|
|       The Bahamas|      United States|   955|
|     United States|             France|   952|
|            France|      United States|   935|
|     United States|              China|   920|
|          Colombia|      United States|   873|
+------------------+-------------------+------+
only showing top 20 rows
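The same ordering can also be expressed with a column expression, using the desc function from pyspark.sql.functions:

from pyspark.sql.functions import desc

# sort by count in descending order using a column expression
flight_data.sort(desc("count")).show()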

GROUP BY

Finally, let's perform a simple group-by aggregation. In this case, we simply sum the count of flights by their destination and return the first 5 rows of data.

flight_data.groupBy("DEST_COUNTRY_NAME") \
    .sum("count") \
    .withColumnRenamed("sum(count)", "destination_total") \
    .limit(5) \
    .show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|         Anguilla|               41|
|           Russia|              176|
|         Paraguay|               60|
|          Senegal|               40|
|           Sweden|              118|
+-----------------+-----------------+
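An equivalent formulation uses agg with alias, which names the aggregate column directly and avoids the rename step:

from pyspark.sql import functions as F

# aggregate and name the resulting column in a single step
flight_data.groupBy("DEST_COUNTRY_NAME") \
    .agg(F.sum("count").alias("destination_total")) \
    .limit(5) \
    .show()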

While this operation is useful and marks the beginning of more extensive analytics, one problem we are already seeing is that calling methods directly requires remembering the method names and their inputs. This works well for interactive sessions, but often Spark SQL is leveraged for transformations instead. We will look at this more closely in the coming sections.
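As a preview, here is a minimal sketch of the same aggregation expressed in Spark SQL; the view name flight_data_view is an arbitrary choice:

# register the DataFrame as a temporary view so it can be queried with SQL
flight_data.createOrReplaceTempView("flight_data_view")

spark.sql("""
    SELECT DEST_COUNTRY_NAME, SUM(count) AS destination_total
    FROM flight_data_view
    GROUP BY DEST_COUNTRY_NAME
    LIMIT 5
""").show()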

Write to Local CSV

Finally, we can write to a local CSV using the write method. This method has a number of options and configurations, which we will explore more extensively later. For now, we simply write to CSV and keep the header.

# aggregating the data
summary = flight_data.groupBy("DEST_COUNTRY_NAME") \
    .sum("count") \
    .withColumnRenamed("sum(count)", "destination_total")
summary.write.option("header", True).csv("Downloads/summary.csv")
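Note that Spark writes the output as a directory named summary.csv containing one part file per partition. If a single output file is preferred, and the result is small enough, the DataFrame can be coalesced to one partition first. A minimal sketch, with summary_single.csv as a hypothetical target path:

# collapse to one partition so a single part file is written,
# overwriting the target directory if it already exists
summary.coalesce(1).write.mode("overwrite").option("header", True).csv("Downloads/summary_single.csv")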

This concludes the introduction to PySpark and some of its facilities. The coming sections will explore more of PySpark's functionality, ultimately leading to the creation of extensive pipelines.