A Resilient Distributed Dataset (RDD) is a fault-tolerant collection of elements partitioned across the nodes of a cluster that can be operated on in parallel. RDDs are the main abstraction provided by Spark. An RDD is created either from an existing dataset, such as a file in HDFS, or by parallelizing a collection in the driver program, and it can be persisted in memory so that it is reused efficiently across parallel operations. RDDs are the basic data structure in Spark, on top of which the more advanced data structures are built. The two main types of RDD operations are transformations and actions. In this post we will look at RDD operations in PySpark.
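To make those creation paths concrete, here is a minimal sketch of building an RDD from an in-memory collection and from a file, then persisting it for reuse. The SparkContext setup mirrors the one used later in this post, and the HDFS path is only a placeholder, not a file from this tutorial.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# RDD from an in-memory Python collection
nums = sc.parallelize([1, 2, 3, 4, 5])
# RDD from a text file (placeholder path; nothing is read until an action runs)
lines = sc.textFile('hdfs:///tmp/sample.txt')
# Ask Spark to keep the RDD in memory so it can be reused across parallel operations
nums.persist()
print(nums.count())   # 5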
RDD Operations
RDDs are an efficient way of distributing workloads across the nodes of a Spark cluster. There are two main types of RDD operations.
- Actions. These are operations that return a single value. An action is applied to an RDD and the resulting output is a single value sent back to the driver.
- Transformations. These are operations applied to an RDD that output another RDD (the two are contrasted in the short sketch after this list).
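The contrast is easiest to see in a minimal sketch, assuming a SparkContext sc like the one created in the Actions section below: map is a transformation that only describes a new RDD, while reduce is an action that triggers the computation and returns one value.
nums = sc.parallelize([1, 2, 3, 4])
# Transformation: returns a new RDD, nothing is computed yet
doubled = nums.map(lambda x: x * 2)
# Action: runs the job and returns a single value to the driver
total = doubled.reduce(lambda x, y: x + y)
print(total)   # 20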
1. Actions
Spark's action functions return a value to the Spark driver program.
Import findspark to use Spark in a Jupyter notebook
import findspark
findspark.init()
from pyspark import SparkContext
Initialize SparkContext
sc=SparkContext.getOrCreate()
Create Resilient Distributed Datasets (RDDs)
data=sc.parallelize([['France','50M','3T'],['India','30M','30T'],['Kenya','70M','25T'],
['Nigeria','90M','60T'],['China','20M','2T'],['USA','80M','30T'],
['UK','70M','25T'],['USA','20M','30T'],['China','70M','25T'],
['France', '50M', '3T'],['China','70M','25T'] ])
data2=sc.parallelize([23,42,78,90,12,9,42,89,42,12,50,60])
a=sc.parallelize([('USA',35),('Canada',24),('Mexico',27),('Kenya',23)])
b=sc.parallelize([('Kenya',30),('USA',35),('South Africa',23),('Rwanda',23)])
print('data\n',data,'\ndata2\n',data2,'\na\n',a,'\nb\n',b)
Show RDD Data
data.collect()
Persist the RDD and check whether it is cached
data.persist().is_cached
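Persistence can also be requested at an explicit storage level and released afterwards. The snippet below is a small sketch of that on a throwaway RDD (not part of the original notebook), since data above is already persisted at the default level.
from pyspark import StorageLevel
cached = sc.parallelize(range(100))
# Keep the partitions in memory, spilling to disk if they do not fit
cached.persist(StorageLevel.MEMORY_AND_DISK)
print(cached.is_cached)   # True
# Free the cached blocks once they are no longer needed
cached.unpersist()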
Get number of partitions
data.getNumPartitions()
8
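The 8 here is simply the default parallelism of the local setup. The partition count can also be set explicitly when an RDD is created and changed later, as sketched below on a throwaway RDD.
# Ask for 4 partitions explicitly at creation time
rdd4 = sc.parallelize(range(100), 4)
print(rdd4.getNumPartitions())   # 4
# repartition changes the partition count with a full shuffle
print(rdd4.repartition(2).getNumPartitions())   # 2
# coalesce reduces the partition count without a full shuffle
print(rdd4.coalesce(1).getNumPartitions())   # 1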
Get number of items
len(data.collect())
11
Get distinct Records
sorted(data2.collect())
[9, 12, 12, 23, 42, 42, 42, 50, 60, 78, 89, 90]
sorted(data2.distinct().collect())
[9, 12, 23, 42, 50, 60, 78, 89, 90]
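distinct drops the duplicates; to instead count how many times each value occurs, the countByValue action returns the counts to the driver as a dict (a small aside, not in the original notebook).
# Count the occurrences of every value in the RDD
counts = data2.countByValue()
print(counts[42], counts[12])   # 3 2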
Get max value from the list
data2.max()
90
Get min value from the list
data2.min()
9
Get sum value from the list
data2.sum()
549
Get average
data2.mean()
Get variance
data2.variance()
773.1875000000001
Get standard deviation
data2.stdev()
Get summary statistics (count, mean, stdev, max, min)
data2.stats()
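These numeric actions are convenience shortcuts; the same figures can be produced with the general-purpose reduce, fold and aggregate actions, sketched briefly below.
# Sum with reduce: combine the elements pairwise into one value
print(data2.reduce(lambda x, y: x + y))    # 549
# Sum with fold: like reduce, but with an explicit zero value
print(data2.fold(0, lambda x, y: x + y))   # 549
# aggregate builds a (sum, count) pair in one pass, giving the mean
total, count = data2.aggregate(
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(total / count)                       # 45.75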
Get the first record
data.first()
Take a random sample of one record, with replacement
data.takeSample(True, 1)
Count the records
data.count()
Count records by key
data.countByKey().items()
dict_items([('France', 2), ('India', 1), ('Kenya', 1), ('Nigeria', 1), ('China', 3), ('USA', 2), ('UK', 1)])
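countByKey brings the counts back to the driver as a dict. The equivalent distributed version maps each record to a (country, 1) pair and sums the ones with the reduceByKey transformation, only collecting at the end (a small sketch; the output order may differ).
# Count records per country as a distributed computation
data.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).collect()
# e.g. [('France', 2), ('India', 1), ('Kenya', 1), ('Nigeria', 1), ('China', 3), ('USA', 2), ('UK', 1)]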
For the complete code, check the PySpark notebook here.
2. Transformations
Spark's transformation functions produce a new Resilient Distributed Dataset (RDD).
tran_data=sc.parallelize([4,6,8,2,2,6])
tran_data.collect()
Take a random sample, with replacement
tran_data.sample(True, 1).collect()
[4, 6, 6, 6, 2, 6]
tran_data.map(lambda x : x*2).collect()
tran_data.flatMap(lambda x : [x,x]).collect()
data.filter(lambda x : "China" in x).collect()
tran_data.distinct().collect()
Sort records by key (the first element)
data.sortByKey(ascending=True).collect()
[['China', '20M', '2T'], ...]
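sortByKey always sorts on the first element of each record; to sort on any other field, the sortBy transformation takes a key function, as in this short sketch.
# Sort the country records by their second column
data.sortBy(lambda x: x[1]).collect()
# Sort the numeric RDD in descending order
data2.sortBy(lambda x: x, ascending=False).collect()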
Join two RDDs
print("a : ",a.collect())
print("b : ",b.collect())
a :  [('USA', 35), ('Canada', 24), ('Mexico', 27), ('Kenya', 23)]
b :  [('Kenya', 30), ('USA', 35), ('South Africa', 23), ('Rwanda', 23)]
a.join(b).collect()
a.leftOuterJoin(b).collect()
[('USA', (35, 35)), ...]
a.rightOuterJoin(b).collect()
[('USA', (35, 35)), ...]
a.union(b).collect()
[('USA', 35), ...]
a.subtract(b).collect()
a.intersection(b).collect()
a.cartesian(b).collect()
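Two other widely used pair-RDD transformations, reduceByKey and groupByKey, are not in the notebook but fit naturally here; a brief sketch on the union of a and b.
combined = a.union(b)
# reduceByKey merges the values of each key with the given function
combined.reduceByKey(lambda x, y: x + y).collect()
# e.g. [('USA', 70), ('Kenya', 53), ('Canada', 24), ...] (order may differ)
# groupByKey gathers all values of a key into an iterable
[(k, list(v)) for k, v in combined.groupByKey().collect()]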
For the complete code, check the PySpark notebook here.
Conclusion
Resilient Distributed Datasets (RDDs) are the basic data structure in Spark. They are fault-tolerant, automatically recovering from node failures. Fundamentally, a Spark application comprises a driver program that runs the main function and executes various parallel operations on the worker nodes. We have seen that the main abstraction in Spark is the RDD, although there are other abstractions such as shared variables, which comprise broadcast variables and accumulators. In this post we looked at the two basic RDD operations, namely actions, which return a single value after an operation has been applied to an RDD, and transformations, which return a new RDD after applying an operation to the original RDD. In the next post we will look at DataFrames in Spark with PySpark. To learn about PySpark and how to get started, check our previous post here.
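As a closing aside on the shared variables mentioned above, here is a minimal sketch of a broadcast variable and an accumulator; the lookup table and numbers used are arbitrary.
# Broadcast: a read-only value shipped once to every executor
codes = sc.broadcast({'Kenya': 'KE', 'USA': 'US'})
print(sc.parallelize(['Kenya', 'USA']).map(lambda c: codes.value[c]).collect())   # ['KE', 'US']
# Accumulator: a write-only counter that tasks can add to
counter = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: counter.add(x))
print(counter.value)   # 10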