PySpark Broadcast Variables

Hi PySpark developers, in today’s article we will learn all about PySpark broadcast variables with the help of examples. We will cover what a PySpark broadcast variable is, how broadcast variables work, and why we should use them, and after that we will walk through examples of PySpark broadcast variables.

Let’s get started!

What is a PySpark Broadcast Variable?

A broadcast variable in PySpark is a mechanism that allows you to share large read-only variables efficiently across all nodes in a cluster. When working with large datasets and distributed computing, it is common to have variables needed by all tasks on all nodes (e.g., lookup tables, model parameters).

Instead of shipping this data with every task, broadcast variables ensure that the data is copied to each node only once, which enhances performance.

How does PySpark Broadcast Work?

Here are some important points about how PySpark broadcast variables work.

  • Broadcasting the Variable: When you create a broadcast variable in PySpark, it is sent to all worker nodes only once. These workers keep a local copy of the broadcast variable, and when tasks are executed, they can access this variable directly from their local cache, avoiding the need for multiple copies to be transferred over the network.
  • Efficient Distribution: Instead of shuffling or copying the data every time a task is run on a worker node, Spark efficiently distributes the broadcast variable, which minimizes memory consumption and reduces network I/O.
  • Immutable: Broadcast variables are read-only. Once a broadcast variable is created, it cannot be changed. This is because they are primarily used to share lookup data, configuration, or constants across all the workers.
  • Usage: You can create a broadcast variable using the SparkContext.broadcast() method. To access the value of a broadcast variable in your transformation or action operations, you need to use the .value attribute.
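
For example, here is a minimal, self-contained sketch of both steps (the country_codes lookup and the app name here are purely illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("broadcast-demo").getOrCreate()

# create the broadcast variable; the dictionary is shipped to each worker only once
country_codes = spark.sparkContext.broadcast({"IN": "India", "US": "United States"})

# read the value back through the .value attribute
print(country_codes.value["IN"])  # India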

Let’s see why we should use it.

Why should we use Broadcast Variables in PySpark?

There are several reasons to use broadcast variables in PySpark:

  • Improved Performance: Broadcasting allows you to distribute the data to the workers only once, reducing the cost of sending the same data again and again. Without broadcasting, the same data would be shipped from the driver with every task, causing unnecessary traffic across the network.
  • Efficient Memory Usage: A broadcast variable is stored in each worker’s local memory. Without broadcasting, each task would keep a copy of the data, potentially leading to excessive memory usage.
  • Better for Small Lookup Data: When you have small datasets (like lookup tables, configuration values, or dictionaries) that need to be accessed by multiple tasks across different partitions, broadcasting can be very effective.

PySpark Broadcast Variable Example

Let’s say you have a large dataset of employee data for a company, and you need to enrich it with information stored in a smaller lookup such as department_lookup. Instead of joining the lookup with each partition, you can broadcast it.

Let’s see how.

Define the Data and the Broadcast Variable: You have a department name lookup with IDs and department names.

from pyspark.sql import SparkSession

# creating spark session
spark = (
    SparkSession.builder.master("local[*]")
    .appName("www.programmingfunda.com")
    .getOrCreate()
)


# department lookup
department_lookup = {
    1: "IT",
    2: "Fintech",
    3: "Marketing",
    4: "HR",
}

# Creating the broadcast variable by sending the department_lookup to each node
broadcast_department_lookup = spark.sparkContext.broadcast(department_lookup)

Note: We created the broadcast variable using the spark.sparkContext.broadcast() method.
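
As a quick optional check, you can read the broadcast value back on the driver through the same .value attribute:

# the driver (and, inside tasks, every executor) sees the same read-only dictionary
print(broadcast_department_lookup.value)
# {1: 'IT', 2: 'Fintech', 3: 'Marketing', 4: 'HR'}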

Large Dataset (Employee): Now, let’s assume you have a large dataset of company employees that contains department IDs.

# Employee datasets
data = [
    ("1", "Tarun", "Kumar", 2),
    ("2", "Harsh", "Goal", 1),
    ("3", "Nansi", "Kumari", 3),
    ("4", "James", "Bond", 2),
    ("5", "Ritika", "Kumari", 1),
    ("6", "Diyanshu", "Saini", 4),
]

# column names
column_names = ["id", "first_name", "last_name", "dep_id"]

# creating pyspark dataframe
emp_df = spark.createDataFrame(data, schema=column_names)

# displaying the dataframe
emp_df.show()

The PySpark DataFrame will look like this:

+---+----------+---------+------+
| id|first_name|last_name|dep_id|
+---+----------+---------+------+
|  1|     Tarun|    Kumar|     2|
|  2|     Harsh|     Goal|     1|
|  3|     Nansi|   Kumari|     3|
|  4|     James|     Bond|     2|
|  5|    Ritika|   Kumari|     1|
|  6|  Diyanshu|    Saini|     4|
+---+----------+---------+------+

Use the Broadcast Variable: You can now use the broadcasted variable to map department IDs to their names in the emp_df without having to join with the lookup table.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# function to lookup department name from broadcasted dictionary
def get_department_name(dep_id):
    return broadcast_department_lookup.value.get(dep_id, 'Unknown')


# Register udf to use with DataFrame
get_department_name_udf = udf(get_department_name, StringType())

# Adding new column which is department name in new dataframe
new_df = emp_df.withColumn('department_name', get_department_name_udf(emp_df.dep_id))

new_df.show()

Now the new DataFrame will look like this:

+---+----------+---------+------+---------------+
| id|first_name|last_name|dep_id|department_name|
+---+----------+---------+------+---------------+
|  1|     Tarun|    Kumar|     2|        Fintech|
|  2|     Harsh|     Goal|     1|             IT|
|  3|     Nansi|   Kumari|     3|      Marketing|
|  4|     James|     Bond|     2|        Fintech|
|  5|    Ritika|   Kumari|     1|             IT|
|  6|  Diyanshu|    Saini|     4|             HR|
+---+----------+---------+------+---------------+

This is how you can use broadcast variables for lookup.
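
Once you are finished with the lookup, the copies cached on the executors can be released. Here is a small, optional sketch using the standard Broadcast API (call it only after the jobs that use the variable have completed):

# remove the cached copies from the executors; the variable can be re-broadcast if it is used again
broadcast_department_lookup.unpersist()

# or permanently release all data and metadata for the broadcast variable
# broadcast_department_lookup.destroy()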

Example of When Broadcasting Works Well

Let’s say you have:

  • A large dataset of sales transactions (millions of rows).
  • A small lookup table with product information (a few thousand rows).

In this case, broadcasting the product lookup table (the small dataset) across the cluster works great because:

  • The lookup table is small enough to fit in memory on each worker node.
  • Each worker can access the small lookup table locally without needing to shuffle it across the cluster.
  • The large sales dataset can be split into partitions, and each partition can join efficiently with the broadcasted lookup table.
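
For DataFrame-to-DataFrame joins, the same idea is available through a broadcast join hint. Here is a short sketch, assuming hypothetical sales_df and product_df DataFrames that share a product_id column:

from pyspark.sql.functions import broadcast

# hint Spark to ship the small product lookup to every executor and
# join it locally against each partition of the large sales DataFrame
enriched_df = sales_df.join(broadcast(product_df), on="product_id", how="left")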

Example of When Broadcasting is Not Ideal

If you have:

  • A large dataset of customer information (millions of rows).
  • A large lookup table (say, 10+ million rows).

In this case, broadcasting the lookup table across the cluster would likely cause:

  • Network overload: Broadcasting a large lookup table to all nodes can overwhelm your network, causing significant delays in transferring data.
  • Memory issues: Each worker node will need to store the large lookup table in memory, which can lead to memory exhaustion or increased GC (garbage collection) overhead.
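
For DataFrame joins specifically, Spark also decides on its own whether to broadcast the smaller side, based on the spark.sql.autoBroadcastJoinThreshold setting (10 MB by default). A minimal sketch of adjusting or disabling that behaviour at runtime:

# raise the automatic broadcast-join threshold to roughly 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# or disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)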

Conclusion

Broadcast variables in PySpark are a powerful tool for efficiently sharing read-only data across the workers in a cluster. They reduce memory consumption and network overhead and improve performance, particularly when working with small lookup data or configurations.

If you found this article helpful, please share it and keep visiting for more PySpark tutorials.

This is written and verified by Vishvajit Rao.

Thanks for your time…
