
Top 30 PySpark DataFrame Methods with Example

In this article, we will look at the top 30 PySpark DataFrame methods with examples. As a Data Engineer, Data Analyst, or PySpark Developer, you must know the PySpark DataFrame methods because they let you perform various operations on top of a PySpark DataFrame.

Remember, all of these methods are applied on top of a PySpark DataFrame.

As we know, PySpark is an open-source distributed computing framework that lets you use Spark with the Python programming language.

PySpark DataFrame is one of the most widely used data structures; it allows us to perform operations over data and provides high-level operations on structured data.

Now, let’s take an overview of the most common and useful methods in PySpark DataFrames, along with examples.

To apply DataFrame methods, first, we need to create a PySpark DataFrame.

Create a PySpark DataFrame

I have already prepared a CSV file to create a PySpark DataFrame.

Here, I am using a CSV file to create a PySpark DataFrame, and we will use this DataFrame throughout this course. Sometimes we will also create a new PySpark DataFrame from a list of dictionaries. I have already written an article about reading CSV files into a PySpark DataFrame; you can read it for more clarity.

from pyspark.sql import SparkSession


# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("./sample_data.csv")
)

# displaying the PySpark DataFrame
df.show()

Note:- The spark variable in the above example is the Spark session in PySpark, which is the entry point of a Spark application.

Output DataFrame:

+----------+---------+------+----------+---+----------+----+------+
|first_name|last_name|gender|department|age|      date|  id|salary|
+----------+---------+------+----------+---+----------+----+------+
|     Dulce|    Abril|Female|        IT| 32|2017-10-15|1562| 34500|
|      Mara|Hashimoto|Female|   Testing| 25|2016-08-16|1582| 54000|
|    Philip|     Gent|  Male|        IT| 36|2015-05-21|2587| 18700|
|  Kathleen|   Hanner|Female| Marketing| 25|2017-10-15|1876| 24000|
|      Mara|Hashimoto|Female|   Testing| 25|2016-08-16|1582| 31000|
|  Kathleen|   Hanner|Female|        IT| 25|2017-10-15|1876| 29000|
| Vishvajit|      Rao|  Male|   Testing| 24|2023-04-10| 232| 35000|
|      Ajay|    Kumar|  Male| Marketing| 27|2018-04-10|1234| 34560|
|     Dulce|    Abril|Female| Marketing| 32|2017-10-15|1562| 28750|
|     Harsh|    Kumar|  Male| Marketing| 26|2022-04-10|1232| 12560|
+----------+---------+------+----------+---+----------+----+------+

PySpark DataFrame Methods with Examples

Now let’s explore all the PySpark DataFrame methods step by step with the help of examples. Here, I have covered almost all the PySpark DataFrame methods that play the most important role in your daily PySpark applications.

show()

The show() method is used to display the contents of the DataFrame. By default, it shows the top 20 rows.

df.show()
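
You can also pass the number of rows to display and a truncate flag to control whether long values are cut off. A small sketch:

# show only the first 5 rows and do not truncate long column values
df.show(5, truncate=False)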

select()

The select() method allows you to select specific columns from a DataFrame.

new_df = df.select("first_name", "last_name", "age")
new_df.show()

Output DataFrame

+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|     Dulce|    Abril| 32|
|      Mara|Hashimoto| 25|
|    Philip|     Gent| 36|
|  Kathleen|   Hanner| 25|
|      Mara|Hashimoto| 25|
|  Kathleen|   Hanner| 25|
| Vishvajit|      Rao| 24|
|      Ajay|    Kumar| 27|
|     Dulce|    Abril| 32|
|     Harsh|    Kumar| 26|
+----------+---------+---+

filter() or where()

The filter() or where() method is used to filter rows that meet certain conditions.

Using filter():

from pyspark.sql.functions import col
new_df = df.filter(col("age") > 25)
new_df.show()

Using where():

from pyspark.sql.functions import col
new_df = df.where(col("age") > 25)
new_df.show()

This example will display all the records whose age is greater than 25.
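
You can also combine multiple conditions with & (and) and | (or), wrapping each condition in parentheses. A small sketch against the same sample data:

from pyspark.sql.functions import col

# employees older than 25 who work in the IT department
new_df = df.filter((col("age") > 25) & (col("department") == "IT"))
new_df.show()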

groupBy() and agg()

The groupBy() method is used to group data based on one or more columns, and agg() allows you to perform aggregation functions on grouped data.

Here, I am calculating the average salary of each department.

from pyspark.sql.functions import avg
new_df = df.groupBy("department").agg(avg("salary").alias("average_salary"))
new_df.show()

Output DataFrame

+----------+--------------+
|department|average_salary|
+----------+--------------+
| Marketing|       24967.5|
|        IT|       27400.0|
|   Testing|       40000.0|
+----------+--------------+
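
agg() also accepts several aggregations at once. A short sketch computing a few statistics per department:

from pyspark.sql.functions import avg, count, max as spark_max

# number of employees, average salary, and highest salary per department
new_df = df.groupBy("department").agg(
    count("*").alias("employees"),
    avg("salary").alias("average_salary"),
    spark_max("salary").alias("highest_salary"),
)
new_df.show()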

withColumn()

The withColumn() method is used to add or modify a column in the DataFrame. For example, I want to add 5 to each employee’s age value.

from pyspark.sql.functions import col
new_df = df.withColumn("modified_age", col("age") + 5).select(
    "first_name", "last_name", "modified_age"
)
new_df.show()

Output DataFrame:

+----------+---------+------------+
|first_name|last_name|modified_age|
+----------+---------+------------+
|     Dulce|    Abril|          37|
|      Mara|Hashimoto|          30|
|    Philip|     Gent|          41|
|  Kathleen|   Hanner|          30|
|      Mara|Hashimoto|          30|
|  Kathleen|   Hanner|          30|
| Vishvajit|      Rao|          29|
|      Ajay|    Kumar|          32|
|     Dulce|    Abril|          37|
|     Harsh|    Kumar|          31|
+----------+---------+------------+
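
withColumn() is also commonly combined with when() and otherwise() to add a conditional column. A minimal sketch that labels employees by age:

from pyspark.sql.functions import col, when

# label employees aged 30 or above as "Senior", everyone else as "Junior"
new_df = df.withColumn(
    "level", when(col("age") >= 30, "Senior").otherwise("Junior")
)
new_df.select("first_name", "age", "level").show()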

orderBy() or sort()

The orderBy() or sort() method is used to sort the PySpark DataFrame by one or more columns. For example, I want to sort the employees by their salary.

new_df = df.orderBy(df.salary.desc()).select("first_name", "salary")
new_df.show()

Output DataFrame:

+----------+------+
|first_name|salary|
+----------+------+
|      Mara| 54000|
| Vishvajit| 35000|
|      Ajay| 34560|
|     Dulce| 34500|
|      Mara| 31000|
|  Kathleen| 29000|
|     Dulce| 28750|
|  Kathleen| 24000|
|    Philip| 18700|
|     Harsh| 12560|
+----------+------+

👉You can explore more about PySpark Sort functions.
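
You can also sort by more than one column and mix ascending and descending order. For example:

# department ascending, then salary descending within each department
new_df = df.orderBy(df.department.asc(), df.salary.desc())
new_df.select("department", "first_name", "salary").show()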

drop()

The drop() method is used to remove one or more columns from a DataFrame.

df.drop("age").show()
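
drop() also accepts several column names at once. For example:

# removing more than one column in a single call
df.drop("age", "date").show()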

distinct()

The distinct() method returns a new DataFrame with unique rows, removing duplicates.

new_df = df.distinct()
new_df.show()

count()

The count() method returns the number of rows in the DataFrame.

row_count = df.count()
print(row_count)

join()

The join() method allows you to perform SQL-like joins between DataFrames. Here, I am performing a PySpark DataFrame join.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": "Doe", "department_id": 1},
    {"first_name": "Vishvajit", "last_name": "Rwo", "department_id": 2},
    {"first_name": "Pankaj", "last_name": "Kumar", "department_id": 3},
]

data2 = [
    {"department_id": 1, "name": "Finance"},
    {"department_id": 2, "name": "IT"},
    {"department_id": 3, "name": "Sales"},
]

df_emp = spark.createDataFrame(data)
df_dep = spark.createDataFrame(data2)


new_df = df_emp.join(df_dep, on="department_id").withColumn(
    "department_name", col("name")
)

new_df = new_df.select("first_name", "last_name", "department_name")
new_df.show()

Output DataFrame:

+----------+---------+---------------+
|first_name|last_name|department_name|
+----------+---------+---------------+
|      John|      Doe|        Finance|
| Vishvajit|      Rwo|             IT|
|    Pankaj|    Kumar|          Sales|
+----------+---------+---------------+
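
The how parameter controls the join type (inner, left, right, full, and so on). A small sketch of a left join, using a hypothetical employee whose department_id has no match in df_dep, so the unmatched department columns come back as null:

# "Neha" has department_id 4, which does not exist in df_dep
extra_emp = spark.createDataFrame(
    [{"first_name": "Neha", "last_name": "Sharma", "department_id": 4}]
)

# a left join keeps every employee row, matched or not
left_df = df_emp.union(extra_emp).join(df_dep, on="department_id", how="left")
left_df.show()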

union()

The union() DataFrame method is used to concatenate two PySpark DataFrames with the same schema.

from pyspark.sql import SparkSession

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()


data = [
    {"first_name": "John", "last_name": "Doe"},
    {"first_name": "Vishvajit", "last_name": "Rwo"},
    {"first_name": "Pankaj", "last_name": "Kumar"},
]

data2 = [
    {"first_name": "Vivek", "last_name": "Kumar"},
    {"first_name": "Harshita", "last_name": "Singh"}
]

df_emp = spark.createDataFrame(data)
df_emp1 = spark.createDataFrame(data2)

new_df = df_emp.union(df_emp1)
new_df.show()

Output DataFrame:

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
| Vishvajit|      Rwo|
|    Pankaj|    Kumar|
|     Vivek|    Kumar|
|  Harshita|    Singh|
+----------+---------+
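
union() matches columns by position, so the schemas must line up. When the column order differs between the two DataFrames, unionByName() matches columns by name instead. A small sketch using a hypothetical extra record:

# this DataFrame lists last_name before first_name
df_emp2 = spark.createDataFrame([("Sharma", "Neha")], ["last_name", "first_name"])

# unionByName aligns columns by name, so the differing column order does not matter
new_df = df_emp.unionByName(df_emp2)
new_df.show()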

dropDuplicates()

The dropDuplicates() method removes duplicate rows based on all columns or selected columns.

new_df = df.dropDuplicates(subset=["first_name"])
new_df.show()

alias()

The alias() method gives the DataFrame a new name (an alias) that you can use to qualify column references.

new_df = df.alias("MyDF").select("MyDF.first_name", "MyDF.last_name")
new_df.show()

As you can see, I have aliased the DataFrame loaded from the CSV file as MyDF and selected only the first_name and last_name columns.
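
alias() is especially handy for self-joins, where the same DataFrame appears twice and column references would otherwise be ambiguous. A minimal sketch that pairs employees from the same department:

from pyspark.sql.functions import col

a = df.alias("a")
b = df.alias("b")

# every pair of employees that share a department
pairs = a.join(b, col("a.department") == col("b.department")).select(
    col("a.first_name").alias("employee"),
    col("b.first_name").alias("colleague"),
)
pairs.show()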

collect()

The collect() method retrieves all elements of the DataFrame as a list of Rows.

all_rows = df.collect()
for row in all_rows:
    print(row)

This code will print every row of the DataFrame as a Row object.

fillna()

The fillna() method replaces null (or missing) values with a specified value. This is particularly useful when dealing with incomplete data.

from pyspark.sql import SparkSession
# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": None, "age": None},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": None},
]

df_emp = spark.createDataFrame(data)

new_df = df_emp.fillna({"last_name": "Unknown", "age": 25})
new_df.show()

Output DataFrame:

+---+----------+---------+
|age|first_name|last_name|
+---+----------+---------+
| 25|      John|  Unknown|
| 23| Vishvajit|      Rwo|
| 25|    Pankaj|    Kumar|
+---+----------+---------+

dropna()

The dropna() method drops rows that contain null values. You can specify how and where to drop nulls.

from pyspark.sql import SparkSession
# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": None, "age": None},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": None},
]

df_emp = spark.createDataFrame(data)
# drop rows that contain any null value
new_df = df_emp.dropna()

# drop rows where all values are null
new_df = df_emp.dropna(how="all")

# Drop rows where less than 2 non-null values exist
new_df = df_emp.dropna(thresh=2)
new_df.show()

fill() and replace()

The fill() and replace() methods provide more flexibility for replacing values in a DataFrame.

fill():- Similar to fillna(), it replaces null values; it is accessed through the na property (df.na.fill()).

replace():- Replaces specific values with new ones, allowing for fine-grained control.

# replacing None with "Unknown" in the last_name column
new_df = df_emp.na.fill("Unknown", subset=["last_name"])
new_df.show()

# replacing "Rwo" with "Rao" in the last_name column
new_df = new_df.replace(to_replace="Rwo", value="Rao", subset=["last_name"])
new_df.show()

Output DataFrame:

+----+----------+---------+
| age|first_name|last_name|
+----+----------+---------+
|NULL|      John|  Unknown|
|  23| Vishvajit|      Rao|
|NULL|    Pankaj|    Kumar|
+----+----------+---------+

sample()

The sample() method is used to take a sample of the data from the PySpark DataFrame, for example to test an algorithm on a smaller subset.

new_df = df.sample(withReplacement=True, fraction=0.5)
new_df.show()
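
You can also pass a seed so the same random sample is returned on every run, which is useful for reproducible tests:

# roughly 30% of the rows, without replacement, reproducible across runs
new_df = df.sample(withReplacement=False, fraction=0.3, seed=42)
new_df.show()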

rdd

The rdd property is used to convert the PySpark DataFrame to a PySpark RDD (Resilient Distributed Dataset). This is very useful when you want to perform advanced operations or work with Spark’s low-level APIs.

rdd = df.rdd
print(rdd.collect())
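
Once you have the RDD, each element is a Row object, so you can use low-level transformations such as map(). A minimal sketch:

# extracting just the first names through the RDD API
names = df.rdd.map(lambda row: row.first_name)
print(names.collect())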

describe()

The describe() method provides summary statistics for numerical columns in the DataFrame.

summary = df.describe("salary")
summary.show()

Output DataFrame:

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                10|
|   mean|           30207.0|
| stddev|11120.657504542316|
|    min|             12560|
|    max|             54000|
+-------+------------------+
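
Calling describe() without arguments summarizes every numeric column, and the related summary() method can also report percentiles. For example:

# summary statistics for all numeric columns
df.describe().show()

# count, min, quartiles, and max for every column
df.summary("count", "min", "25%", "75%", "max").show()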

transform()

The transform() method allows you to apply a Python function to your DataFrame and to chain transformations together.

from pyspark.sql.functions import col

# Adding 10 to each age of the PySpark DataFrame
new_df = df.transform(func=lambda df: df.withColumn("new_age", col("age") + 10))
new_df.show()
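
transform() really shines when you define small, reusable transformation functions and chain them. A short sketch using two hypothetical helper functions:

from pyspark.sql.functions import col

def add_bonus(input_df):
    # 10% bonus on top of the salary
    return input_df.withColumn("bonus", col("salary") * 0.1)

def add_total_pay(input_df):
    return input_df.withColumn("total_pay", col("salary") + col("bonus"))

new_df = df.transform(add_bonus).transform(add_total_pay)
new_df.select("first_name", "salary", "bonus", "total_pay").show()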

explode()

The explode() method is used to transform an array or map column into multiple rows, expanding it vertically.

Remember, explode() is not a PySpark DataFrame method; it is a built-in function defined in the pyspark.sql.functions module.

Let’s see how can we do that.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# creating spark session
spark = SparkSession.builder.appName("Programming Funda").getOrCreate()

data = [
    {"first_name": "John", "last_name": "Doe", "age": 20, "skills": ["Python", "SQL"]},
    {"first_name": "Vishvajit", "last_name": "Rwo", "age": 23, "skills": ["Excel", "SQL"]},
    {"first_name": "Pankaj", "last_name": "Kumar", "age": 19, "skills": ["Java", "SQL"]},
]

df = spark.createDataFrame(data=data)
new_df = df.withColumn("skills", explode(col("skills")))
new_df.show()

Output DataFrame:

+---+----------+---------+------+
|age|first_name|last_name|skills|
+---+----------+---------+------+
| 20|      John|      Doe|Python|
| 20|      John|      Doe|   SQL|
| 23| Vishvajit|      Rwo| Excel|
| 23| Vishvajit|      Rwo|   SQL|
| 19|    Pankaj|    Kumar|  Java|
| 19|    Pankaj|    Kumar|   SQL|
+---+----------+---------+------+

first()

The first() method returns the first row of the DataFrame.

first_row = df.first()
print(first_row)

Output

Row(age=20, first_name='John', last_name='Doe', skills=['Python', 'SQL'])

head()

The head() method works like first(), but it returns multiple rows when you pass an argument to it.

three_rows = df.head(3)
print(three_rows)

toPandas()

The toPandas() method is used to convert a PySpark DataFrame to a Pandas DataFrame. It is very useful when you want to perform operations locally.

pandas_df = df.toPandas()
print(pandas_df)

Output

   age first_name last_name         skills
0   20       John       Doe  [Python, SQL]
1   23  Vishvajit       Rwo   [Excel, SQL]
2   19     Pankaj     Kumar    [Java, SQL]

toJSON()

The toJSON() method is used to convert each row of the PySpark DataFrame to a JSON string.

json_string = df.toJSON()
print(json_string.collect())

withColumns()

The withColumns() method is used to return a new DataFrame by adding multiple columns or replacing the existing columns that have the same names.

For example, I want to add a new column age_doubled that will store double the age of each person.

new_df = df.withColumns({"age_doubled": col("age") * 2})
new_df.show()

Output DataFrame

+---+----------+---------+-------------+-----------+
|age|first_name|last_name|       skills|age_doubled|
+---+----------+---------+-------------+-----------+
| 20|      John|      Doe|[Python, SQL]|         40|
| 23| Vishvajit|      Rwo| [Excel, SQL]|         46|
| 19|    Pankaj|    Kumar|  [Java, SQL]|         38|
+---+----------+---------+-------------+-----------+

take(n)

The take() method takes an integer n as a parameter and returns the first n rows of the PySpark DataFrame as a list of Row objects.

result = df.take(2)
print(result)


tail(n)

The tail(n) method takes an integer n as a parameter and returns the last n rows of the PySpark DataFrame as a list of Row objects.

result = df.tail(2)
print(result)

limit(n)

The limit(n) method returns a new DataFrame that contains a specified number of rows, where n is the number of rows you want to get from the PySpark DataFrame.

result = df.limit(2)
result.show()

columns

The columns property is used to get the column names of the DataFrame.

column_name = df.columns
print(column_name)

isEmpty()

The isEmpty() method returns a boolean value: True when the DataFrame is empty, otherwise False.

result = df.isEmpty()
print(result)

toDF()

The toDF() method returns a new DataFrame with the new column names you specify. The names are applied by position, so they must follow the existing column order (here: age, first_name, last_name, skills).

new_columns = ["Age", "FirstName", "LastName", "Skills"]
new_df = df.toDF(*new_columns)
new_df.show()

Output DataFrame:

+---+---------+--------+-------------+
|Age|FirstName|LastName|       Skills|
+---+---------+--------+-------------+
| 20|     John|     Doe|[Python, SQL]|
| 23|Vishvajit|     Rwo| [Excel, SQL]|
| 19|   Pankaj|   Kumar|  [Java, SQL]|
+---+---------+--------+-------------+


Conclusion

All these 30 PySpark DataFrame methods are very useful when you are working with a PySpark DataFrame. With their help, you can perform many different operations in your PySpark applications.

PySpark DataFrame is the most used data structure in PySpark applications; it allows PySpark developers and data engineers to work with structured data, much like data stored in a SQL table.

If you found this PySpark DataFrame methods crash course helpful, please share it and keep visiting for more PySpark articles.

Happy Learning…
