This is a review project based on one of the missions on Dataquest. We are going to use Apache Spark 1.5 with Jupyter. Rather than doing any heavy analysis, this project focuses on the syntax.
We'll be using Spark with a text file, 'hamlet.txt'.
#Initialize findspark and pyspark, create the sc object
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
#Reads the file hamlet.txt into an RDD object
raw_hamlet = sc.textFile('hamlet.txt')
print(raw_hamlet.take(5))
The .map() method
In Spark, we can use the .map() method in conjunction with a lambda function to iterate through the data. This method transforms an RDD of length N into another RDD of length N.
#Iterates over every line in raw_hamlet, splitting each line on '\t' into a list
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)
The .flatMap() method
This method transforms an RDD of length N into a collection of N collections, then flattens those collections into a single RDD. It is useful when we want to transform one RDD into another RDD of a different length.
#Extracts all the lines where hamlet speaks and yields a new RDD object
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    if "HAMLET" in line:
        speaketh = True
    if speaketh:
        yield id, "hamlet speaketh!"
hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
hamlet_spoken.take(5)
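To make the difference between .map() and .flatMap() concrete, here is a small sketch (not part of the original hamlet analysis) that applies both to a toy RDD: .map() returns one list per input line, while .flatMap() flattens the results into one element per word, so the output length changes.
#Toy example: .map() keeps one element per input, .flatMap() flattens the results into a longer RDD
toy = sc.parallelize(["to be", "or not", "to be"])
print(toy.map(lambda s: s.split(' ')).collect())      #[['to', 'be'], ['or', 'not'], ['to', 'be']]
print(toy.flatMap(lambda s: s.split(' ')).collect())  #['to', 'be', 'or', 'not', 'to', 'be']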
The .filter() method
The .filter() method takes in a function that returns a Boolean; only the elements for which the function returns True are kept. This is useful if we want to search for a certain name or value.
def filter_hamlet_speaks(line):
    if "HAMLET" in line:
        return True
    else:
        return False
#Filters split_hamlet; only lines containing 'HAMLET' will be kept
hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)
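Since .filter() only needs a function that returns a Boolean, the same result could also be written with an inline lambda; this equivalent sketch is just for illustration and isn't used in the rest of the project.
#Equivalent inline version: keeps only lines that contain the element 'HAMLET'
hamlet_spoken_lines_alt = split_hamlet.filter(lambda line: "HAMLET" in line)
hamlet_spoken_lines_alt.take(5)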
The .count() and .collect() actions
The .count() action returns the number of elements in an RDD. The .collect() action copies the entire dataset to the driver; if the dataset is very large and won't fit in memory, we'll run into issues. The .take() action is generally better if we only want a certain number of elements.
#Counts the total number of elements in hamlet_spoken_lines
spoken_count = hamlet_spoken_lines.count()
#Collects all the elements in hamlet_spoken_lines
spoken_101 = hamlet_spoken_lines.collect()
print(spoken_count)
print(spoken_101[0:10])
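Since only the first ten elements are printed here anyway, .take(10) would be a lighter alternative that copies just those elements to the driver; a minimal sketch:
#Fetches only the first 10 elements instead of collecting the whole dataset
print(hamlet_spoken_lines.take(10))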
Below are some more examples of using the .map() and .filter() methods.
#Changes the id in each line of raw_hamlet from 'hamlet@id' to just 'id', where id is an integer
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
def line_replace(line):
    lists = []
    for element in line:
        if 'hamlet@' in element:
            lists.append(element.replace('hamlet@', ''))
        else:
            lists.append(element)
    return lists
hamlet_with_ids = split_hamlet.map(lambda line: line_replace(line))
print(hamlet_with_ids.take(5))
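Because str.replace() leaves elements without 'hamlet@' unchanged, the same transformation could be written more compactly with a list comprehension; this alternative (hamlet_with_ids_alt) is just an illustrative sketch and isn't used below.
#Equivalent, more compact version of line_replace using a list comprehension
hamlet_with_ids_alt = split_hamlet.map(lambda line: [element.replace('hamlet@', '') for element in line])
print(hamlet_with_ids_alt.take(5))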
#Removes blank elements and filters out lines that contain only an id
def text_only(line):
    lists = []
    for element in line:
        if element == '':
            continue
        else:
            lists.append(element)
    return lists

def one_element(line):
    if len(line) > 1:
        return True
    else:
        return False
hamlet_shortened = hamlet_with_ids.filter(lambda line: one_element(line))
hamlet_text_only = hamlet_shortened.map(lambda x: text_only(x))
hamlet_text_only.take(10)
#Removes pipe characters
def line_replace2(line):
    lists = []
    for element in line:
        if element == '|':
            continue
        elif '|' in element:
            lists.append(element.replace('|', ''))
        else:
            lists.append(element)
    return lists
clean_hamlet = hamlet_text_only.map(lambda x: line_replace2(x))
clean_hamlet.take(10)
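Because Spark transformations are lazy and each one returns a new RDD, the cleaning steps above could also be chained into a single expression. The sketch below (clean_hamlet_alt is just an illustrative name) produces the same result as clean_hamlet.
#Equivalent pipeline: the same cleaning steps chained into one expression
clean_hamlet_alt = (split_hamlet.map(line_replace)
                                .filter(one_element)
                                .map(text_only)
                                .map(line_replace2))
clean_hamlet_alt.take(10)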
Learning Summary
Concepts explored: Spark
PySpark methods used: .map(), .flatMap(), .filter(), .count(), .collect(), .take()
The files used for this project can be found in my GitHub repository.