A Resilient Distributed Dataset (RDD) is a fault-tolerant collection of elements partitioned across the nodes of a cluster that can be operated on in parallel. RDD is the main abstraction provided by Spark. An RDD is created by loading an external dataset (for example, a file in HDFS) or by parallelizing a collection in the driver program, and it can be persisted in memory so it can be reused efficiently across parallel operations. RDD is the basic data structure in Spark on which other, higher-level data structures are built. The two main kinds of RDD operations are transformations and actions. In this post we will look at RDD operations in PySpark.


RDD Operations

RDDs are an efficient way of distributing workloads across the nodes of a Spark cluster. There are two main types of RDD operations:

  1. Actions. These are operations that run a computation on an RDD and return a value to the driver program.
  2. Transformations. These are operations applied to an RDD that produce a new RDD.


1. Actions

Spark's action functions return a value to the driver program after running a computation on the RDD.

Import findspark to use Spark in a Jupyter notebook

                    

import findspark
findspark.init()

from pyspark import SparkContext

Initialize SparkContext

                    

sc=SparkContext.getOrCreate()

Create Resilient Distributed Datasets (RDDs)

                    

data=sc.parallelize([['France','50M','3T'],['India','30M','30T'],['Kenya','70M','25T'],
                     ['Nigeria','90M','60T'],['China','20M','2T'],['USA','80M','30T'],
                     ['UK','70M','25T'],['USA','20M','30T'],['China','70M','25T'],
                     ['France', '50M', '3T'],['China','70M','25T'] ])
data2=sc.parallelize([23,42,78,90,12,9,42,89,42,12,50,60])
a=sc.parallelize([('USA',35),('Canada',24),('Mexico',27),('Kenya',23)])
b=sc.parallelize([('Kenya',30),('USA',35),('South Africa',23),('Rwanda',23)])
print('data\n',data,'\ndata2\n',data2,'\na\n',a,'\nb\n',b)


Show RDD Data
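
The exact snippet isn't shown above; collect() is the usual way to bring the RDD's contents back to the driver:

data.collect()  # returns the full list of country records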


Check the RDD Persistence
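
The original call isn't shown; a minimal sketch of the persistence API (persist() itself, plus two ways to inspect the result):

data.persist()          # keep the RDD in memory once it has been computed
data.is_cached          # True after persist()/cache() has been called
data.getStorageLevel()  # shows the storage level in use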


Get number of partitions
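
getNumPartitions() reports how many partitions Spark split the data into:

data.getNumPartitions()  # 8 in this run; the default typically matches the number of available cores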

8

Show all data
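
The output screenshot isn't reproduced here; collecting the numeric RDD, for example, shows all of its values:

data2.collect()  # [23, 42, 78, 90, 12, 9, 42, 89, 42, 12, 50, 60]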


Get number of items
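
count() returns the number of records in the RDD:

data.count()  # 11 country records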

11

Get distinct Records
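
For reference, the sorted values below still contain duplicates; they can be produced with something like:

sorted(data2.collect())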

[9, 12, 12, 23, 42, 42, 42, 50, 60, 78, 89, 90]

                    

sorted(data2.distinct().collect())

[9, 12, 23, 42, 50, 60, 78, 89, 90]

Get max value from the list
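
max() returns the largest value:

data2.max()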

90

Get min value from the list
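
min() returns the smallest value:

data2.min()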

9

Get sum value from the list
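
sum() adds up all the values:

data2.sum()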

549

Get average
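
mean() returns the average of the values:

data2.mean()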

45.75

Get variance value from the list

Get standard deviation value from the list

Get mean, std deviation, max and min values

(count: 12, mean: 45.75, stdev: 27.80624929759496, max: 90.0, min: 9.0)
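
The corresponding snippets aren't shown above; they are presumably variance(), stdev() and stats() on the numeric RDD:

data2.variance()  # population variance of the values
data2.stdev()     # standard deviation, roughly 27.81 as in the stats output above
data2.stats()     # StatCounter with count, mean, stdev, max and min computed in one pass
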
Show the first record
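
first() returns the first element of the RDD:

data.first()
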
['France', '50M', '3T']
Get random record
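
The exact arguments aren't shown; takeSample() returns a random sample as a Python list, here a single record:

data.takeSample(False, 1)
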
[['France', '50M', '3T']]
Count records
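
As with the earlier count, count() returns the number of records:

data.count()  # 11
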
Count occurrence of the items
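
The snippet isn't shown; one way to reproduce the counts below is to map each record to its country name and use countByValue():

data.map(lambda x: x[0]).countByValue().items()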

dict_items([('France', 2), ('India', 1), ('Kenya', 1), ('Nigeria', 1), ('China', 3), ('USA', 2), ('UK', 1)])

For the complete code, check the PySpark notebook here.

2. Transformations

Spark's transformation functions produce a new Resilient Distributed Dataset (RDD).

                    

tran_data=sc.parallelize([4,6,8,2,2,6]) 
tran_data.collect()

[4, 6, 8, 2, 2, 6]

Select a random sample of items

tran_data.sample(True, 0.3).collect()  # sample(withReplacement, fraction); returns a random subset rather than a fixed count

Use map to multiply each item by 2
                    

tran_data.map(lambda x : x*2).collect()

[8, 12, 16, 4, 4, 12]
Use flatMap to duplicate the items
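
The snippet isn't shown; flatMap with a function that emits each element twice gives the output below:

tran_data.flatMap(lambda x: [x, x]).collect()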

[4, 4, 6, 6, 8, 8, 2, 2, 2, 2, 6, 6]

Filter from data where item is China
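
filter() keeps only the records whose first field is 'China':

data.filter(lambda x: x[0] == 'China').collect()
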
[['China', '20M', '2T'], ['China', '70M', '25T'], ['China', '70M', '25T']]
Return unique items
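
distinct() removes the duplicate values:

tran_data.distinct().collect()
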
[8, 2, 4, 6]
Sorting
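
The original call isn't shown; sortBy() with the country name as the key produces the ordering below:

data.sortBy(lambda x: x[0]).collect()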

[['China', '20M', '2T'],
 ['China', '70M', '25T'],
 ['China', '70M', '25T'],
 ['France', '50M', '3T'],
 ['France', '50M', '3T'],
 ['India', '30M', '30T'],
 ['Kenya', '70M', '25T'],
 ['Nigeria', '90M', '60T'],
 ['UK', '70M', '25T'],
 ['USA', '80M', '30T'],
 ['USA', '20M', '30T']]

Join two RDDs

a : [('USA', 35), ('Canada', 24), ('Mexico', 27), ('Kenya', 23)]

b : [('Kenya', 30), ('USA', 35), ('South Africa', 23), ('Rwanda', 23)]
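
join() performs an inner join on the keys, keeping only the countries present in both RDDs:

a.join(b).collect()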

[('USA', (35, 35)), ('Kenya', (23, 30))]

Left outer join
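
leftOuterJoin() keeps every key from a and fills the missing values from b with None:

a.leftOuterJoin(b).collect()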

[('USA', (35, 35)),
 ('Mexico', (27, None)),
 ('Canada', (24, None)),
 ('Kenya', (23, 30))]

Right outer join
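
rightOuterJoin() keeps every key from b:

a.rightOuterJoin(b).collect()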

[('USA', (35, 35)),
 ('South Africa', (None, 23)),
 ('Kenya', (23, 30)),
 ('Rwanda', (None, 23))]

Union
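
union() concatenates the two RDDs without removing duplicates:

a.union(b).collect()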

[('USA', 35),
 ('Canada', 24),
 ('Mexico', 27),
 ('Kenya', 23),
 ('Kenya', 30),
 ('USA', 35),
 ('South Africa', 23),
 ('Rwanda', 23)]

Difference
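
The output was not captured above; subtract() returns the elements of a that do not appear in b:

a.subtract(b).collect()  # ('USA', 35) is dropped; ('Kenya', 23) stays since it differs from ('Kenya', 30)
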
Intersection
                    

a.intersection(b).collect()

[('USA', 35)]
Cartesian
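
cartesian() pairs every element of a with every element of b, giving 4 x 4 = 16 pairs:

a.cartesian(b).collect()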

[(('USA', 35), ('Kenya', 30)),
 (('USA', 35), ('USA', 35)),
 (('USA', 35), ('South Africa', 23)),
 (('USA', 35), ('Rwanda', 23)),
 (('Canada', 24), ('Kenya', 30)),
 (('Canada', 24), ('USA', 35)),
 (('Canada', 24), ('South Africa', 23)),
 (('Canada', 24), ('Rwanda', 23)),
 (('Mexico', 27), ('Kenya', 30)),
 (('Mexico', 27), ('USA', 35)),
 (('Mexico', 27), ('South Africa', 23)),
 (('Mexico', 27), ('Rwanda', 23)),
 (('Kenya', 23), ('Kenya', 30)),
 (('Kenya', 23), ('USA', 35)),
 (('Kenya', 23), ('South Africa', 23)),
 (('Kenya', 23), ('Rwanda', 23))]

For the complete code, check the PySpark notebook here.

Conclusion

The Resilient Distributed Dataset is the basic data structure in Spark. RDDs are fault-tolerant as they automatically recover from node failures. Fundamentally, a Spark application consists of a driver program that runs the main function and executes various parallel operations on the worker nodes. We have seen that the main abstraction in Spark is the RDD, although there are other abstractions such as shared variables, which comprise broadcast variables and accumulators. In this post we have looked at the two basic RDD operations, namely actions, which return a value to the driver after an operation has been applied to an RDD, and transformations, which return a new RDD after applying an operation to the original RDD. In the next post we will look at DataFrames in Spark with PySpark. To learn about PySpark and how to get started, check our previous post here.
