In this review project, we are going to focus on processing big data using Spark SQL.
We'll be working with census data from 1980, 1990, 2000, and 2010. The objective of this project is to learn how to use SQLContext objects in conjunction with Spark and pandas DataFrames, and SQL queries.
# Locate the Spark installation and start a SparkContext
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
First we create a SQLContext object named 'sqlCtx'. This class can read data from a wide range of sources:
- File formats such as JSON, CSV/TSV, XML, Parquet, and Avro
- Storage services such as Amazon S3
- Database systems such as MySQL and PostgreSQL
- Big data systems such as Hive and HBase
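For example, once the sqlCtx object below is created, the same reader interface handles other formats. A quick sketch (the file names here are placeholders, not files from this project):
# Hypothetical examples of other readers (placeholder paths)
parquet_df = sqlCtx.read.parquet("example.parquet")
# In Spark 1.x, CSV reading uses the external spark-csv package
csv_df = sqlCtx.read.format("com.databricks.spark.csv") \
    .options(header="true", inferSchema="true") \
    .load("example.csv")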
sqlCtx = pyspark.SQLContext(sc)
# Read the JSON file into a Spark DataFrame
df = sqlCtx.read.json("census_2010.json")
print(type(df))
# Print the schema of the columns
df.printSchema()
# Show the first 20 rows (.show() displays 20 by default)
df.show()
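.show() also accepts an optional row count if we want fewer than the default 20:
# Display only the first 5 rows
df.show(5)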
Unlike pandas DataFrames, Spark DataFrames require us to pass the number of rows we want displayed into the .head() method.
first_five = df.head(5)
for element in first_five:
print(element)
# Grab just the first row
first_one = df.head(1)[0]
print(first_one)
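Each element returned by .head() is a pyspark Row object, so individual fields can be accessed by name. A quick sketch, using columns from the schema above:
# Row fields support key-style and attribute-style access
print(first_one['age'])
print(first_one.total)
# A Row can also be converted into a plain Python dict
print(first_one.asDict())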
# Select columns from the Spark DataFrame and display them
df.select('age', 'males', 'females', 'year').show()
# Use boolean filtering to select rows where age > 5
five_plus = df[df['age'] > 5]
five_plus.show()
# Show all rows where females < males
df[df['females'] < df['males']].show()
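These operations can also be chained. A minimal sketch combining a filter with a column selection:
# Filter rows first, then select a subset of columns
df.filter(df['age'] > 5).select('age', 'males', 'females').show()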
The .toPandas() method
The idea is to harness the speed of Spark when analyzing big data and extract only the data we are interested in. Then we can convert it into a pandas DataFrame for heavier data analysis.
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline
# Convert the Spark DataFrame into a pandas DataFrame
pandas_df = df.toPandas()
# Plot a histogram of total population per age
pandas_df['total'].hist()
pandas_df.head()
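Note that .toPandas() collects the entire DataFrame into the driver's memory, so with genuinely big data it is better to filter in Spark first. A sketch of that pattern:
# Filter in Spark, then convert only the rows we need to pandas
under_21 = df[df['age'] < 21].toPandas()
under_21['total'].hist()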
Using SQL queries with Spark
SQL is extremely useful for joining multiple tables. Spark SQL allows us to register data from various files as tables and combine them in a single query.
# Register the DataFrame as a temporary table so we can query it with SQL
df.registerTempTable('census2010')
# List the tables registered in the SQLContext
tables = sqlCtx.tableNames()
print(tables)
q1 = "SELECT age FROM census2010"
sqlCtx.sql(q1).show()
q2 = "SELECT males, females FROM census2010 WHERE age > 5 AND age < 15"
sqlCtx.sql(q2).show()
# Use .describe() to show basic summary statistics
q3 = "SELECT males, females FROM census2010"
sqlCtx.sql(q3).describe().show()
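The registered table supports full SQL syntax, including ordering and limits. For example, a sketch that ranks the ten ages with the largest populations:
# Order ages by total population, descending, and keep the top ten
q = "SELECT age, total FROM census2010 ORDER BY total DESC LIMIT 10"
sqlCtx.sql(q).show()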
Combining files with Spark SQL
This is where we see the power of Spark SQL: the ability to use joins to analyze tables from multiple files at high speed.
# Read each census file into its own DataFrame
df = sqlCtx.read.json("census_2010.json")
df2 = sqlCtx.read.json("census_1980.json")
df3 = sqlCtx.read.json("census_1990.json")
df4 = sqlCtx.read.json("census_2000.json")
# Register each DataFrame as a separate temporary table
df.registerTempTable('census2010')
df2.registerTempTable('census1980')
df3.registerTempTable('census1990')
df4.registerTempTable('census2000')
# Show the registered table names
tables = sqlCtx.tableNames()
print(tables)
# Join the 2010 and 2000 tables on age; alias the columns so the output is unambiguous
q4 = 'SELECT c1.total AS total_2010, c2.total AS total_2000 FROM census2010 c1 INNER JOIN census2000 c2 ON c1.age = c2.age'
sqlCtx.sql(q4).show()
# Use SQL aggregate functions across the joined tables
q5 = '''
SELECT
SUM(c1.total) AS total_2010,
SUM(c2.total) AS total_2000,
SUM(c3.total) AS total_1990
FROM census2010 c1
INNER JOIN census2000 c2 ON c1.age = c2.age
INNER JOIN census1990 c3 ON c1.age = c3.age
'''
sqlCtx.sql(q5).show()
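Joins are not the only way to combine the files. A sketch using UNION ALL to stack two censuses into one long table, with a literal year column to tell them apart:
# Stack the 2010 and 2000 tables into one table labeled by year
q6 = '''
SELECT 2010 AS year, age, total FROM census2010
UNION ALL
SELECT 2000 AS year, age, total FROM census2000
'''
sqlCtx.sql(q6).show()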
Learning Summary
Concepts explored: Spark SQL, Spark DataFrames, combining data from multiple files
Methods and functions used: .SQLContext(), .head(), .toPandas(), .show(), .select(), .hist(), .registerTempTable(), .printSchema(), .tableNames(), .sql(), .describe()
The files used for this project can be found in my GitHub repository