A Spark DataFrame is a collection of data organized into rows and columns, resembling a table in a relational database. DataFrames are Spark's data structure implemented on top of Spark's Resilient Distributed Datasets (RDDs) and are heavily optimized internally. DataFrames can be created in Scala, Java, Python, and R from different data sources such as databases, Hive tables, RDDs, and flat files, among others. DataFrames in pandas and Spark share many similarities, but unlike pandas DataFrames, Spark DataFrames are distributed across a cluster and optimized in memory, which makes them powerful for handling and processing big data. In this post we will look at DataFrames in Spark with PySpark. You can download the dataset for this post from here.


Characteristics of Spark DataFrames

  1. Integrate well with a variety of data sources such as flat files, relational databases, and JSON data.
  2. Highly optimized with the built-in Spark SQL Catalyst optimizer.
  3. Supported in multiple languages: Python, Java, Scala, and R (SparkR).
  4. Scale well with big data.
  5. Flexible, easy-to-use functions similar to pandas DataFrame functions.

Spark DataFrames in PySpark

Load required libraries

                    

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate() # initialize spark session

Creating Spark DataFrames

Create Spark DataFrame from List

                    

spark_df_from_list = spark.createDataFrame([['France','50M','3T'],['India','30M','30T'],['Kenya','70M','25T'],
                     ['Nigeria','90M','60T'],['China','20M','2T'],['USA','80M','30T'],
                     ['UK','70M','25T'],['USA','20M','30T'],['China','70M','25T'],
                     ['France', '50M', '3T'],['China','70M','25T'] ],
                          schema='Country string,Population string,GDP string')
spark_df_from_list.show()

spark-dataframes-from-list

Create Spark DataFrame from Pandas DataFrame

                    

# Pandas DataFrame
pandas_df=pd.DataFrame([['France','50M','3T'],['India','30M','30T'],['Kenya','70M','25T'],
                     ['Nigeria','90M','60T'],['China','20M','2T'],['USA','80M','30T'],
                     ['UK','70M','25T'],['USA','20M','30T'],['China','70M','25T'],
                     ['France', '50M', '3T'],['China','70M','25T'] ],
                          columns=['Country','Population','GDP'])
pandas_df

spark-pandas-dataframes

                    

# Spark DataFrame created from the pandas DataFrame
spark_df=spark.createDataFrame(pandas_df)
spark_df.show()

spark-dataframes-from-list

Create a PySpark DataFrame from an RDD

                    

# RDD data: get the SparkContext from the SparkSession before calling parallelize
sc = spark.sparkContext
rdd_data=sc.parallelize([['France','50M','3T'],['India','30M','30T'],['Kenya','70M','25T'],
                     ['Nigeria','90M','60T'],['China','20M','2T'],['USA','80M','30T'],
                     ['UK','70M','25T'],['USA','20M','30T'],['China','70M','25T'],
                     ['France', '50M', '3T'],['China','70M','25T'] ])
rdd_data.collect()

rdd-data

                    

# Spark DataFrame from RDD
spark_df_from_rdd=spark.createDataFrame(rdd_data, schema=['Country','Population','GDP'])
spark_df_from_rdd.show()

spark-dataframes-from-list

Display a Spark DataFrame with the pandas DataFrame look and feel in Jupyter

                    

spark.conf.set('spark.sql.repl.eagerEval.enabled', True) # render DataFrames as rich tables in notebooks
spark_df_from_rdd # with eager evaluation enabled, the DataFrame is displayed without calling show()

spark-dataframes-with-pandas-lool-and-feel

Import Data into Spark from a CSV File

                    

df=spark.read.csv("titanic.csv", inferSchema=True, header=True)
df.show()

spark-read-csv

Show data schema
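The printSchema function prints each column with its data type and nullability:

df.printSchema() # column names, data types and whether each column is nullable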

spark-dataframes-print-schema

Show logical and physical structure of the DataFrame
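The explain function prints the plans Spark builds for the DataFrame; passing extended=True shows the logical plans alongside the physical plan:

df.explain(extended=True) # parsed, analyzed and optimized logical plans plus the physical plan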

spark-dataframes-explain-logical

Show columns
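The columns attribute returns the column names as a plain Python list:

df.columns # list of column names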

spark-dataframes-show-columns

Show top 5 records with head function
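head returns the rows as a list of Row objects rather than printing a table:

df.head(5) # first 5 rows as Row objects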

spark-dataframes-get-first-5-records-with-head-function

Show top 5 records in a tabular format with show function
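show prints the rows as a formatted table:

df.show(5) # first 5 rows in tabular form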

spark-dataframes-get-first-5-records-with-show-function

Show top 5 records with take function
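take, like head, returns the rows as a list of Row objects:

df.take(5) # first 5 rows as Row objects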

spark-dataframes-get-first-5-records-with-take-function

Show data with three features/columns only

                    

df.select('name','age','survived').show(5)

spark-dataframes-show-specific-columns

Show distinct records

                    

df.select('sex').distinct().show()

spark-dataframes-show-distinct-sex-column

Count distinct records

                    

df.select('sex').distinct().count()

2

Aggregate data with groupby

                    

df.select('sex').groupby('sex').count().show()

spark-dataframes-count-aggregate

Show the number of columns
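The exact call used here is not shown; one simple way is to take the length of the columns list:

len(df.columns) # number of columns in the DataFrame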

8

Describe the DataFrame for statistical analysis

                    

df.select('Sex','Pclass','age','survived','Fare').describe()

spark-dataframes-statistical-summary

Describe a single feature for statistical analysis
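As a sketch, using the age column as an example feature:

df.select('age').describe().show() # count, mean, stddev, min and max for a single column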

spark-dataframes-sindle-feature-statistics

Adding a new column

                    

df=df.withColumn('new_fare',df['fare']+100) # adding 100 on the base fare to get new fare
df.select('name','pclass','fare','new_fare').show(5)

spark-dataframes-add-new-column

Rename a column

                    

df=df.withColumnRenamed('pclass','Passenger_Class') # Rename pclass to Passenger_Class
df.columns

spark-dataframes-rename-column

Aggregate with groupby sum across all numeric columns

                    

df.groupby('sex').sum().show()

spark-dataframes-aggregate-specific-columns

Aggregate with groupby sum on specific columns

                    

df.select('sex','fare','Passenger_Class','Survived').groupby('sex').sum().show()

spark-dataframes-aggregate-specific-column-2

Pivot table

                    

df.select('sex','Passenger_Class').groupby('sex').pivot('Passenger_Class').sum().show()

spark-dataframes-pivot-table

Drop columns

                    

# Let's drop the new_fare and home.dest columns
df=df.drop('new_fare','home.dest')
df.columns

spark-dataframes-drop-columns

Show null values in the fare column

                    

df.select('fare').where('fare is null').show()

spark-dataframes-check-null

Drop null values

                    

drop_null_df=df.dropna()
drop_null_df.select('fare').where('fare is null').show()

spark-dataframes-check-null

Replace null values with a specific value

                    

replace_null_df=df.fillna(0.00) # Replace null values in numeric columns (such as fare) with 0.00
replace_null_df.filter(replace_null_df['fare']==0.00).select('name','fare').show()

spark-dataframes-replace-null

Select data with “like”

                    

df.select("name",df['name'].like("Allen")).show(5)

spark-dataframes-use-like

Select data using “startswith”

                    

df.select("name",df['name'].startswith("All")).show(5)

spark-dataframes-using-startswith

Select data using “endswith”

                    

df.select('name',df['name'].endswith('n')).show(5)

spark-dataframes-endswith

Select data using “between”

                    

df.select('age',df['age'].between(30,50)).show(10)

spark-dataframes-use-between

Select data using “contains”

                    

df.select('name',df['name'].contains('All')).show(5)

spark-dataframes-use-contains

Select data using “substr”

                    

df.select('name',df['name'].substr(0,5)).show(10)

spark-dataframes-use-substr

Select data using substring and creating a new alias

                    

df.select('name',df['name'].substr(0,5).alias('First 5 String Characters')).show(10)

spark-dataframes-substr-with-alias-column

Select data using conditional operators in filter

                    

df.filter((df['age']>=40) & (df['age']<=50)).select('name','age').show(5)

spark-dataframes-logical-filter-between

Select data using “startswith” in filter

                    

df.filter(df['name'].startswith("Don")).select('name').show()

spark-dataframes-startswith-filter

Select data using “contains” in filter

                    

df.filter(df['name'].contains('Master')).select('name').show(5)

spark-dataframes-contains-filter

For the complete code, check the Spark notebook here.

Conclusion

Spark DataFrames provide distributed, flexible, and powerful functions to manipulate data at scale. Unlike pandas DataFrames, Spark DataFrames are optimized internally with the Spark SQL Catalyst optimizer. Spark DataFrames are built on top of Resilient Distributed Datasets (RDDs), the foundational data structures in Spark. In this post we have looked at different Spark DataFrame functions in PySpark, such as creating DataFrames, statistical functions, string manipulations, and conditional filter functions, among others. In the next post we will look at the Pandas API on Spark in PySpark. To learn about Resilient Distributed Datasets (RDDs) in PySpark, check our previous post here.
