
Partitions in PySpark: A Comprehensive Guide with Examples


Hi PySpark enthusiasts! In this article, I will explain partitions in PySpark with the help of proper examples. If you are new to PySpark, you should learn about partitions because they are one of the core concepts of any distributed computing framework.

If you don't know about PySpark partitions yet, don't worry: by the end of this article you will have a complete understanding of them.

Before getting into the practical part of this article, let's first understand a little about partitions in PySpark.

What are Partitions in PySpark?

In PySpark, partitions are the smaller logical chunks into which a large dataset is divided. Partitioning allows Spark to split a large dataset into multiple partitions and process them in parallel, and parallel processing is the main strength of any distributed computing framework like Spark.

Here are some key concepts related to PySpark partitions.

Logical Subdivision

  • A partition is a subset of an RDD (Resilient Distributed Dataset) or DataFrame.
  • Each partition can be processed independently by a task.

Parallel Processing

  • Spark distributes the partitions of an RDD or DataFrame across the worker nodes of a cluster.
  • Each worker node processes one or more partitions.

Task Execution

  • Spark assigns one task to each partition; for example, if an RDD has 10 partitions, Spark will create 10 tasks.
  • These tasks are distributed to and executed by the worker nodes (see the sketch below).
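
To make this concrete, here is a minimal sketch (the data, app name, and use of mapPartitionsWithIndex() are my own) that tags every element with the index of the partition, and hence the task, that processed it:

from pyspark import SparkContext

sc = SparkContext("local[*]", "Task Per Partition Sketch")

# 4 partitions means Spark runs 4 tasks for this RDD
rdd = sc.parallelize(range(1, 11), 4)

# Tag each element with the index of the partition it belongs to
tagged = rdd.mapPartitionsWithIndex(
    lambda index, iterator: ((index, value) for value in iterator)
).collect()

print(tagged)
# Output: a list of (partition_index, value) pairs, e.g. (0, 1), (0, 2), ...

sc.stop()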

Shuffling

  • Some Spark operations, such as joins and aggregations, require shuffling of data, which means data is moved from one partition to another (see the sketch below).
  • Shuffling is one of the most expensive operations in Spark.
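
As a quick illustration (a minimal sketch with made-up data), reduceByKey() is one such shuffle-inducing operation: all values of the same key must end up in the same partition, so Spark moves records between partitions:

from pyspark import SparkContext

sc = SparkContext("local[*]", "Shuffle Sketch")

# A small pair RDD spread across 2 partitions
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 3)], 2)

# reduceByKey() groups all values of a key together, which
# requires shuffling records from one partition to another
totals = pairs.reduceByKey(lambda x, y: x + y)

print(totals.collect())
# Output (order may vary): [('a', 3), ('b', 4)]

sc.stop()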

Now, it's time to create partitions in PySpark with the help of examples.

Examples of working with Partitions in PySpark RDD and DataFrame:

First, we will see partitions in PySpark RDD.

Creating RDD with default Partitions

By default, Spark decides the number of partitions based on configuration settings such as the number of cores available on the machine.
Let's create a PySpark RDD with the default number of partitions.

from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local[*]", "ProgrammingFunda Partition Example")

# Creating an RDD from a list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
rdd = sc.parallelize(data)

# displaying the rdd
print(rdd.collect())

print("Number of partitions:", rdd.getNumPartitions())
# Output: Number of partitions: (number of cores)

sc.stop()

As you can see in the above example, I created an RDD with the default number of partitions: I didn't pass any integer value to the sc.parallelize() method, I just passed the data.
In the second print statement, I check the number of partitions using the getNumPartitions() method. In my case, the number of partitions is 8; it might be different in your case.
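
If you want to see where that default comes from, you can check sc.defaultParallelism. This is a minimal sketch of my own; with the local[*] master the value typically equals the number of CPU cores, but it depends on your machine and configuration:

from pyspark import SparkContext

sc = SparkContext("local[*]", "Default Parallelism Check")

# With local[*], defaultParallelism usually equals the number of CPU cores,
# and sc.parallelize() uses it as the default number of partitions
print("Default parallelism:", sc.defaultParallelism)
print("Default partitions:", sc.parallelize(range(100)).getNumPartitions())

sc.stop()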

This is how you can create PySpark RDD with default partitions.

Creating PySpark RDD with Specified Partitions

You can specify the number of partitions in the sc.parallelize() method when creating the RDD.

from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local[*]", "ProgrammingFunda Partition Example")

# Creating an RDD from a list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
rdd = sc.parallelize(data, 4)

# displaying the rdd
print(rdd.collect())

print("Number of partitions:", rdd.getNumPartitions())
# Output: Number of partitions: 4

sc.stop()

Now in this case the number of partitions will be 4.

Inspecting partitions:

You can also see the contents of each partition with the help of the glom() method.

from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local[*]", "ProgrammingFunda Partition Example")

# Creating an RDD from a list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
rdd = sc.parallelize(data, 4)

# Collecting data from each partition
partitioned_data = rdd.glom().collect()

print("Partitioned data:", partitioned_data)
# Output: a nested list with one inner list per partition

sc.stop()

The output will be:

Partitioned data: [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16, 17, 18]]

As you can see, there are four nested lists, one for each of the four partitions.

Repartition of PySpark RDD:

Sometimes we want to repartition a PySpark RDD. Two methods are available for that: repartition() and coalesce().
Let's use both of them one by one.

repartition():

The repartition() method is used to increase or decrease the number of partitions of an RDD or DataFrame. It results in a full shuffle of data across the partitions of the cluster, which makes it an expensive operation.

from pyspark import SparkContext

sc = SparkContext("local[*]", "Repartition Example")

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Creating an RDD with 2 partitions
rdd = sc.parallelize(data, 2)

print("Number of partitions before repartitioning:", rdd.getNumPartitions())
repartitioned_rdd = rdd.repartition(4)
print("Number of partitions after repartitioning:", repartitioned_rdd.getNumPartitions())

sc.stop()

The output would be:
Number of partitions before repartitioning: 2
Number of partitions after repartitioning: 4

coalesce():

The coalesce() method is used to reduce the number of partitions of an RDD or DataFrame. It tries to avoid a full shuffle of data across the partitions, so it is more efficient than repartition() when reducing the number of partitions.

from pyspark import SparkContext


sc = SparkContext("local[*]", "Coalesce Example")

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Creating an RDD with 2 partitions
rdd = sc.parallelize(data, 2)

print("Number of partitions before coalescing:", rdd.getNumPartitions())
coalesced_rdd = rdd.coalesce(1)
print("Number of partitions after coalescing:", coalesced_rdd.getNumPartitions())

sc.stop()

The output will be:
Number of partitions before coalescing: 2
Number of partitions after coalescing: 1

This is all about partitions in a PySpark RDD; now let's move on to partitions in a PySpark DataFrame.

Partitions in PySpark DataFrame:

Partitions in a PySpark DataFrame work almost the same way as in a PySpark RDD; the only difference is that we apply the partitioning methods to the DataFrame instead of the RDD.

Creating PySpark DataFrame with Default partition:

I have prepared a CSV file with some records, and now I will load it into a PySpark DataFrame. I am not going to specify any custom number of partitions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Programming Funda").getOrCreate()


# Reading the CSV file into a DataFrame
df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .format("csv")
    .load("sample_data.csv")
)

# displaying the dataframe
df.show()


You can get the number of partitions by using df.rdd.getNumPartitions().
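
For example (a minimal sketch, assuming the df loaded above):

# A DataFrame does not expose getNumPartitions() directly,
# so we go through its underlying RDD
print("Number of partitions:", df.rdd.getNumPartitions())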

Creating PySpark DataFrame with Specified Partitions:

We can also set the number of partitions by chaining the repartition() method when loading the file.

Let’s see:

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .format("csv")
    .load("sample_data.csv")
    .repartition(2)
)

As you can see above, I chain repartition(2) right after loading the file, so the DataFrame ends up with 2 partitions.

Repartition of the PySpark DataFrame:

Just as with a PySpark RDD, we can repartition a PySpark DataFrame with the help of the repartition() and coalesce() methods. The repartition() method is used to increase or decrease the number of partitions of a PySpark DataFrame.

Let’s use the repartition() method.

# number of partitions
print("Number of partitions before repartition:", df.rdd.getNumPartitions())

df = df.repartition(2)

# number of partitions
print("Number of partitions after repartition:", df.rdd.getNumPartitions())

coalesce(): The coalesce() method is used to reduce the number of partitions of a PySpark DataFrame.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .format("CSV")
    .load("sample_data.csv")
)

df = df.repartition(4)

# number of partitions
print("Number of partitions before coalesce:", df.rdd.getNumPartitions())


df = df.coalesce(1)
# number of partitions
print("Number of partitions after coalesce:", df.rdd.getNumPartitions())

This is how you can use the repartition() and coalesce() methods to repartition a PySpark DataFrame.


Conclusion

Throughout this article, we have seen how partitions work in PySpark RDDs and DataFrames with the help of examples. In Spark, partitioning is one of the most important concepts because it allows Spark to distribute a large dataset across the worker nodes of a cluster.

If you found this article helpful, please share it and keep visiting for more PySpark tutorials.
