Series: Part 16 of Dataquest. Tags: Spark.

This is a review project based on one of the missions on Dataquest. We are going to use Apache Spark 1.5 with Jupyter. Rather than doing any heavy analysis, this project focuses on the syntax.

We'll be using Spark with a text file, 'hamlet.txt'.

In [1]:
#Initialize findspark and pyspark, create the sc object
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()
In [2]:
#Reads the file hamlet.txt into an RDD object
raw_hamlet = sc.textFile('hamlet.txt')
print(raw_hamlet.take(5))
['hamlet@0\t\tHAMLET', 'hamlet@8', 'hamlet@9', 'hamlet@10\t\tDRAMATIS PERSONAE', 'hamlet@29']

The .map() method

In Spark, we can use the .map() method together with a lambda function to apply a transformation to every element of an RDD. The method transforms an RDD of length N into another RDD of length N.

In [3]:
#Applies the lambda to every line in raw_hamlet, splitting each line on '\t' into a list of fields
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

split_hamlet.take(5)
Out[3]:
[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]
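
To see the length-preserving behaviour without starting Spark, here is a minimal plain-Python sketch that mimics .map() with a list comprehension. The sample lines are copied from the output above. The names sample_lines and split_sample are made up for this illustration, and no Spark API is involved.

```python
# Plain-Python analogy of raw_hamlet.map(lambda line: line.split('\t')).
# sample_lines is a hypothetical stand-in for the RDD, not a Spark object.
sample_lines = ['hamlet@0\t\tHAMLET', 'hamlet@8', 'hamlet@10\t\tDRAMATIS PERSONAE']

# One output element per input element, which is the contract of .map()
split_sample = [line.split('\t') for line in sample_lines]

print(len(sample_lines), len(split_sample))
print(split_sample)
```

The two consecutive tabs are why an empty string shows up between the id and the text.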

The .flatMap() method

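This method applies a function that returns a collection (or a generator) to each of the N elements of an RDD, producing N collections, and then flattens those collections into a single RDD. It is useful when we want to transform one RDD into another of a different length, since each input element can produce zero, one, or several output elements.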

In [4]:
#Extracts all the lines where Hamlet speaks and yields a new RDD object
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    
    if "HAMLET" in line:
        speaketh = True
    
    if speaketh:
        yield id,"hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
hamlet_spoken.take(5)
Out[4]:
[('hamlet@0', 'hamlet speaketh!'),
 ('hamlet@75', 'hamlet speaketh!'),
 ('hamlet@1004', 'hamlet speaketh!'),
 ('hamlet@9144', 'hamlet speaketh!'),
 ('hamlet@12313', 'hamlet speaketh!')]
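
The "map, then flatten" idea can be sketched in plain Python with itertools.chain. This is only an analogy for what .flatMap() does, not Spark code. The list sample_rows is a hypothetical stand-in for split_hamlet, with rows copied from the outputs above.

```python
from itertools import chain

# Hypothetical stand-in for split_hamlet (not an RDD).
sample_rows = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@8'],
    ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
]

def hamlet_speaks(row):
    # Yields one tuple when 'HAMLET' is a field of the row, otherwise nothing.
    if "HAMLET" in row:
        yield row[0], "hamlet speaketh!"

# Step 1: apply the function to every row (3 generators for 3 rows).
# Step 2: chain the generators into a single flat sequence.
flat = list(chain.from_iterable(hamlet_speaks(row) for row in sample_rows))

print(len(sample_rows), len(flat))
print(flat)
```

Three rows go in and two tuples come out, because the row without 'HAMLET' yields nothing.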

The .filter() method

The .filter() method takes a function that returns a boolean and keeps only the elements for which it returns True. This is useful if we want to search for a certain name or value.

In [5]:
def filter_hamlet_speaks(line):
    if "HAMLET" in line:
        return True
    else:
        return False

#Filters split_hamlet, only lines with 'HAMLET' will be kept

hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)
Out[5]:
[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]
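
One detail worth spelling out is that `"HAMLET" in line` is a membership test on a list, so it only matches a field that is exactly 'HAMLET'. The plain-Python sketch below illustrates this. The list sample_rows is a hypothetical stand-in for split_hamlet, and the HORATIO row is reconstructed from the cleaned output later in this post, assuming its original id was 'hamlet@177'.

```python
# Hypothetical stand-in for split_hamlet (not an RDD).
sample_rows = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@8'],
    ['hamlet@177', 'HORATIO', 'friend to Hamlet.'],
]

def filter_hamlet_speaks(row):
    # True only when one whole field equals 'HAMLET'
    return "HAMLET" in row

# Plain-Python analogy of split_hamlet.filter(...)
kept = [row for row in sample_rows if filter_hamlet_speaks(row)]

print(kept)
```

The HORATIO row is dropped even though its text mentions Hamlet, because no field is exactly equal to 'HAMLET'.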

The .count() and .collect() actions

The .count() action returns the number of elements in an RDD. The .collect() action copies the entire dataset to the driver, so if the dataset is very large and won't fit in the driver's memory, we'll run into issues. The .take() action is generally better if we only want a certain number of elements.

In [11]:
#Counts the total number of elements in hamlet_spoken_lines
spoken_count = hamlet_spoken_lines.count()

#Collects all the elements in hamlet_spoken_lines
spoken_101 = hamlet_spoken_lines.collect()

print(spoken_count)
print(spoken_101[0:10])
381
[['hamlet@0', '', 'HAMLET'], ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'], ['hamlet@1004', '', 'HAMLET'], ['hamlet@9144', '', 'HAMLET'], ['hamlet@12313', 'HAMLET', '[Aside]  A little more than kin, and less than kind.'], ['hamlet@12434', 'HAMLET', "Not so, my lord; I am too much i' the sun."], ['hamlet@12760', 'HAMLET', 'Ay, madam, it is common.'], ['hamlet@12858', 'HAMLET', "Seems, madam! nay it is; I know not 'seems.'"], ['hamlet@14821', 'HAMLET', 'I shall in all my best obey you, madam.'], ['hamlet@15261', 'HAMLET', 'O, that this too too solid flesh would melt']]
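
As a loose analogy for why .take() is cheaper than .collect(), the plain-Python sketch below compares reading the first five items of a lazy generator with materializing all of it. This is not how Spark is implemented: the real .take() works partition by partition and may compute more elements than requested. The function numbered_lines and the list produced are made up for this illustration.

```python
from itertools import islice

produced = []

def numbered_lines(n):
    # Hypothetical lazy data source; records each element it actually produces.
    for i in range(n):
        produced.append(i)
        yield "line %d" % i

# Analogous to .take(5): stop as soon as five elements are available
first_five = list(islice(numbered_lines(1000), 5))
taken_cost = len(produced)

produced.clear()

# Analogous to .collect(): pull every element into local memory
everything = list(numbered_lines(1000))
collected_cost = len(produced)

print(taken_cost, collected_cost)
```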

Below are some more examples of using the .map() and .filter() methods.

In [7]:
#Strip the 'hamlet@' prefix in each line so that 'hamlet@id' becomes 'id' (the numeric id, kept as a string)
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))

def line_replace(line):
    lists = []
    for element in line:
        if 'hamlet@' in element:
            lists.append(element.replace('hamlet@', ''))
        else:
            lists.append(element)
    return lists
                                                
hamlet_with_ids = split_hamlet.map(lambda line: line_replace(line))
print(hamlet_with_ids.take(5))
[['0', '', 'HAMLET'], ['8'], ['9'], ['10', '', 'DRAMATIS PERSONAE'], ['29']]
In [8]:
#Remove any line that contains only an id, and remove any blank element from the remaining lines.
def text_only(line):
    lists = []
    for element in line:
        if element == '':
            continue
        else:
            lists.append(element)
    return lists

def one_element(line):
    #Returns True when the line holds more than just an id
    return len(line) > 1

hamlet_shortened = hamlet_with_ids.filter(lambda line: one_element(line))
hamlet_text_only = hamlet_shortened.map(lambda x: text_only(x))
hamlet_text_only.take(10)
Out[8]:
[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND', '|'],
 ['273', '|']]
In [9]:
#Remove pipe characters.
def line_replace2(line):
    lists = []
    for element in line:
        if element == '|':
            continue
        elif '|' in element:
            lists.append(element.replace('|', ''))
        else:
            lists.append(element)
    return lists
clean_hamlet = hamlet_text_only.map(lambda x: line_replace2(x))
clean_hamlet.take(10)
Out[9]:
[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['273']]
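
Note that the last row, ['273'], is left with only an id because the length filter ran before the pipe characters were removed. One possible way around this, sketched below in plain Python, is to do all the cleaning in a single function and filter on length afterwards. This is an illustrative alternative, not the notebook's pipeline. The names clean_row and sample_rows are hypothetical, and the sample ids are reconstructed by putting the 'hamlet@' prefix back on ids shown above.

```python
def clean_row(row):
    # Strip the 'hamlet@' prefix, remove pipe characters, drop empty fields.
    cleaned = []
    for field in row:
        field = field.replace('hamlet@', '').replace('|', '')
        if field:
            cleaned.append(field)
    return cleaned

# Hypothetical stand-in for split_hamlet (not an RDD).
sample_rows = [
    ['hamlet@0', '', 'HAMLET'],
    ['hamlet@8'],
    ['hamlet@261', 'VOLTIMAND', '|'],
    ['hamlet@273', '|'],
]

cleaned_rows = [clean_row(row) for row in sample_rows]

# Filter after cleaning so rows reduced to a bare id (like '273') are dropped too
text_rows = [row for row in cleaned_rows if len(row) > 1]

print(text_rows)
```

With an RDD, the same idea would be a .map() with the cleaning function followed by a .filter() on the row length.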

Learning Summary

Concepts explored: Spark

PySpark methods used: .map(), .flatMap(), .filter(), .count(), .collect(), .take()

The files used for this project can be found in my GitHub repository.


