Hi PySpark developers! In today’s article, we will cover PySpark broadcast variables with the help of examples. We will look at what PySpark broadcast variables are, how they work, and why you should use them, and then walk through examples of PySpark broadcast variables.
Let’s get started!
What is a PySpark Broadcast Variable?
A broadcast variable in PySpark is a mechanism that allows you to share large read-only variables efficiently across all nodes in a cluster. When working with large datasets and distributed computing, it is common to have variables needed by all tasks on all nodes (e.g., lookup tables, model parameters).
Instead of shipping this data with every task, broadcast variables ensure that the data is copied to each node only once, which improves performance.
How does PySpark Broadcast Work?
Here are some important points about how PySpark broadcast variables work.
- Broadcasting the Variable:- When you create a broadcast variable in PySpark, it is sent to all worker nodes only once. These workers will keep a local copy of the broadcast variable, and when tasks are executed, they can access this variable directly from their local cache, avoiding the need for multiple copies to be transferred over the network.
- Efficient Distribution: Instead of shuffling or copying the data every time a task is run on a worker node, Spark efficiently distributes the broadcast variable, which minimizes memory consumption and reduces network I/O.
- Immutable: Broadcast variables are read-only. Once a broadcast variable is created, it cannot be changed. This is because they are primarily used to share lookup data, configuration, or constants across all the workers.
- Usage: You can create a broadcast variable using the SparkContext.broadcast() method. To access the value of a broadcast variable in your transformation or action operations, you use the .value attribute (see the short sketch below).
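For instance, here is a minimal sketch of creating and reading a broadcast variable (the country_lookup dictionary and the local SparkSession are just placeholders for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("broadcast-sketch").getOrCreate()

# a small read-only lookup we want every executor to have
country_lookup = {"IN": "India", "US": "United States"}

# ship the dictionary to the executors once
broadcast_lookup = spark.sparkContext.broadcast(country_lookup)

# read the locally cached copy with .value
print(broadcast_lookup.value["IN"])  # India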
Let’s see why we should use it.
Why should we use Broadcast Variables in PySpark?
There are several reasons to use broadcast variables in PySpark:
- Improved Performance: Broadcasting allows you to distribute the data once, reducing the cost of sending the same data to each worker. Without broadcasting, every worker would request the data from the driver, causing unnecessary traffic across the network.
- Efficient Memory Usage: A broadcast variable is stored in each worker’s local memory. Without broadcasting, each task would keep a copy of the data, potentially leading to excessive memory usage.
- Better for Small Lookup Data: When you have small datasets (like lookup tables, configuration values, or dictionaries) that need to be accessed by multiple tasks across different partitions, broadcasting can be very effective.
PySpark Broadcast Variable Example
Let’s say you have a large dataset of employee records for a company and you need to enrich it with information stored in a smaller lookup, such as department_lookup. Instead of joining the lookup with each partition, you can broadcast it.
Let’s see how.
Define the Data and the Broadcast Variable: You have a department name lookup with IDs and department names.
from pyspark.sql import SparkSession

# creating spark session
spark = (
    SparkSession.builder.master("local[*]")
    .appName("www.programmingfunda.com")
    .getOrCreate()
)

# department lookup
department_lookup = {
    1: "IT",
    2: "Fintech",
    3: "Marketing",
    4: "HR",
}

# Creating the broadcast variable by sending the department_lookup to each node
broadcast_department_lookup = spark.sparkContext.broadcast(department_lookup)
Note: We created the broadcast variable using the sparkContext.broadcast() method.
Large Dataset (Employee): Now, let’s assume you have a large dataset of company employees that contains department IDs.
# Employee dataset
data = [
    ("1", "Tarun", "Kumar", 2),
    ("2", "Harsh", "Goal", 1),
    ("3", "Nansi", "Kumari", 3),
    ("4", "James", "Bond", 2),
    ("5", "Ritika", "Kumari", 1),
    ("6", "Diyanshu", "Saini", 4),
]

# column names
column_names = ["id", "first_name", "last_name", "dep_id"]

# creating pyspark dataframe
emp_df = spark.createDataFrame(data, schema=column_names)

# displaying the dataframe
emp_df.show()
The PySpark DataFrame will look like this:
+---+----------+---------+------+
| id|first_name|last_name|dep_id|
+---+----------+---------+------+
|  1|     Tarun|    Kumar|     2|
|  2|     Harsh|     Goal|     1|
|  3|     Nansi|   Kumari|     3|
|  4|     James|     Bond|     2|
|  5|    Ritika|   Kumari|     1|
|  6|  Diyanshu|    Saini|     4|
+---+----------+---------+------+
Use the Broadcast Variable: You can now use the broadcasted variable to map department IDs to their names in the emp_df without having to join with the lookup table.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# function to look up the department name from the broadcasted dictionary
def get_department_name(dep_id):
    return broadcast_department_lookup.value.get(dep_id, "Unknown")

# Register the UDF to use with the DataFrame
get_department_name_udf = udf(get_department_name, StringType())

# Adding a new column, department_name, in a new DataFrame
new_df = emp_df.withColumn("department_name", get_department_name_udf(emp_df.dep_id))
new_df.show()
The new DataFrame will look like this:
+---+----------+---------+------+---------------+
| id|first_name|last_name|dep_id|department_name|
+---+----------+---------+------+---------------+
|  1|     Tarun|    Kumar|     2|        Fintech|
|  2|     Harsh|     Goal|     1|             IT|
|  3|     Nansi|   Kumari|     3|      Marketing|
|  4|     James|     Bond|     2|        Fintech|
|  5|    Ritika|   Kumari|     1|             IT|
|  6|  Diyanshu|    Saini|     4|             HR|
+---+----------+---------+------+---------------+
This is how you can use broadcast variables for lookup.
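Once the lookup is no longer needed, you can release the copies cached on the executors. Here is a minimal sketch using the broadcast_department_lookup variable from the example above:

# remove the cached copies from the executors (the data is re-sent if the variable is used again)
broadcast_department_lookup.unpersist()

# or permanently release all data and metadata; the variable cannot be used after this
broadcast_department_lookup.destroy()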
Example of When Broadcasting Works Well
Let’s say you have:
- A large dataset of sales transactions (millions of rows).
- A small lookup table with product information (a few thousand rows).
In this case, broadcasting the product lookup table (the small dataset) across the cluster works great because:
- The lookup table is small enough to fit in memory on each worker node.
- Each worker can access the small lookup table locally without needing to shuffle it across the cluster.
- The large sales dataset can be split into partitions, and each partition can join efficiently with the broadcasted lookup table, as the sketch after this list shows.
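Here is a minimal sketch of that pattern using Spark's broadcast join hint (the sales_df and product_df DataFrames and their columns are hypothetical placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[*]").appName("broadcast-join-sketch").getOrCreate()

# hypothetical large transactions table and small product lookup
sales_df = spark.createDataFrame(
    [(1, 101, 2), (2, 102, 5)], ["sale_id", "product_id", "qty"]
)
product_df = spark.createDataFrame(
    [(101, "Keyboard"), (102, "Mouse")], ["product_id", "product_name"]
)

# hint Spark to broadcast the small table so each partition of sales_df joins with it locally
enriched_df = sales_df.join(broadcast(product_df), on="product_id", how="left")
enriched_df.show()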
Example of When Broadcasting is Not Ideal
If you have:
- A large dataset of customer information (millions of rows).
- A large lookup table (say, 10+ million rows).
In this case, broadcasting the lookup table across the cluster would likely cause:
- Network overload: Broadcasting a large lookup table to all nodes can overwhelm your network, causing significant delays in transferring data.
- Memory issues: Each worker node will need to store the large lookup table in memory, which can lead to memory exhaustion or increased GC (garbage collection) overhead.
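Related to this, Spark SQL automatically broadcasts a table in a join when it is smaller than the spark.sql.autoBroadcastJoinThreshold setting (10 MB by default). If your lookup tables are too large to broadcast safely, one option is to lower or disable this threshold, for example:

# disable automatic broadcast joins so large lookup tables are never broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)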
Conclusion
Broadcast variables in PySpark are a powerful tool for efficiently sharing read-only data across workers in a cluster. They reduce memory consumption and network overhead and improve performance, particularly when working with small lookup data or configuration values.
If you found this article helpful, please share it and keep visiting for more PySpark tutorials.
This is written and verified by Vishvajit Rao.
Thanks for your time…