Apache Spark - rlip/java GitHub Wiki
//local[2] - 2 cores on the local machine
//local[*] - all available cores
//local - 1 core
SparkConf conf = new SparkConf().setAppName("airports").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> airports = sc.textFile("in/airports.text");
JavaRDD<String> airportsByLatitude = airports.filter(line -> Float.valueOf(line.split(Utils.COMMA_DELIMITER)[6]) > 40); // splits[6] is the latitude column
JavaRDD<String> airportNameAndLatitude = airportsByLatitude.map(line -> {
    String[] splits = line.split(Utils.COMMA_DELIMITER);
    return StringUtils.join(new String[]{splits[1], splits[6]}, ","); // airport name + latitude
});
airportNameAndLatitude.saveAsTextFile("out/airports_by_latitude.text");
Transformations - return a new RDD, like map or filter.
Actions - operations that return a result (short sketch after the list):
- collect - cannot be used on large collections, because everything has to fit in the driver's memory
- count, countByValue
- take - returns only the given number of elements from the front
- saveAsTextFile - can also save to Amazon S3
- reduce - combines 2 elements into 1
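A minimal sketch of these actions on a tiny RDD, in the same style as the snippets above (it reuses the JavaSparkContext sc from the first example; the word list is made up for illustration):
List<String> words = Arrays.asList("spark", "java", "spark");
JavaRDD<String> wordsRdd = sc.parallelize(words);
List<String> allWords = wordsRdd.collect();              // whole RDD on the driver: [spark, java, spark]
long numberOfWords = wordsRdd.count();                   // 3
Map<String, Long> occurrences = wordsRdd.countByValue(); // {spark=2, java=1}
List<String> firstTwo = wordsRdd.take(2);                // only the first 2 elements
String concatenated = wordsRdd.reduce((a, b) -> a + " " + b); // keeps combining 2 elements into 1 until one value is left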
Persist / cache - used when we reuse a given RDD multiple times:
Logger.getLogger("org").setLevel(Level.ERROR);
SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> inputIntegers = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> integerRdd = sc.parallelize(inputIntegers);
integerRdd.persist(StorageLevel.MEMORY_ONLY()); // there are more storage levels to choose from
Integer product = integerRdd.reduce((x, y) -> x * y); // the first action computes the RDD and keeps it in memory
long count = integerRdd.count(); // the second action reuses the cached data instead of recomputing it
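A sketch of the other common storage levels (it reuses sc and inputIntegers from the snippet above; which level is best depends on the data size and the available memory):
// cache() is just a shorthand for persist(StorageLevel.MEMORY_ONLY())
JavaRDD<Integer> cachedRdd = sc.parallelize(inputIntegers).cache();
// MEMORY_AND_DISK - partitions that do not fit in memory are spilled to disk
JavaRDD<Integer> spillableRdd = sc.parallelize(inputIntegers).persist(StorageLevel.MEMORY_AND_DISK());
// MEMORY_ONLY_SER - stores serialized objects: less memory, more CPU when reading
JavaRDD<Integer> serializedRdd = sc.parallelize(inputIntegers).persist(StorageLevel.MEMORY_ONLY_SER());
// unpersist() removes an RDD from the cache when it is no longer needed
cachedRdd.unpersist();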

- Spark Streaming
- Spark SQL - DataFrame example:
Logger.getLogger("org").setLevel(Level.ERROR);
SparkSession session = SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate();
DataFrameReader dataFrameReader = session.read();
Dataset<Row> responses = dataFrameReader.option("header","true").csv("in/2016-stack-overflow-survey-responses.csv");
System.out.println("=== Print out schema ===");
responses.printSchema();
System.out.println("=== Print 20 records of responses table ===");
responses.show(20);
System.out.println("=== Print the so_region and self_identification columns of gender table ===");
responses.select(col("so_region"), col("self_identification")).show();
System.out.println("=== Print records where the response is from Afghanistan ===");
responses.filter(col("country").equalTo("Afghanistan")).show();
System.out.println("=== Print the count of occupations ===");
RelationalGroupedDataset groupedDataset = responses.groupBy(col("occupation"));
groupedDataset.count().show();
System.out.println("=== Cast the salary mid point and age mid point to integer ===");
// SALARY_MIDPOINT, AGE_MIDPOINT and SALARY_MIDPOINT_BUCKET are String constants holding the column names
// (presumably "salary_midpoint", "age_midpoint" and "salary_midpoint_bucket" in this CSV)
Dataset<Row> castedResponse = responses.withColumn(SALARY_MIDPOINT, col(SALARY_MIDPOINT).cast("integer"))
        .withColumn(AGE_MIDPOINT, col(AGE_MIDPOINT).cast("integer"));
System.out.println("=== Print out casted schema ===");
castedResponse.printSchema();
System.out.println("=== Print records with average mid age less than 20 ===");
castedResponse.filter(col(AGE_MIDPOINT).$less(20)).show();
System.out.println("=== Print the result by salary middle point in descending order ===");
castedResponse.orderBy(col(SALARY_MIDPOINT ).desc()).show();
System.out.println("=== Group by country and aggregate by average salary middle point and max age middle point ===");
RelationalGroupedDataset datasetGroupByCountry = castedResponse.groupBy("country");
datasetGroupByCountry.agg(avg(SALARY_MIDPOINT), max(AGE_MIDPOINT)).show();
// integer-divide the salary by 20000 and multiply back: rounds each salary down to a 20000-wide bucket
Dataset<Row> responseWithSalaryBucket = castedResponse.withColumn(
        SALARY_MIDPOINT_BUCKET, col(SALARY_MIDPOINT).divide(20000).cast("integer").multiply(20000));
System.out.println("=== With salary bucket column ===");
responseWithSalaryBucket.select(col(SALARY_MIDPOINT), col(SALARY_MIDPOINT_BUCKET)).show();
System.out.println("=== Group by salary bucket ===");
responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(col(SALARY_MIDPOINT_BUCKET)).show();
session.stop();
- Spark MLlib
- GraphX
Accumulators - variables used to count things during processing:
Logger.getLogger("org").setLevel(Level.ERROR);
SparkConf conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
SparkContext sparkContext = new SparkContext(conf);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
// accumulators created with "new" must be registered with the SparkContext before use
final LongAccumulator total = new LongAccumulator();
final LongAccumulator missingSalaryMidPoint = new LongAccumulator();
total.register(sparkContext, Option.apply("total"), false);
missingSalaryMidPoint.register(sparkContext, Option.apply("missing salary middle point"), false);
JavaRDD<String> responseRDD = javaSparkContext.textFile("in/2016-stack-overflow-survey-responses.csv");
JavaRDD<String> responseFromCanada = responseRDD.filter(response -> {
    String[] splits = response.split(Utils.COMMA_DELIMITER, -1);
    total.add(1); // counts every processed row, regardless of the filter result
    if (splits[14].isEmpty()) { // salary mid point column is empty
        missingSalaryMidPoint.add(1);
    }
    return splits[2].equals("Canada"); // country column
});
System.out.println("Count of responses from Canada: " + responseFromCanada.count());
System.out.println("Total count of responses: " + total.value());
System.out.println("Count of responses missing salary middle point: " + missingSalaryMidPoint.value());zmienne, które bęą przekazane do każdego noda, np. kopia dużego zestawu danych. Jest to efektywniejsze niż czytanie wszystkiego od zera.