In this article, we will go through the top 30 PySpark DataFrame methods with examples. As a Data Engineer, Data Analyst, or PySpark Developer, you should know these PySpark DataFrame methods because they let you perform a wide range of operations on top of a PySpark DataFrame.
Remember, all of these methods are applied on top of a PySpark DataFrame.
As we know, PySpark is an open-source distributed computing framework that lets us use Spark through the Python programming language.
The PySpark DataFrame is one of the most widely used data structures; it allows us to perform operations over data and provides high-level operations on structured data.
Now, let’s take an overview of the most common and useful methods in PySpark DataFrames, along with examples.
To apply DataFrame methods, first, we need to create a PySpark DataFrame.
Headings of Contents
- 1 Create a PySpark DataFrame
- 2 PySpark DataFrame Methods with Examples
- 2.1 show()
- 2.2 select()
- 2.3 filter() or where()
- 2.4 groupBy() and agg()
- 2.5 withColumn()
- 2.6 orderBy() or sort()
- 2.7 drop()
- 2.8 distinct()
- 2.9 count()
- 2.10 join()
- 2.11 union()
- 2.12 dropDuplicates()
- 2.13 alias()
- 2.14 collect()
- 2.15 fillna()
- 2.16 dropna()
- 2.17 fill() and replace()
- 2.18 sample()
- 2.19 rdd
- 2.20 describe()
- 2.21 transform()
- 2.22 explode()
- 2.23 first()
- 2.24 head()
- 2.25 toPandas()
- 2.26 toJSON()
- 2.27 withColumns()
- 2.28 take(n)
- 2.29 tail(n)
- 2.30 limit(n)
- 2.31 columns
- 2.32 isEmpty()
- 2.33 toDF()
- 3 Conclusion
Create a PySpark DataFrame
I have already prepared a CSV file to create a PySpark DataFrame.
Here, I am using a CSV file to create a PySpark DataFrame, and we will use this DataFrame throughout this article. Sometimes we will also create a new PySpark DataFrame from a list of dictionaries. I have already written an article about reading CSV files into a PySpark DataFrame; you can read it for more clarity.
from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("./sample_data.csv")
)

# displaying the PySpark DataFrame
df.show()
Note:- The spark variable in the above example is the SparkSession, which is the entry point of a Spark application.
Output DataFrame:
+----------+---------+------+----------+---+----------+----+------+
|first_name|last_name|gender|department|age|      date|  id|salary|
+----------+---------+------+----------+---+----------+----+------+
|     Dulce|    Abril|Female|        IT| 32|2017-10-15|1562| 34500|
|      Mara|Hashimoto|Female|   Testing| 25|2016-08-16|1582| 54000|
|    Philip|     Gent|  Male|        IT| 36|2015-05-21|2587| 18700|
|  Kathleen|   Hanner|Female| Marketing| 25|2017-10-15|1876| 24000|
|      Mara|Hashimoto|Female|   Testing| 25|2016-08-16|1582| 31000|
|  Kathleen|   Hanner|Female|        IT| 25|2017-10-15|1876| 29000|
| Vishvajit|      Rao|  Male|   Testing| 24|2023-04-10| 232| 35000|
|      Ajay|    Kumar|  Male| Marketing| 27|2018-04-10|1234| 34560|
|     Dulce|    Abril|Female| Marketing| 32|2017-10-15|1562| 28750|
|     Harsh|    Kumar|  Male| Marketing| 26|2022-04-10|1232| 12560|
+----------+---------+------+----------+---+----------+----+------+
PySpark DataFrame Methods with Examples
Now, let’s explore all the PySpark DataFrame methods step by step with the help of examples. Here, I have covered almost all the PySpark DataFrame methods that play an important role in day-to-day PySpark applications.
show()
The show() method is used to display the contents of the DataFrame. By default, it shows the top 20 rows.
df.show()
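As a quick, hedged sketch using the same df created above, show() also accepts optional arguments: n controls how many rows are printed, truncate controls whether long values are cut off, and vertical prints one column per line.

# display only the first 5 rows and do not truncate long column values
df.show(5, truncate=False)

# display the rows vertically (one column per line), handy for wide DataFrames
df.show(2, vertical=True)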
select()
The select() method allows you to select specific columns from a DataFrame.
new_df = df.select("first_name", "last_name", "age")
new_df.show()
Output DataFrame
+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|     Dulce|    Abril| 32|
|      Mara|Hashimoto| 25|
|    Philip|     Gent| 36|
|  Kathleen|   Hanner| 25|
|      Mara|Hashimoto| 25|
|  Kathleen|   Hanner| 25|
| Vishvajit|      Rao| 24|
|      Ajay|    Kumar| 27|
|     Dulce|    Abril| 32|
|     Harsh|    Kumar| 26|
+----------+---------+---+
filter() or where()
The filter() or where() method is used to filter rows that meet certain conditions.
Using filter():
from pyspark.sql.functions import col

new_df = df.filter(col("age") > 25)
new_df.show()
Using where():
from pyspark.sql.functions import col

new_df = df.where(col("age") > 25)
new_df.show()
This example will display all the records whose age is greater than 25.
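Conditions can also be combined. Here is a minimal sketch, assuming the same df with age and department columns, that uses & (AND) with parentheses around each condition:

from pyspark.sql.functions import col

# rows where age is greater than 25 AND the department is IT
new_df = df.filter((col("age") > 25) & (col("department") == "IT"))
new_df.show()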
groupBy() and agg()
The groupBy() method is used to group data based on one or more columns, and agg() allows you to perform aggregation functions on grouped data.
Here, I am calculating the average salary of each department.
from pyspark.sql.functions import avg

new_df = df.groupBy("department").agg(avg("salary").alias("average_salary"))
new_df.show()
Output DataFrame
+----------+--------------+
|department|average_salary|
+----------+--------------+
| Marketing|       24967.5|
|        IT|       27400.0|
|   Testing|       40000.0|
+----------+--------------+
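agg() can also compute several aggregations in one pass. The following is a small sketch on the same df; the alias names used here (max_salary, employee_count) are just illustrative:

from pyspark.sql.functions import avg, count, max as spark_max

stats_df = df.groupBy("department").agg(
    avg("salary").alias("average_salary"),
    spark_max("salary").alias("max_salary"),
    count("*").alias("employee_count"),
)
stats_df.show()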
withColumn()
The withColumn() method is used to add or modify a column in the DataFrame. For example, I want to add 5 to each employee’s age value.
from pyspark.sql.functions import col

new_df = df.withColumn("modified_age", col("age") + 5).select(
    "first_name", "last_name", "modified_age"
)
new_df.show()
Output DataFrame:
+----------+---------+------------+
|first_name|last_name|modified_age|
+----------+---------+------------+
|     Dulce|    Abril|          37|
|      Mara|Hashimoto|          30|
|    Philip|     Gent|          41|
|  Kathleen|   Hanner|          30|
|      Mara|Hashimoto|          30|
|  Kathleen|   Hanner|          30|
| Vishvajit|      Rao|          29|
|      Ajay|    Kumar|          32|
|     Dulce|    Abril|          37|
|     Harsh|    Kumar|          31|
+----------+---------+------------+
orderBy() or sort()
The orderBy() or sort() method is used to sort the PySpark DataFrame by one or more columns. For example, I want to sort the employees by their salary.
new_df = df.orderBy(df.salary.desc()).select("first_name", "salary")
new_df.show()
Output DataFrame:
+----------+------+
|first_name|salary|
+----------+------+
|      Mara| 54000|
| Vishvajit| 35000|
|      Ajay| 34560|
|     Dulce| 34500|
|      Mara| 31000|
|  Kathleen| 29000|
|     Dulce| 28750|
|  Kathleen| 24000|
|    Philip| 18700|
|     Harsh| 12560|
+----------+------+
👉You can explore more about PySpark Sort functions.
drop()
The drop() method is used to remove one or more columns from a DataFrame.
df.drop("age").show()
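drop() also accepts several column names at once. A quick sketch on the same df (the gender column exists in the sample CSV):

# remove both the age and gender columns in a single call
new_df = df.drop("age", "gender")
new_df.show()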
distinct()
The distinct() method returns a new DataFrame with unique rows, removing duplicates.
new_df = df.distinct()
new_df.show()
count()
The count() method returns the number of rows in the DataFrame.
row_count = df.count()
print(row_count)
join()
The join() method allows you to perform SQL-like joins between DataFrames. Here, I am performing a PySpark DataFrame join.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": "Doe", "department_id": 1},
    {"first_name": "Vishvajit", "last_name": "Rwo", "department_id": 2},
    {"first_name": "Pankaj", "last_name": "Kumar", "department_id": 3},
]
data2 = [
    {"department_id": 1, "name": "Finance"},
    {"department_id": 2, "name": "IT"},
    {"department_id": 3, "name": "Sales"},
]

df_emp = spark.createDataFrame(data)
df_dep = spark.createDataFrame(data2)

new_df = df_emp.join(df_dep, on="department_id").withColumn(
    "department_name", col("name")
)
new_df = new_df.select("first_name", "last_name", "department_name")
new_df.show()
Output DataFrame:
+----------+---------+---------------+
|first_name|last_name|department_name|
+----------+---------+---------------+
|      John|      Doe|        Finance|
| Vishvajit|      Rwo|             IT|
|    Pankaj|    Kumar|          Sales|
+----------+---------+---------------+
union()
The union() DataFrame method is used to concatenate two PySpark DataFrames with the same schema.
from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": "Doe"},
    {"first_name": "Vishvajit", "last_name": "Rwo"},
    {"first_name": "Pankaj", "last_name": "Kumar"},
]
data2 = [
    {"first_name": "Vivek", "last_name": "Kumar"},
    {"first_name": "Harshita", "last_name": "Singh"},
]

df_emp = spark.createDataFrame(data)
df_emp1 = spark.createDataFrame(data2)

new_df = df_emp.union(df_emp1)
new_df.show()
Output DataFrame:
+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
| Vishvajit|      Rwo|
|    Pankaj|    Kumar|
|     Vivek|    Kumar|
|  Harshita|    Singh|
+----------+---------+
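Note that union() matches columns by position. If the two DataFrames have the same columns but in a different order, unionByName() is a safer choice; here is a small sketch using the df_emp and df_emp1 DataFrames created above:

# match columns by name instead of by position
new_df = df_emp.unionByName(df_emp1)
new_df.show()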
dropDuplicates()
The dropDuplicates() method removes duplicate rows based on all columns or selected columns.
new_df = df.dropDuplicates(subset=["first_name"])
new_df.show()
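When called without the subset argument, dropDuplicates() considers every column, which makes it behave like distinct(). A minimal sketch:

# remove rows that are duplicated across all columns
new_df = df.dropDuplicates()
new_df.show()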
alias()
The alias() method lets us give the DataFrame a new name (an alias), which is handy when referencing its columns, for example in joins.
new_df = df.alias("MyDF").select("MyDF.first_name", "MyDF.last_name")
new_df.show()
As you can see, I have given the DataFrame loaded from the CSV file the alias MyDF and selected only the first_name and last_name columns through that alias.
collect()
The collect() method retrieves all elements of the DataFrame as a list of Rows.
all_rows = df.collect()
for row in all_rows:
    print(row)
This code will display all the rows of the DataFrame as Row objects.
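Each element returned by collect() is a Row object, so individual fields can be read by name or the whole row converted to a plain Python dict; a short sketch assuming the same df:

for row in df.collect():
    print(row["first_name"])   # access a single field by name
    print(row.asDict())        # convert the Row to a regular Python dictionary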
fillna()
The fillna() method replaces null (or missing) values with a specified value. This is particularly useful when dealing with incomplete data.
from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": None, "age": None},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": None},
]

df_emp = spark.createDataFrame(data)
new_df = df_emp.fillna({"last_name": "Unknown", "age": 25})
new_df.show()
Output DataFrame:
+---+----------+---------+
|age|first_name|last_name|
+---+----------+---------+
| 25|      John|  Unknown|
| 23| Vishvajit|      Rwo|
| 25|    Pankaj|    Kumar|
+---+----------+---------+
dropna()
The dropna() method drops rows that contain null values. You can specify how and where to drop nulls.
from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": None, "age": None},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": None},
]

df_emp = spark.createDataFrame(data)

# drop rows that contain any null value
new_df = df_emp.dropna()

# drop rows only when all of their values are null
new_df = df_emp.dropna(how="all")

# drop rows that have fewer than 2 non-null values
new_df = df_emp.dropna(thresh=2)

new_df.show()
fill() and replace()
The fill() and replace() methods provide more flexibility for replacing values in a DataFrame.
fill():- Similar to fillna(); it replaces null values and is accessed through df.na.fill().
replace():- Replaces specific values with new ones, allowing for fine-grained control.
# replacing None with "Unknown" in the last_name column
new_df = df_emp.na.fill("Unknown", subset=["last_name"])
new_df.show()

# replacing "Rwo" with "Rao" in the last_name column
new_df = new_df.replace(to_replace="Rwo", value="Rao", subset=["last_name"])
new_df.show()
Output DataFrame:
+----+----------+---------+
| age|first_name|last_name|
+----+----------+---------+
|NULL|      John|  Unknown|
|  23| Vishvajit|      Rao|
|NULL|    Pankaj|    Kumar|
+----+----------+---------+
sample()
The sample() method is used to take a random sample of data from a PySpark DataFrame, for example, to test an algorithm on a smaller subset.
new_df = df.sample(withReplacement=True, fraction=0.5)
new_df.show()
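sample() also accepts an optional seed argument, which makes the sample reproducible across runs; a quick sketch on the same df:

# roughly 50% of the rows, and the same rows every time thanks to the fixed seed
new_df = df.sample(fraction=0.5, seed=42)
new_df.show()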
rdd
The rdd property is used to convert a PySpark DataFrame into a PySpark RDD (Resilient Distributed Dataset). This is very useful when you want to perform lower-level operations or work with Spark's RDD APIs.
rdd = df.rdd
print(rdd.collect())
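Once you have the underlying RDD, the usual RDD transformations are available. Here is a minimal sketch, assuming the same df, that pulls out just the first_name field from every Row:

# each element of df.rdd is a Row object, so fields can be read by name
names_rdd = df.rdd.map(lambda row: row["first_name"])
print(names_rdd.collect())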
describe()
The describe() method provides summary statistics for numerical columns in the DataFrame.
summary = df.describe("salary")
summary.show()
Output DataFrame:
+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                10|
|   mean|           30207.0|
| stddev|11120.657504542316|
|    min|             12560|
|    max|             54000|
+-------+------------------+
transform()
The transform() method allows you to apply an arbitrary Python function to your DataFrame, which is handy for chaining reusable transformations.
from pyspark.sql.functions import col

# Adding 10 to each age in the PySpark DataFrame
new_df = df.transform(func=lambda df: df.withColumn("new_age", col("age") + 10))
new_df.show()
explode()
The explode() method is used to transform an array or map column into multiple rows, expanding it vertically.
Remember, explode() is not a PySpark DataFrame method; it is a built-in function in PySpark, defined in the pyspark.sql.functions module.
Let’s see how can we do that.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": "Doe", "age": 20, "skills": ["Python", "SQL"]},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23, "skills": ["Excel", "SQL"]},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": 19, "skills": ["Java", "SQL"]},
]

df = spark.createDataFrame(data=data)
new_df = df.withColumn("skills", explode(col("skills")))
new_df.show()
Output DataFrame:
+---+----------+---------+------+
|age|first_name|last_name|skills|
+---+----------+---------+------+
| 20|      John|      Doe|Python|
| 20|      John|      Doe|   SQL|
| 23| Vishvajit|      Rwo| Excel|
| 23| Vishvajit|      Rwo|   SQL|
| 19|    Pankaj|    Kumar|  Java|
| 19|    Pankaj|    Kumar|   SQL|
+---+----------+---------+------+
first()
The first() method returns the first row of the DataFrame as a Row object.
first_row = df.first()
print(first_row)
Output
Row(age=20, first_name='John', last_name='Doe', skills=['Python', 'SQL'])
head()
The head() method works like first(), but when you pass a number n as an argument it returns the first n rows as a list.
three_rows = df.head(3)
print(three_rows)
toPandas()
The toPandas() method is used to convert PySpark DataFrame to Pandas DataFrame. It is very useful when you want to perform operations locally.
pandas_df = df.toPandas()
print(pandas_df)
Output
   age first_name last_name         skills
0   20       John       Doe  [Python, SQL]
1   23  Vishvajit       Rwo   [Excel, SQL]
2   19     Pankaj     Kumar    [Java, SQL]
toJSON()
The toJSON() method is used to convert each row of the PySpark DataFrame into a JSON string.
json_string = df.toJSON()
print(json_string.collect())
withColumns()
The withColumns() method is used to return a new DataFrame by adding multiple columns or replacing the existing columns that have the same names.
For example, I want to add a new column age_squared that stores the square of each person’s age.
from pyspark.sql.functions import col

new_df = df.withColumns({"age_squared": col("age") ** 2})
new_df.show()
Output DataFrame
+---+----------+---------+-------------+-----------+
|age|first_name|last_name|       skills|age_squared|
+---+----------+---------+-------------+-----------+
| 20|      John|      Doe|[Python, SQL]|      400.0|
| 23| Vishvajit|      Rwo| [Excel, SQL]|      529.0|
| 19|    Pankaj|    Kumar|  [Java, SQL]|      361.0|
+---+----------+---------+-------------+-----------+
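Since withColumns() takes a dictionary, several columns can be added in one call (note that withColumns() requires Spark 3.3 or later). A small sketch on the same df; the new column names here are just illustrative:

from pyspark.sql.functions import col, upper

new_df = df.withColumns({
    "age_plus_5": col("age") + 5,
    "first_name_upper": upper(col("first_name")),
})
new_df.show()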
take(n)
The take() method takes an integer n as a parameter and returns the first n rows of the PySpark DataFrame as a list of Row objects.
result = df.take(2)
print(result)
See Also:
- Merge Two DataFrames in PySpark with the Same Column Names
- How to Apply groupBy in Pyspark DataFrame
- Merge Two DataFrames in PySpark with Different Column Names
- How to Change DataType of Column in PySpark DataFrame
- Drop One or Multiple columns from PySpark DataFrame
- How to Convert PySpark DataFrame to JSON ( 3 Ways )
- How to Write PySpark DataFrame to CSV
- How to Convert PySpark DataFrame Column to List
tail(n)
The tail(n) method takes an integer n as a parameter and returns the last n rows of the PySpark DataFrame as a list of Row objects.
result = df.tail(2)
print(result)
limit(n)
The limit(n) method returns a new DataFrame that contains the specified number of rows; n represents the number of rows you want to get from the PySpark DataFrame.
result = df.limit(2)
result.show()
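A useful distinction, shown in the short sketch below: limit() returns another DataFrame (so you can keep chaining DataFrame methods), while take() returns a plain Python list of Row objects.

first_two_df = df.limit(2)
print(type(first_two_df))    # a DataFrame, further methods can be chained

first_two_rows = df.take(2)
print(type(first_two_rows))  # a plain Python list of Row objects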
columns
The columns property is used to get the column names of the DataFrame.
column_name = df.columns
print(column_name)
isEmpty()
The isEmpty() method returns a boolean value: True if the DataFrame is empty and False otherwise.
result = df.isEmpty()
print(result)
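To see both outcomes, here is a hedged sketch that builds an empty DataFrame from an empty list and a small DDL schema string (the schema used here is just illustrative); note that isEmpty() is available from Spark 3.3 onwards:

# an empty DataFrame created from an empty list and a DDL schema string
empty_df = spark.createDataFrame([], "id INT, name STRING")

print(empty_df.isEmpty())  # True, it has no rows
print(df.isEmpty())        # False, the sample DataFrame has rows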
toDF()
The toDF() method is used to return a new DataFrame with new specified column names.
new_columns = ["Age", "FirstName", "LastName", "Skills"]
new_df = df.toDF(*new_columns)
new_df.show()
Output DataFrame:
+---+---------+--------+-------------+
|Age|FirstName|LastName|       Skills|
+---+---------+--------+-------------+
| 20|     John|     Doe|[Python, SQL]|
| 23|Vishvajit|     Rwo| [Excel, SQL]|
| 19|   Pankaj|   Kumar|  [Java, SQL]|
+---+---------+--------+-------------+
See Also:
- How to convert PySpark DataFrame to RDD
- How to convert PySpark Row To Dictionary
- How to read CSV files using PySpark
- How to Format a String in PySpark DataFrame using Column Values
- How to Mask Card Number in PySpark DataFrame
- How to Remove Time Part from PySpark DateTime Column
- How to Explode Multiple Columns in PySpark DataFrame
- How to Find the Nth Highest Salary Using PySpark
- How to Drop Duplicate Rows from PySpark DataFrame
Conclusion
All these 30 PySpark DataFrame methods are very useful when you are working with PySpark DataFrames. With the help of these methods, you can perform many different PySpark DataFrame operations in your PySpark applications.
The PySpark DataFrame is the most used data structure in PySpark applications; it allows PySpark developers and data engineers to work with structured data, much like data stored in a SQL table.
If you found this PySpark DataFrame Methods crash course helpful, please share it and keep visiting for further PySpark articles.
Happy Learning…