In this tutorial, you will learn about the PySpark DataFrame with the help of multiple examples, and by the end of this article you should have a solid foundation for working with it. The DataFrame is one of the major data structures in PySpark, alongside the RDD, and it stores data in the form of rows and columns.
As a data scientist, data engineer, or data analyst, you are probably familiar with storing files on your local computer, loading those files using Pandas or PySpark, and then performing some operations on top of that data.
This becomes a tedious task when we have large files that store trillions of records and reach terabytes or petabytes in size.
In that scenario, it is not practical to load and process such extremely large files on a single machine.
To overcome this kind of problem, distributed processing engines like Apache Spark and Hadoop come into the picture. You are probably familiar with the Pandas DataFrame. It is one of the most widely used data structures for loading and processing datasets, but it has some limitations, which I have described in the PySpark DataFrame vs Pandas DataFrame section.
Before going through this article, let's take a brief overview of Spark.
What is Apache Spark?
Apache Spark is an open-source, unified data analytics engine for large-scale data processing. It can load large-scale datasets and perform data engineering, machine learning, and data science workloads on a single-node machine or on a cluster. Apache Spark is written in the Scala programming language, and it provides APIs for other languages such as Python, Java, and R.
These are some major features of Apache Spark that make it more popular and easy to use.
- Batch/Streaming Data:- We can perform batch processing or stream processing. The difference is that in batch processing data arrives periodically and is processed in groups, whereas in stream processing data arrives continuously and is processed as it comes. We can use our preferred language to process that data.
- SQL Analytics:- Apache Spark also allows us to run SQL queries, for example to produce reports for dashboards.
- Machine learning:- Spark provides a module called MLlib in order to perform machine learning operations.
Let's move on to PySpark.
What is PySpark?
PySpark is the Python interface to Apache Spark. It is installed and imported like other Python packages, and through the PySpark APIs an application can use the functionality of Apache Spark to load large-scale datasets and perform operations on top of them.
Note:- An interface is a point where two systems meet to communicate with each other.
What is PySpark DataFrame?
DataFrame in PySpark is a data structure that is used to store data in the form of rows and columns. In simple words, a PySpark DataFrame is similar to a table in a Relational Database Management System ( RDBMS ) or SQL. Because a DataFrame in PySpark is like a SQL table, we can perform most SQL queries on top of a PySpark DataFrame.
PySpark DataFrame is immutable, which means it cannot be changed. Every time a transformation method is applied, a new DataFrame is created. It is also lazily evaluated, and it is built on top of the PySpark RDD ( Resilient Distributed Dataset ).
Lazy evaluation means that when we apply a transformation to a DataFrame, it does not execute immediately. Spark only records it in a plan to execute later.
When an action such as count(), collect(), or show() is called, the plan is executed and the computed value is returned.
A PySpark DataFrame is distributed across multiple nodes in a cluster. Operations on a DataFrame run in parallel on multiple machines in the cluster.
For example, I have created a simple PySpark DataFrame with five rows and four columns, as you can see below. Later we will see how to perform transformations and actions on top of a DataFrame.
+----------+---------+-------+--------------------+
|first_name|last_name|country|designation         |
+----------+---------+-------+--------------------+
|Vishvajit |Rao      |India  |Backend Developer   |
|Ajay      |Kumar    |India  |Front-end Developer |
|John      |Doe      |USA    |Data Engineer       |
|Vedika    |Kumari   |India  |Data Analyst        |
|Bharati   |Singh    |India  |Full Stack Developer|
+----------+---------+-------+--------------------+
PySpark DataFrame vs Pandas DataFrame
PySpark DataFrame and Pandas DataFrame are both data structures that store values in the form of rows and columns.
In other words, a DataFrame is a data structure that stores values in tabular format, just like a table in SQL.
Here, I have described the differences between the PySpark DataFrame and the Pandas DataFrame.
PySpark DataFrame:
- PySpark DataFrame is a kind of data structure in PySpark that stores data in the form of a table like SQL database.
- PySpark DataFrame supports most SQL queries.
- PySpark DataFrame runs on multiple nodes in a cluster.
- It can handle large datasets.
- PySpark can also be used for Data Science, Machine Learning, and Data Engineering.
- PySpark DataFrame is immutable, which means it cannot be changed once it has been created.
- It performs the computation in memory ( RAM ).
- PySpark DataFrame is lazy in nature, which means it does not execute immediately after a transformation is applied. It executes when an action is performed.
- PySpark DataFrame runs operations in parallel, either across multiple nodes in a cluster or on a single node.
- PySpark DataFrame provides fault tolerance, which is the capability of a system to keep working properly even if one of its sub-components fails.
Pandas DataFrame:
- Pandas DataFrame is also a kind of data structure that stores data like a table.
- Pandas DataFrame does not support parallelism.
- Pandas DataFrame runs on only a single machine or single node.
- Pandas DataFrame becomes slow on large datasets.
- DataFrame in Pandas is a mutable data structure, which means we can change it after it has been created.
- Pandas DataFrame executes immediately unlike PySpark DataFrame.
- Pandas DataFrame does not support fault tolerance.
- Pandas DataFrame is also used for Data Engineering, Data Analytics, Data Science, and Machine Learning.
- It is perfect for small datasets.
How to Create PySpark DataFrame?
We can create a PySpark DataFrame in multiple ways. Here we will look at some of them.
To create a DataFrame in PySpark, follow the steps given below.
Creating Spark Session
Spark session is an entry point for any PySpark or Spark application, which allows us to work with PySpark RDD, DataFrame, and Datasets. The PySpark SQL module has a class called SparkSession, and it has a builder attribute that is used to create a Spark session. To create a Spark session, we have to import the SparkSession class from the pyspark.sql module.
The code below shows how I have created a Spark session.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("testing").getOrCreate()
In the above code, spark is the variable that holds the Spark session, SparkSession.builder is used to initialize the session, appName() sets the name of the Spark application, and getOrCreate() returns the existing Spark session or creates a new one if none is available.
Creating DataFrame
Here, I have shown multiple ways to create PySpark DataFrame.
Creating PySpark DataFrame from a list of tuples:
Now we have a Spark session called spark, which has a method called createDataFrame() to create a brand new DataFrame. The createDataFrame() method accepts different kinds of input, such as a PySpark RDD, a Pandas DataFrame, a list of PySpark Row objects, or a list of tuples.
from pyspark.sql import SparkSession

data = [
    ('E1001', 'Vishvajit', 'Rao', 20000, 'IT'),
    ('E1002', 'Harsh', 'Kumar', 15000, 'HR'),
    ('E1003', 'Pankaj', 'Singh', 30000, 'IT'),
    ('E1004', 'Pratik', 'Kumar', 18000, 'HR'),
    ('E1005', 'Harshali', 'Kumari', 23000, 'IT'),
    ('E1006', 'Vinay', 'Saini', 13000, 'Account'),
    ('E1007', 'Pratiksha', 'Kaushik', 30000, 'HR'),
    ('E1008', 'Shailja', 'Srivastava', 30000, 'Account'),
    ('E1009', 'Vaibhav', 'Pathak', 32000, 'IT'),
    ('E10010', 'Harnoor', 'Singh', 50000, 'IT'),
    ('E10011', 'Vedika', 'Kumari', 40000, 'Account'),
]

# column
column = ['emp_id', 'first_name', 'last_name', 'salary', 'department']

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating DataFrame
df = spark.createDataFrame(data, column)
df.show()
When the above code executes, the output will look like this.
+------+----------+----------+------+----------+
|emp_id|first_name|last_name |salary|department|
+------+----------+----------+------+----------+
|E1001 |Vishvajit |Rao       |20000 |IT        |
|E1002 |Harsh     |Kumar     |15000 |HR        |
|E1003 |Pankaj    |Singh     |30000 |IT        |
|E1004 |Pratik    |Kumar     |18000 |HR        |
|E1005 |Harshali  |Kumari    |23000 |IT        |
|E1006 |Vinay     |Saini     |13000 |Account   |
|E1007 |Pratiksha |Kaushik   |30000 |HR        |
|E1008 |Shailja   |Srivastava|30000 |Account   |
|E1009 |Vaibhav   |Pathak    |32000 |IT        |
|E10010|Harnoor   |Singh     |50000 |IT        |
|E10011|Vedika    |Kumari    |40000 |Account   |
+------+----------+----------+------+----------+
Creating PySpark DataFrame using PySpark Row:
PySpark provides a Row class, which is defined in the pyspark.sql package. A Row object represents a single record of a DataFrame. It takes column names and their values as keyword arguments and returns a Row object.
The createDataFrame() method can also create a DataFrame from a list of Row objects.
Let's see how we can create a PySpark DataFrame with the help of the PySpark Row class.
from pyspark.sql import SparkSession
from pyspark.sql import Row

student_name = ['John', 'Rambo', 'Kaushik', 'Jatri']
student_age = [23, 25, 24, 30]

spark = SparkSession.builder.appName("testing").getOrCreate()

rows = []

# creating List of Row object
for i in zip(student_name, student_age):
    rows.append(Row(name=i[0], age=i[1]))

# creating dataframe
df = spark.createDataFrame(data=rows)
df.show()
The newly created data frame will look like this.
+-------+---+
| name|age|
+-------+---+
| John| 23|
| Rambo| 25|
|Kaushik| 24|
| Jatri| 30|
+-------+---+
Creating DataFrame using Pandas DataFrame:
As we know, the Spark session createDataFrame() method accepts several kinds of input in order to create a new DataFrame, and a Pandas DataFrame is one of them.
Here, I have created a Pandas DataFrame and then created PySpark DataFrame with the help of the Pandas DataFrame.
from pyspark.sql import SparkSession
from pandas import DataFrame

data = [
    ('E1001', 'Vishvajit', 'Rao', 20000, 'IT'),
    ('E1002', 'Harsh', 'Kumar', 15000, 'HR'),
    ('E1003', 'Pankaj', 'Singh', 30000, 'IT'),
    ('E1004', 'Pratik', 'Kumar', 18000, 'HR'),
    ('E1005', 'Harshali', 'Kumari', 23000, 'IT'),
    ('E1006', 'Vinay', 'Saini', 13000, 'Account'),
    ('E1007', 'Pratiksha', 'Kaushik', 30000, 'HR'),
    ('E1008', 'Shailja', 'Srivastava', 30000, 'Account'),
    ('E1009', 'Vaibhav', 'Pathak', 32000, 'IT'),
    ('E10010', 'Harnoor', 'Singh', 50000, 'IT'),
    ('E10011', 'Vedika', 'Kumari', 40000, 'Account'),
]

# column
column = ['emp_id', 'first_name', 'last_name', 'salary', 'department']

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating pandas dataframe
pandas_df = DataFrame(data, columns=column)

# creating pyspark dataframe using pandas df
df = spark.createDataFrame(pandas_df)
df.show()
The code above creates a PySpark DataFrame from the Pandas DataFrame. The output looks like this.
+------+----------+----------+------+----------+
|emp_id|first_name|last_name |salary|department|
+------+----------+----------+------+----------+
|E1001 |Vishvajit |Rao       |20000 |IT        |
|E1002 |Harsh     |Kumar     |15000 |HR        |
|E1003 |Pankaj    |Singh     |30000 |IT        |
|E1004 |Pratik    |Kumar     |18000 |HR        |
|E1005 |Harshali  |Kumari    |23000 |IT        |
|E1006 |Vinay     |Saini     |13000 |Account   |
|E1007 |Pratiksha |Kaushik   |30000 |HR        |
|E1008 |Shailja   |Srivastava|30000 |Account   |
|E1009 |Vaibhav   |Pathak    |32000 |IT        |
|E10010|Harnoor   |Singh     |50000 |IT        |
|E10011|Vedika    |Kumari    |40000 |Account   |
+------+----------+----------+------+----------+
Creating PySpark DataFrame from RDD:
RDD (Resilient Distributed Dataset) is also a data structure in PySpark, and we can create a PySpark DataFrame from an RDD.
There are two ways to create a PySpark DataFrame from an RDD: first, pass the RDD to the Spark session createDataFrame() method; second, use the RDD toDF() method.
First way:
Here, I have created a PySpark DataFrame with the help of the createDataFrame() method by passing the RDD along with the column names.
from pprint import pprint

from pyspark.sql import SparkSession

data = [
    ('Rambo', 1100, 'BCA'),
    ('Shital', 1101, 'BTech'),
    ('Harrry', 1102, 'MCA'),
    ('Pankaj', 1103, 'MTech'),
    ('Jayanti', 1104, 'PHD'),
]

# columns name
column = ['student_name', 'roll_number', 'course']

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# creating spark context
sc = spark.sparkContext

# creating rdd
rdd = sc.parallelize(data)
pprint(rdd.collect())

# creating pyspark dataframe using rdd
df = spark.createDataFrame(rdd, column)
df.show()
Second way:
The second way of creating a DataFrame is to use the RDD toDF() method. It takes a schema or a list of column names as a parameter and returns a new DataFrame for the RDD.
rdd.toDF(column).show()
The newly created PySpark DataFrame will be like this.
+------------+-----------+------+
|student_name|roll_number|course|
+------------+-----------+------+
| Rambo| 1100| BCA|
| Shital| 1101| BTech|
| Harrry| 1102| MCA|
| Pankaj| 1103| MTech|
| Jayanti| 1104| PHD|
+------------+-----------+------+
Create Empty DataFrame in PySpark
In this section, I am going to create a PySpark DataFrame with no data or records. There are multiple ways to create an empty DataFrame in PySpark. You will see several of them.
Create empty DataFrame with emptyRDD():
emptyRDD() is a SparkContext method that is used to create an empty RDD. After creating an empty RDD, we can pass it to the createDataFrame() method in order to create an empty DataFrame.
from pyspark.sql.types import StructType

# an empty StructType means a schema with no columns
columns = StructType([])
rdd = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(rdd, columns)
df.show()
Output
++
||
++
++
Create an empty data frame with an empty list
We can also use the spark session createDataFrame() method along with an empty list in order to create an empty PySpark DataFrame.
columns = StructType([])
df = spark.createDataFrame(data=[], schema=columns)
df.show()
Create empty DataFrame with schema
You can also create an empty PySpark DataFrame with a predefined schema, so that the column names and data types are in place even though there are no rows.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructField, StructType

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

columns = StructType(
    [
        StructField("name", StringType(), nullable=True),
        StructField("roll_number", IntegerType(), nullable=True),
        StructField("course", StringType(), nullable=True),
        StructField("marks", IntegerType(), nullable=True),
    ]
)

df = spark.createDataFrame(data=[], schema=columns)
df.show()
Output
+----+-----------+------+-----+
|name|roll_number|course|marks|
+----+-----------+------+-----+
+----+-----------+------+-----+
So, this is how you can create an empty DataFrame in PySpark.
Creating DataFrame from Data Sources
So far we have seen how to create a PySpark DataFrame from a list of tuples, an RDD, and a Pandas DataFrame. In real-life projects, however, you will mostly load data from data sources like CSV files, JSON files, TXT files, data warehouses, or database management systems like MySQL and SQL Server.
For file formats such as CSV, JSON, and text, you don't need any external libraries because PySpark supports them by default. Databases such as MySQL are read over JDBC and additionally need the matching JDBC driver.
Here, I will show you how to load data from CSV, JSON, and TXT files into a PySpark DataFrame with the help of examples.
Loading data from CSV file:
I have created a CSV file named employee.csv with some records, as you can see below. Now, I am going to load this employee.csv file into a PySpark DataFrame.
name,designation,country
Hayati,Developer,India
Vishvajit,Developer,India
Vaibhav,Tester,India
Shital,HR,USA
PySpark provides a DataFrameReader class, available as spark.read, with various methods to load data into PySpark. The csv() method is one of them. It takes the path of the CSV file as a parameter and loads the CSV data into a PySpark DataFrame.
from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

# the header option tells Spark to use the first line as column names
df = spark.read.option('header', True).csv('employee.csv')
df.show()
After executing the above code, the contents of employee.csv will look like this in the PySpark DataFrame.
+---------+-----------+-------+
| name|designation|country|
+---------+-----------+-------+
| Hayati| Developer| India|
|Vishvajit| Developer| India|
| Vaibhav| Tester| India|
| Shital| HR| USA|
+---------+-----------+-------+
Creating PySpark DataFrame from JSON file:
As developers, we sometimes have to deal with JSON files. PySpark can also load JSON file data into a PySpark DataFrame.
I have created a JSON file that contains country names and their capitals, and now I want to load that data into a DataFrame.
{"capital": "New Delhi", "country_name": "India"}
{"capital": "Washington D.C.", "country_name": "United States"}
{"capital": "Ottawa", "country_name": "Canada"}
{"capital": "Beijing", "country_name": "China"}
To load JSON data into a PySpark DataFrame, I have used the json() method. The json() method takes the path of the JSON file as a parameter and returns a new DataFrame. By default, it expects one JSON object per line.
df = spark.read.json('employee.json')
df.show()
The JSON data will be like this in DataFrame format.
+---------------+-------------+
| capital| country_name|
+---------------+-------------+
| New Delhi| India|
|Washington D.C.|United States|
| Ottawa| Canada|
| Beijing| China|
+---------------+-------------+
Creating DataFrame from TXT file:
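Similar to CSV and JSON files, we can also load data from a TXT file. The DataFrameReader has a text() method, but it loads each line into a single string column named value. Because employee.txt here contains comma-separated records with a header row, the csv() method is used so that the data is split into columns.
df = spark.read.option('header', True).csv("employee.txt")
df.show()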
Output
+---------+-----------+-------+
| name|designation|country|
+---------+-----------+-------+
| Hayati| Developer| India|
|Vishvajit| Developer| India|
| Vaibhav| Tester| India|
| Shital| HR| USA|
+---------+-----------+-------+
Printing Schema of The PySpark DataFrame
Sometimes we want to define the schema, that is, the column names and data types, for a PySpark DataFrame ourselves. We can define a schema by using StructType.
The code below shows how I have created a column schema for the DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructField, StructType

data = [
    ('Rambo', 1100, 'BCA'),
    ('Shital', 1101, 'BTech'),
    ('Harrry', 1102, 'MCA'),
    ('Pankaj', 1103, 'MTech'),
    ('Jayanti', 1104, 'PHD'),
]

columns = StructType(
    [
        StructField("name", StringType(), nullable=True),
        StructField("roll_number", IntegerType(), nullable=True),
        StructField("course", StringType(), nullable=True),
    ]
)

# creating spark session
spark = SparkSession.builder.appName("testing").getOrCreate()

df = spark.createDataFrame(data, columns)
df.show()
Displaying the schema of the DataFrame:
Sometimes we want to check the schema of a PySpark DataFrame so that we can change it according to our requirements. In that case, we can use the DataFrame method printSchema() to display the schema of an existing DataFrame.
It displays column names along with their data types, as you can see below.
Here is the schema of the DataFrame created above.
# printSchema() prints the schema itself and returns None,
# so it should not be wrapped in print()
df.printSchema()
Output
root
|-- name: string (nullable = true)
|-- roll_number: integer (nullable = true)
|-- course: string (nullable = true)
I hope you have understood what a PySpark DataFrame is, as well as the process of creating a new DataFrame in PySpark.
Summary
In this article, we have covered the PySpark DataFrame with the help of examples. If you are planning to learn PySpark or are a beginner PySpark developer, you should know the PySpark DataFrame well, because you will use it frequently in real-life projects as a data engineer.
If you know SQL, it will be easier to learn PySpark, because a DataFrame in PySpark is similar to a table in a relational database, and you can perform most SQL operations on top of a PySpark DataFrame.
You can also create a PySpark DataFrame in multiple ways, such as from a Pandas DataFrame, an RDD, a list of tuples, PySpark Rows, or data sources (JSON, CSV, TXT, databases, etc.).
If you found this article helpful, please share and keep visiting for further PySpark tutorials.
Thanks for your valuable time…