Working with Text Data
This notebook introduces some of the functions from PySpark that allow you to work with datasets containing text. We will cover
the useful functions listed below:
- select: Select columns, similar to a SELECT in SQL
- col: Specification of columns
- split: Splitting text based on some delimiter
- regexp_extract: Regular expression implementation for PySpark
This tutorial is largely based on the book Data Analysis with Python and PySpark by Jonathan Rioux.
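Before running the cells below, you need a SparkSession and the PySpark imports used throughout this notebook. The following is a minimal setup sketch, assuming a local Spark installation; the application name is arbitrary.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, split, lower

# Create (or reuse) a local SparkSession; the app name is only a label.
spark = SparkSession.builder.appName("working-with-text-data").getOrCreate()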
Reading Text Data
To begin with, we will read the text data from the file 1342-0.txt in the book directory.
book = spark.read.text('book/1342-0.txt')
book.show(10, truncate=80)
Changing the column name with alias()
An often pedestrian but necessary task is to convert the column names that Spark provides into names
that make sense for the analysis. There are two ways to achieve this: using
alias() or withColumnRenamed().
book = book.select(col("value").alias("book_lines"))
book.show(5, truncate=80)
withColumnRenamed()
The withColumnRenamed()
function can be called directly on the DataFrame object. It takes
the current name of the column and replaces it with a name you provide.
book.withColumnRenamed("book_lines", "lines").show(5, truncate=80)
Splitting the text into tokens
In the field of NLP, one common operation for text processing is tokenization. This is the process of
splitting text into its fundamental meaningful units. In many applications this means individual words;
however, depending on context, it could mean characters or other combinations.
In the example below we implement a simple tokenization protocol by splitting each line into words using a
" " (space) delimiter.
book = book.withColumn("words", split("book_lines", " "))
book.show(truncate=80)
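The single-space split above is deliberately simple. As an alternative (our own sketch, not part of the original tutorial), you can pass a regular expression to split() so that runs of punctuation and whitespace act as delimiters; the variable name book_alt below is an illustrative choice.
# Alternative tokenization sketch: split on any run of characters that are not
# letters, digits, or apostrophes, so punctuation never sticks to the tokens.
book_alt = book.select(split(F.col("book_lines"), r"[^\w']+").alias("words"))
book_alt.show(truncate=80)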
Fetching all words of the corpus
The next step is to extract all the words from the book. To do this,
we use explode() to create a column with one row for
each word in the book.
words = book.select(F.explode("words").alias('word'))
words.show(truncate=10)
Removing NULLs or "" using where()
You will have noticed that in the previous output there were a few rows with empty strings.
We wish to remove these, and we can filter them out using the where() method.
words = words.where(F.col("word") != "")
words.show(truncate=10)
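The filter above removes empty strings. Lines read with spark.read.text() should not contain NULLs, but if you adapt this to data that might, a slightly more defensive sketch (an assumption on our part) drops both NULLs and empty strings:
# Defensive variant: keep only rows where word is non-NULL and non-empty.
words_nonempty = words.where(F.col("word").isNotNull() & (F.col("word") != ""))
words_nonempty.show(truncate=10)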
Removing Punctuation with regexp_extract()
Now we will remove all the punctuation using a simple regex
pattern, [a-zA-Z']*, which retains only letters and apostrophes.
clean_words = words.select(F.regexp_extract(F.col("word"), "[a-zA-Z']*", 0).alias("word"))
clean_words.show()
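To make the behaviour of regexp_extract() concrete, here is a small illustration on a few hand-made tokens; the sample values are our own, not taken from the book.
# regexp_extract returns the first match of the pattern, so trailing punctuation
# is dropped while internal apostrophes are kept.
sample = spark.createDataFrame([("pride,",), ("prejudice.",), ("don't",)], ["word"])
sample.select(F.regexp_extract(F.col("word"), "[a-zA-Z']*", 0).alias("word")).show()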
Lowering Words with lower()
Finally, let's lowercase all the words so that we do not end up with distinct entries that differ only in case.
This way, words such as Yes and yes map to one word. Here we use
the lower() function.
clean_words = clean_words.select(lower(F.col("word")).alias("word"))
clean_words.show()
groupBy() and orderBy()
Finally, we can group the words and count them. To achieve this, we use
groupBy() and orderBy() to
perform and sort the aggregation. Below is the example:
clean_words.groupBy("word").count().show()
clean_words.groupBy("word").count().orderBy("count", ascending=False).show()
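As a variation (our own sketch, not from the book), you can express the ordering with a descending Column expression and cap the output at a chosen number of rows; the cap of 20 is arbitrary.
# Equivalent ordering using Column.desc(); show only the 20 most frequent words.
word_counts = clean_words.groupBy("word").count()
word_counts.orderBy(F.col("count").desc()).show(20)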
Cleaning Up
While we implemented the operations above one step at a time, they can all be
combined into a single chain of method calls.
Below is one implementation that combines all the steps together
neatly.
books = spark.read.text("book/1342-0.txt")
results = (
    books.select(split(F.col("value"), " ").alias("lines"))
    .select(F.explode(F.col("lines")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-zA-Z']*", 0).alias("word"))
    .select(lower(F.col("word")).alias("word"))
    .where(F.col("word") != "")
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False)
)
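To display the result, call results.show(). If you also want to persist the word counts, a minimal sketch follows; the output path output/word_counts is hypothetical.
results.show()
# Write the counts to a single CSV file; the path and the single-partition
# coalesce are example choices only.
results.coalesce(1).write.mode("overwrite").csv("output/word_counts", header=True)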