Hi PySpark Lovers! In this article, you will see the complete process of converting a PySpark DataFrame to an RDD with the help of examples.
After reading this article, you will have no confusion about how to convert a PySpark DataFrame to an RDD (Resilient Distributed Dataset). PySpark DataFrame has an attribute called rdd that converts an existing PySpark DataFrame to an RDD.
Why do we need to convert PySpark DataFrame to RDD?
RDD (Resilient Distributed Dataset) is a fundamental data structure in PySpark that comes with features and functionality that are not available on a PySpark DataFrame.
To perform such operations, we first convert the PySpark DataFrame into an RDD and then apply the required operations on that RDD.
After applying the required operations, we can convert the RDD back into a PySpark DataFrame.
DataFrames were introduced in Spark 1.3, and every PySpark DataFrame has exposed an rdd attribute since then. The rdd attribute returns the DataFrame's underlying RDD, which represents the data as a collection of PySpark Row objects.
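If you have not worked with Row before, here is a tiny standalone sketch (with an illustrative record, not from the DataFrame we build below) showing how a Row object behaves. Fields can be read by attribute or by key:

```python
from pyspark.sql import Row

# a Row behaves like a named record: fields are accessible by attribute or by key
person = Row(name="Sharu", salary=33000)
print(person.name)       # Sharu
print(person["salary"])  # 33000
```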
Let's create a PySpark DataFrame so that we can convert it into an RDD.
Create PySpark DataFrame
For demonstration purposes, I have created a sample PySpark DataFrame, which I will then convert into a PySpark RDD (Resilient Distributed Dataset).
Code to Generate PySpark DataFrame:
```python
from pyspark.sql import SparkSession

data = [
    ("Sharu", "Developer", "IT", 33000),
    ("John", "Developer", "IT", 40000),
    ("Jaiyka", "HR Executive", "HR", 25000),
    ("Shantnu", "Manual Tester", "IT", 25000),
    ("Avantika", "Senior HR Manager", "HR", 45000),
    ("Vaishali", "Junior Accountant", "Account", 23000),
    ("Vinay", "Senior Accountant", "Account", 40000),
]

# column names for the DataFrame
columns = ["name", "designation", "department", "salary"]

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating dataframe
df = spark.createDataFrame(data, columns)
df.show(truncate=False)
```
Explanation of the above code:
- First, I imported the SparkSession class from the pyspark.sql module.
- Second, I defined a list of tuples, where each tuple contains information about an employee: name, designation, department, and salary.
- Third, I defined the column names of the PySpark DataFrame.
- Fourth, I created the SparkSession with the help of builder, appName(), and getOrCreate().
- Fifth, I created the PySpark DataFrame using the SparkSession's createDataFrame() method.
- In the end, I displayed the created PySpark DataFrame using the show() method.
After executing the above code, the DataFrame will look like this:
+--------+-----------------+----------+------+
|name |designation |department|salary|
+--------+-----------------+----------+------+
|Sharu |Developer |IT |33000 |
|John |Developer |IT |40000 |
|Jaiyka |HR Executive |HR |25000 |
|Shantnu |Manual Tester |IT |25000 |
|Avantika|Senior HR Manager|HR |45000 |
|Vaishali|Junior Accountant|Account |23000 |
|Vinay |Senior Accountant|Account |40000 |
+--------+-----------------+----------+------+
Now let's see how we can convert this DataFrame to an RDD.
How to convert PySpark DataFrame to RDD?
Every PySpark DataFrame is an instance of the PySpark DataFrame class, which has an attribute called rdd. The rdd attribute returns an RDD of PySpark Row objects, where each Row object represents a single row or record of the PySpark DataFrame.
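To confirm this, you can inspect the rdd attribute directly (a quick check using the df created above):

```python
# the rdd attribute exposes the DataFrame's underlying RDD of Row objects
print(type(df.rdd))    # <class 'pyspark.rdd.RDD'>
print(df.rdd.first())  # Row(name='Sharu', designation='Developer', department='IT', salary=33000)
```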
Let's see how we can convert the above-created PySpark DataFrame to an RDD.
```python
# convert the DataFrame to an RDD and display its contents
rdd = df.rdd
print(rdd.collect())
```
The RDD will look like this:
[Row(name='Sharu', designation='Developer', department='IT', salary=33000),
Row(name='John', designation='Developer', department='IT', salary=40000),
Row(name='Jaiyka', designation='HR Executive', department='HR', salary=25000),
Row(name='Shantnu', designation='Manual Tester', department='IT', salary=25000),
Row(name='Avantika', designation='Senior HR Manager', department='HR', salary=45000),
Row(name='Vaishali', designation='Junior Accountant', department='Account', salary=23000),
Row(name='Vinay', designation='Senior Accountant', department='Account', salary=40000)]
As you can see, each Row object in the above RDD represents a particular record of the PySpark DataFrame. Keep in mind that collect() brings every row back to the driver, so use it carefully on large datasets.
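Because each element is a Row, you can pull out individual fields inside RDD transformations. For instance, a small sketch that collects just the names:

```python
# extract a single field from every Row using the map() transformation
names = rdd.map(lambda row: row["name"]).collect()
print(names)
# ['Sharu', 'John', 'Jaiyka', 'Shantnu', 'Avantika', 'Vaishali', 'Vinay']
```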
As noted earlier, PySpark DataFrame does not expose every operation that RDD does, which is why we sometimes perform extra operations on the RDD and then convert it back into a DataFrame.
For example, PySpark DataFrame does not have a map() method. Suppose we want to add a new column that holds a bonus for each employee, where the bonus is 10 percent of the salary. The RDD's map() transformation takes a function, applies it to each element of the RDD, and returns the result as a new RDD.
Note: Remember that each column in a PySpark DataFrame is an instance of the PySpark Column class.
```python
# compute a 10% bonus for each employee with the map() transformation
new_rdd = rdd.map(
    lambda x: [
        x["name"],
        x["designation"],
        x["department"],
        x["salary"],
        (x["salary"] * 10) / 100,
    ]
)

# column names for the new DataFrame, including the bonus column
new_column = ["name", "designation", "department", "salary", "bonus"]

# convert the RDD back into a DataFrame and display it
new_df = new_rdd.toDF(new_column)
new_df.show()
```
After adding the bonus column, the new PySpark DataFrame will look like this:
+--------+-----------------+----------+------+------+
|    name|      designation|department|salary| bonus|
+--------+-----------------+----------+------+------+
|   Sharu|        Developer|        IT| 33000|3300.0|
|    John|        Developer|        IT| 40000|4000.0|
|  Jaiyka|     HR Executive|        HR| 25000|2500.0|
| Shantnu|    Manual Tester|        IT| 25000|2500.0|
|Avantika|Senior HR Manager|        HR| 45000|4500.0|
|Vaishali|Junior Accountant|   Account| 23000|2300.0|
|   Vinay|Senior Accountant|   Account| 40000|4000.0|
+--------+-----------------+----------+------+------+
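For completeness, this particular task can also be done without leaving the DataFrame API, using withColumn(); the RDD round trip really pays off for custom logic that the built-in column functions cannot express. A minimal sketch:

```python
from pyspark.sql.functions import col

# the same bonus column computed directly with the DataFrame API
df_with_bonus = df.withColumn("bonus", col("salary") * 10 / 100)
df_with_bonus.show()
```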
Complete Source Code
Here is all the code that we have seen throughout this article.
```python
from pyspark.sql import SparkSession

data = [
    ("Sharu", "Developer", "IT", 33000),
    ("John", "Developer", "IT", 40000),
    ("Jaiyka", "HR Executive", "HR", 25000),
    ("Shantnu", "Manual Tester", "IT", 25000),
    ("Avantika", "Senior HR Manager", "HR", 45000),
    ("Vaishali", "Junior Accountant", "Account", 23000),
    ("Vinay", "Senior Accountant", "Account", 40000),
]

columns = ["name", "designation", "department", "salary"]

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating dataframe
df = spark.createDataFrame(data, columns)

# displaying dataframe
df.show(truncate=False)

# ---------- code to convert the DataFrame to an RDD and apply the map function

rdd = df.rdd

# new rdd after applying the map function to compute a 10% bonus
new_rdd = rdd.map(
    lambda x: [
        x["name"],
        x["designation"],
        x["department"],
        x["salary"],
        (x["salary"] * 10) / 100,
    ]
)

new_column = ["name", "designation", "department", "salary", "bonus"]
new_df = new_rdd.toDF(new_column)
new_df.show()
```
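One optional addition, not part of the original listing: if you run this as a standalone script, you can release the cluster resources at the end by stopping the session:

```python
# stop the Spark session once the work is done
spark.stop()
```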
Summary
So, throughout this article, we have seen how to convert a PySpark DataFrame to an RDD (Resilient Distributed Dataset) with the help of examples. PySpark DataFrame provides an attribute called rdd that returns the underlying RDD.
We convert a PySpark DataFrame to an RDD when we want to perform extra operations, meaning transformations, that are not available on the DataFrame, because RDD offers many low-level transformations in comparison to PySpark DataFrame.
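As one more illustration (not part of the walkthrough above), RDDs offer pair transformations such as reduceByKey() that DataFrames do not expose directly. A quick sketch using the rdd created earlier, totaling salaries per department:

```python
# total salary per department using the RDD-only reduceByKey() transformation
totals = rdd.map(lambda row: (row["department"], row["salary"])) \
            .reduceByKey(lambda a, b: a + b)
print(totals.collect())
# e.g. [('IT', 98000), ('HR', 70000), ('Account', 63000)]
```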
You saw this pattern above, where I applied the map() transformation on the RDD instead of on the PySpark DataFrame.
If you like this article, please share and keep visiting for further PySpark tutorials.
Thanks for taking the time to read; we are very grateful.
Have a nice day… ❤️