Broadcast
Broadcast variables in Apache Spark are read-only variables that can be shared efficiently across all worker nodes in a cluster. Spark caches the broadcast value on each machine instead of shipping it over the network with every task, which is particularly useful when a large dataset or piece of reference data needs to be shared among all tasks in an application. Because broadcast variables are read-only, they are safe to use in parallel processing.
Here's an example in Java that demonstrates how to use broadcast variables in Apache Spark:
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastExample {

    public static void main(String[] args) {
        // Create a SparkConf and JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("BroadcastExample").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create a broadcast variable
        final Broadcast<Integer> broadcastVar = sc.broadcast(100);

        // Create an RDD with some data
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Use the broadcast variable within a transformation
        JavaRDD<Integer> resultRDD = dataRDD.map(num -> num * broadcastVar.value());

        // Collect and print the results
        resultRDD.collect().forEach(System.out::println);

        // Stop the SparkContext
        sc.stop();
    }
}
```
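Running this locally prints each element multiplied by the broadcast factor: 100, 200, 300, 400, and 500.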
In this example:

- We create a SparkConf and a JavaSparkContext.
- We create a broadcast variable `broadcastVar` with a value of `100`. Spark ships this value to each executor once and caches it there, rather than re-sending it with every task.
- We create an RDD named `dataRDD` from a list of integers.
- Inside the `map` transformation, we use `broadcastVar` to multiply each element of `dataRDD` by the value stored in the broadcast variable, reading it from the executor's local cache.
- Finally, we collect and print the results, which show each element of `dataRDD` multiplied by 100.
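When a broadcast variable is no longer needed, its cached copies can be released with Spark's `Broadcast` API. A minimal sketch:

```java
// Remove the cached copies from the executors; the value is
// re-broadcast automatically if the variable is used again.
broadcastVar.unpersist();

// Or release all resources permanently; the variable must not
// be used after this call.
broadcastVar.destroy();
```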
Broadcast variables are especially useful when you have large reference data, such as lookup tables or dictionaries, that need to be used by multiple tasks across worker nodes without incurring the overhead of transferring the data multiple times.
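A common pattern is to broadcast a small lookup map and reference it inside a transformation, which amounts to a map-side join. Here is a minimal sketch; the product catalog and the `BroadcastLookupExample` class name are made up for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastLookupExample {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BroadcastLookupExample").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Hypothetical reference data: product id -> product name
        Map<String, String> catalog = new HashMap<>();
        catalog.put("p1", "Keyboard");
        catalog.put("p2", "Mouse");

        // Broadcast the lookup table once instead of shipping it with every task
        final Broadcast<Map<String, String>> catalogBroadcast = sc.broadcast(catalog);

        JavaRDD<String> orders = sc.parallelize(Arrays.asList("p1", "p2", "p1", "p3"));

        // Resolve each order's product id against the broadcast lookup table
        JavaRDD<String> productNames = orders.map(
                id -> catalogBroadcast.value().getOrDefault(id, "Unknown"));

        productNames.collect().forEach(System.out::println);
        sc.stop();
    }
}
```

This prints `Keyboard`, `Mouse`, `Keyboard`, `Unknown`: every task reads the catalog from its executor's cache rather than receiving its own copy. The larger example below applies the same pattern to real reference data.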
In the next example, we have maker space data from across the UK, including information such as the maker space's name, email address, postcode, and number of visitors. Our goal is to determine how these maker spaces are distributed across the different regions of the UK. However, we only have the postcode of each maker space; we do not know its region.
To answer this question, we need to introduce another dataset that contains UK postcode data for each postcode prefix, which will allow us to determine the region to which each postcode belongs. By combining these two datasets, we can answer the question of how maker spaces are distributed across different regions in the UK.
Below is a Java class named `UkMakerSpaces`, located in the package `com.sparkTutorial.advanced.broadcast`. This class demonstrates how to use Spark and broadcast variables to achieve our goal:
```java
package com.sparkTutorial.advanced.broadcast;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Scanner;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class UkMakerSpaces {

    public static void main(String[] args) throws Exception {
        // Reduce Spark's log output to errors only
        Logger.getLogger("org").setLevel(Level.ERROR);

        // Initialize the Spark configuration and context
        SparkConf conf = new SparkConf().setAppName("UkMakerSpaces").setMaster("local[1]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        // Broadcast the postcode-prefix-to-region map so each executor
        // receives it once instead of once per task
        final Broadcast<Map<String, String>> postCodeMap =
                javaSparkContext.broadcast(loadPostCodeMap());

        // Load the maker space data
        JavaRDD<String> makerSpaceRdd =
                javaSparkContext.textFile("in/uk-makerspaces-identifiable-data.csv");

        // Filter out the header row and map each record's postcode to a region.
        // Utils.COMMA_DELIMITER is a helper from the tutorial project: a comma
        // regex that ignores commas inside quoted CSV fields.
        JavaRDD<String> regions = makerSpaceRdd
                .filter(line -> !line.split(Utils.COMMA_DELIMITER, -1)[0].equals("Timestamp"))
                .map(line -> {
                    Optional<String> postPrefix = getPostPrefix(line);
                    if (postPrefix.isPresent() && postCodeMap.value().containsKey(postPrefix.get())) {
                        return postCodeMap.value().get(postPrefix.get());
                    }
                    return "Unknown";
                });

        // Count and print the distribution of regions
        for (Map.Entry<String, Long> regionCounts : regions.countByValue().entrySet()) {
            System.out.println(regionCounts.getKey() + " : " + regionCounts.getValue());
        }
    }

    private static Optional<String> getPostPrefix(String line) {
        String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
        String postcode = splits[4];
        if (postcode.isEmpty()) {
            return Optional.empty();
        }
        // The prefix is the part before the space, e.g. "SW1A" in "SW1A 1AA"
        return Optional.of(postcode.split(" ")[0]);
    }

    private static Map<String, String> loadPostCodeMap() throws FileNotFoundException {
        Map<String, String> postCodeMap = new HashMap<>();
        try (Scanner postCode = new Scanner(new File("in/uk-postcode.csv"))) {
            while (postCode.hasNextLine()) {
                String line = postCode.nextLine();
                String[] splits = line.split(Utils.COMMA_DELIMITER, -1);
                // Column 0 is the postcode prefix, column 7 is the region name
                postCodeMap.put(splits[0], splits[7]);
            }
        }
        return postCodeMap;
    }
}
```
In this class, we broadcast the postcode lookup map, load the maker space data, filter out the header row, map each record's postcode to its region, and then count and print the distribution of regions.
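The class references a `Utils.COMMA_DELIMITER` constant defined elsewhere in the tutorial project. If you are building the example standalone, a minimal sketch of such a helper, assuming the usual quote-aware CSV regex, might look like this:

```java
public class Utils {
    // Matches commas that are outside double-quoted fields, so quoted CSV
    // values containing commas are not split apart.
    public static final String COMMA_DELIMITER = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
}
```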
Please ensure that you have the necessary data files ("uk-makerspaces-identifiable-data.csv" and "uk-postcode.csv") in the "in" directory as specified in the code.