Spark and Spark shell

Basics of Spark

Unlike the various specialized systems, Spark's goal was to generalize MapReduce to support new applications within the same engine.

RDD

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
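For example, in the Spark shell (where sc is the pre-created SparkContext), the two approaches look like this (a minimal sketch; README.md is assumed to be in the working directory):

// (1) parallelize an existing collection in the driver program
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))

// (2) reference a dataset in external storage (local file, HDFS path, ...)
val rdd2 = sc.textFile("README.md")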

Lazy transformations
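Transformations (map, filter, flatMap, ...) are lazy: Spark only records the lineage of an RDD and does no work until an action (count, collect, ...) is called. A minimal sketch, again assuming README.md exists:

val lines = sc.textFile("README.md")               // nothing is read yet
val sparkLines = lines.filter(_.contains("Spark")) // still nothing computed
sparkLines.count()                                 // the action triggers the actual read and filter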

Persist RDD/Cache

Each node stores in memory any partitions of the dataset that it computes and reuses them in other actions on that dataset.

The cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

val f = sc.textFile("README.md")

val w = f.flatMap(l => l.split(" ")).map(word => (word, 1)).cache()

w.reduceByKey(_ + _).collect.foreach(println)

Set of transformations: map, flatMap, ...

val distFile = sc.textFile("README.md")

distFile.map(l => l.split(" ")).collect()

distFile.flatMap(l => l.split(" ")).collect()
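Note the difference: map keeps one element per input line (here, an array of words per line), while flatMap flattens the per-line arrays into a single RDD of words. A small sketch to see this:

distFile.map(l => l.split(" ")).first()      // Array[String]: the words of the first line
distFile.flatMap(l => l.split(" ")).first()  // String: the first word only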

Set of actions: reduce, collect, ...

val f = sc.textFile("README.md")

val words = f.flatMap(l => l.split(" ")).map(word => (word, 1))

words.reduceByKey(_ + _).collect.foreach(println)
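As an optional follow-up (a sketch reusing the words pair RDD above), sort by count to list the most frequent words first:

words.reduceByKey(_ + _).sortBy(_._2, ascending = false).take(10).foreach(println)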

Shared variables: broadcast variables and accumulators

Broadcast variables let the programmer keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

They can be used, for example, to give every node a copy of a large input dataset efficiently. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
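A minimal sketch in the Spark shell (standard SparkContext broadcast API):

val broadcastVar = sc.broadcast(Array(1, 2, 3)) // shipped once to each executor
broadcastVar.value                              // Array(1, 2, 3), readable inside tasks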

Accumulators are variables that can only be “added” to through an associative operation. They are used to implement counters and sums efficiently in parallel.

Only the driver program can read an accumulator’s value, not the tasks
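A minimal sketch using the built-in long accumulator (Spark 2.x+ API):

val acc = sc.longAccumulator("My Accumulator")
sc.parallelize(1 to 100).foreach(x => acc.add(x)) // tasks add to the accumulator
acc.value                                         // 5050, read back on the driver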

Demos and further topics:

  • Spark SQL demo: http://spark.apache.org/docs/latest/sql-programming-guide.html (see page 105 of itas_workshop.pdf)

  • Spark Streaming demo: http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • Features/benefits of Shark

  • Features/benefits of MLlib: http://spark.apache.org/docs/latest/mllib-guide.html

You can download Spark from:

https://spark.apache.org/downloads.html

or

wget  https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz   
tar xvf  spark-3.1.1-bin-hadoop3.2.tgz

mv spark-3.1.1-bin-hadoop3.2 spark


Then start the Spark shell as follows (with no options it runs in local mode):

cd spark

./bin/spark-shell

Or start the shell with 4 cores (spark-shell is in the bin folder):

 ./bin/spark-shell --master local[4]

You will then get the Scala prompt:

scala> 

Try typing,

 val data = 1 to 10000

To create an RDD:

scala>  val distData = sc.parallelize(data)

Use filter:

scala> distData.filter(_ < 10).collect()
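You can also try an action that aggregates the whole RDD, for example summing the values (optional):

scala> distData.reduce(_ + _)   // 50005000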

Look at the attached file scala.pdf (esp. pages 18-28).

Try more examples at

https://mapr.com/docs/61/Spark/Spark_GettingStarted.html

Example spark-shell

Now try data on HDFS.

First, get auctiondata.csv

This dataset is from eBay online auctions. The dataset contains the following fields:

  • auctionid - Unique identifier of an auction.
  • bid - Proxy bid placed by a bidder.
  • bidtime - Time (in days) that the bid was placed from the start of the auction.
  • bidder - eBay username of the bidder.
  • bidderrate - eBay feedback rating of the bidder.
  • openbid - Opening bid set by the seller.
  • price - Closing price that the item sold for (equivalent to the second highest bid + an increment).
  • item - Type of item.

A sample row looks like this:

8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Then log in to the Hadoop machine, download the data, and put it into HDFS:


wget  https://raw.githubusercontent.com/mapr-demos/getting-started-spark-on-mapr/master/data/auctiondata.csv

hadoop fs -mkdir  /spark_data

hadoop fs -put auctiondata.csv /spark_data

./bin/spark-shell --master local[4]
scala> val auctionData = spark.read.textFile("hdfs://localhost:9000/spark_data/auctiondata.csv")

scala> auctionData.first() 

res0: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

scala> auctionData.count()

res1: Long = 10654

scala> auctionData.show() 


+--------------------+
|               value|
+--------------------+
|8213034705,95,2.9...|
|8213034705,115,2....|
|8213034705,100,2....|
|8213034705,117.5,...|
|8213060420,2,0.06...|
|8213060420,15.25,...|
|8213060420,3,0.18...|
|8213060420,10,0.1...|
|8213060420,24.99,...|
|8213060420,20,0.2...|
|8213060420,22,0.2...|
|8213060420,24,0.2...|
|8213060420,26,0.2...|
|8213060420,80,0.5...|
|8213060420,75,0.6...|
|8213060420,85,0.6...|
|8213060420,83,0.8...|
|8213060420,100,1....|
|8213060420,100,1....|
|8213060420,110,1....|
+--------------------+
only showing top 20 rows



scala>  auctionData.take(3)

 res6: Array[String] = Array(8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3, 8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3, 8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3)

scala> auctionData.filter(line => line.contains("xbox")).count()

 res3: Long = 2784  

scala> auctionData.filter(line => line.contains("palm")).count() 

res4: Long = 5924

scala> val auctionDataFrame = spark.read.format("csv").option("inferSchema",true).load("hdfs://localhost:9000/spark_data/auctiondata.csv").toDF("auctionid","bid","bidtime","bidder","bidderrate","openbid","price","item","daystolive")
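Optionally, check the schema that inferSchema produced before querying:

scala> auctionDataFrame.printSchema()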

scala> auctionDataFrame.filter($"price" < 30).show()

+----------+-----+--------+-------------+----------+-------+-----+-------+----------+
| auctionid|  bid| bidtime|       bidder|bidderrate|openbid|price|   item|daystolive|
+----------+-----+--------+-------------+----------+-------+-----+-------+----------+
|1643903372| 9.99|1.681493|        2gd4u|        56|   9.99| 26.0|cartier|         3|
|1643903372| 11.0|1.973449|phyllis120577|         0|   9.99| 26.0|cartier|         3|
|1643903372|20.99|1.974838|        2gd4u|        56|   9.99| 26.0|cartier|         3|
|1643903372| 20.0|1.983484|phyllis120577|         0|   9.99| 26.0|cartier|
...

Writing data

scala> auctionDataFrame.filter($"item" === "xbox").write.json("hdfs://localhost:9000/spark_data/results/json/xbox")

scala> auctionDataFrame.filter($"item" === "xbox").write.parquet("hdfs://localhost:9000/spark_data/results/parquet/xbox")
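To read the results back later, a sketch (the paths simply mirror the write calls above; adjust the HDFS host/port to your setup):

scala> val xboxJson = spark.read.json("hdfs://localhost:9000/spark_data/results/json/xbox")
scala> val xboxParquet = spark.read.parquet("hdfs://localhost:9000/spark_data/results/parquet/xbox")
scala> xboxParquet.show(5)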

Use :quit or ^C to quit the Spark shell.

At the Unix shell prompt, list the output files:

hdfs dfs -ls /spark_data/results/json/xbox

In my case, the output is:

WARNING: HADOOP_PREFIX has been replaced by HADOOP_HOME. Using value of HADOOP_PREFIX.

2021-03-22 15:35:40,801 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Found 2 items

-rw-r--r--   3 hadoop supergroup          0 2021-03-22 15:33 /spark_data/results/json/xbox/_SUCCESS

-rw-r--r--   3 hadoop supergroup     419699 2021-03-22 15:33 /spark_data/results/json/xbox/part-00000-8e8636d7-8cf0-4900-bad2-bca6bca0dee3-c000.json

Then try the following (check your file name and location; the part-* file name will differ on your system):

hdfs dfs -cat  /spark_data/results/json/xbox/part-00000-8e8636d7-8cf0-4900-bad2-bca6bca0dee3-c000.json

Now your task: Try bank.csv

wget   https://raw.githubusercontent.com/cchantra/bigdata.github.io/refs/heads/master/spark/bank.csv

Attribute Information:

About client features:

  • age

  • job (type of job)

  • marital (marital status)

  • education

  • default (has credit in default? 'no','yes','unknown')

  • balance

  • housing (has housing loan? 'no','yes','unknown')

  • loan (has personal loan? 'no','yes','unknown')

About current campaign:

  • contact (communication type: 'cellular','telephone')

  • duration (last contact duration, in seconds)

Important note: the duration attribute highly affects the output target (e.g., if duration=0 then y='no'). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model.

  • day (last contact day of the week)

  • month (last contact month of year)

  • campaign (number of contacts performed during this campaign and for this client)

  • deposit (has the client subscribed to a term deposit? 'yes','no')

About previous contacts/campaign:

  • pdays (number of days that passed by after the client was last contacted from a previous campaign; -1 means the client was not previously contacted, as in this dataset, while some versions use 999)

  • previous (number of contacts performed before this campaign and for this client)

  • poutcome (outcome of the previous marketing campaign: 'failure','nonexistent','success')

hadoop fs -put bank.csv /spark_data

bin/spark-shell
scala> val bankDataFrame = spark.read.format("csv").option("inferSchema",true).load("/spark_data/bank.csv").toDF("age","job","marital","education","default","balance",
"housing","loan","contact","day","month","duration","campaign","pdays","previous","poutcome","deposit")

scala> bankDataFrame.filter($"age" < 30).show()

+---+-------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|          job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 28|     services|  single|secondary|     no|   5090|    yes|  no|unknown|  6|  may|    1297|       3|   -1|       0| unknown|    yes|
| 29|   management| married| tertiary|     no|    199|    yes| yes|unknown|  7|  may|    1689|       4|   -1|       0| unknown|    yes|
| 28|       admin.|divorced|secondary|     no|    785|    yes|  no|unknown|  8|  may|     442|       2|   -1|       0| unknown|    yes|
| 28|  blue-collar|  single|secondary|     no|    759|    yes|  no|unknown|  9|  may|     710|       1|   -1|       0| unknown|    yes|
...

scala> bankDataFrame.filter($"marital" === "single").write.json("/spark_data/results/json/single")

scala> bankDataFrame.filter($"marital" === "single").write.parquet("/spark_data/results/parquet/single")
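Before answering the questions below, it may help to inspect the schema and a simple aggregate (a sketch; the column names follow the toDF call above):

scala> bankDataFrame.printSchema()
scala> bankDataFrame.groupBy("marital").count().show()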

Answer the following questions.

  1. What is the age range of clients whose balance is more than 5000?

  2. How many contacts of type "cellular" were made to clients who made a deposit?

  3. What are the important characteristics of clients who made a deposit?

- What is their average balance?

- What must the contact duration be at least?

- Do they have loans or not?

- What is their average age?

- Does participation in a previous campaign affect the deposit in the current campaign?

Running Spark with Scala

Scala program: Hello world

object HelloWorld {
   /* This is my first Scala program.
    * It will print 'Hello, world!' as the output.
    */
   def main(args: Array[String]) {
      println("Hello, world!") // prints Hello, world!
   }
}

Save the file as HelloWorld.scala.

You need a Scala compiler that is compatible with your Spark version.

see https://docs.scala-lang.org/overviews/scala-book/hello-world-1.html

sudo apt-get install scala

Then you can compile it as follows.

 scalac HelloWorld.scala

Then add the current directory to CLASSPATH in your ~/.bashrc:

export CLASSPATH=$CLASSPATH:.

Then

source ~/.bashrc

and run the program below:

 scala HelloWorld

For Spark on Python (PySpark), start the shell with:

 MASTER=spark://IP:PORT ./bin/pyspark

or for a four core local machine

MASTER=local[4] ./bin/pyspark

Say we copy the file RELEASE (from the Spark installation directory) to HDFS:

hdfs dfs -put RELEASE /RELEASE

Running Spark with Python

Then run Spark with a Python file.

Create the file SimpleApp.py as follows:

"""SimpleApp.py"""
from pyspark import SparkContext 

logFile = "hdfs://localhost:9000/RELEASE"  # Should be some file on your hdfs system 
sc = SparkContext("local", "Simple App") 
logData = sc.textFile(logFile).cache()  
numAs = logData.filter(lambda s: 'a' in s).count() 
numBs = logData.filter(lambda s: 'b' in s).count()  
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Then use spark-submit to run your application

 bin/spark-submit  --master local[4]   SimpleApp.py 

Results:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/09/23 14:43:23 INFO SparkContext: Running Spark version 2.0.0
16/09/23 14:43:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
16/09/23 14:43:25 INFO DAGScheduler: Job 0 finished: count at /home/chantana/spark/SimpleApp.py:7, took 0.570769 s
...
16/09/23 14:43:25 INFO DAGScheduler: Job 1 finished: count at /home/chantana/spark/SimpleApp.py:8, took 0.081462 s
16/09/23 14:43:25 INFO SparkContext: Invoking stop() from shutdown hook
16/09/23 14:43:25 INFO SparkUI: Stopped Spark web UI at http://158.108.38.94:4040
...
16/09/23 14:43:25 INFO ShutdownHookManager: Deleting directory /tmp/spark-beb10625-37ca-46b2-baab-a68eec0f0268

Lines with a: 46, Lines with b: 23

You can turn off the logging messages; see

https://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-spark

You may skip the following if you do not use Java.

Running Spark with Java

Follow the instruction in https://spark.apache.org/docs/latest/quick-start.html

First

mkdir java_example

Create a file containing the following and save it in the java_example folder created above.

/*** SimpleApp.java ***/
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/hadoop/spark/RELEASE"; // Should be some file on your system
    JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
      "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

cd java_example

Create pom.xml

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>2.4.5</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

Create ./src/main/java/ according to the Maven directory structure and move SimpleApp.java there:

mkdir -p ~/spark/java_example/src/main/java
mv SimpleApp.java src/main/java/
find .

should output:

./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

Install Maven:

sudo apt-get install maven

Now package the application using Maven, from the java_example folder:

 mvn package 
...
Downloaded from central: https://repo.maven.apache.org/maven2/commons-lang/commons-lang/2.1/commons-lang-2.1.jar (208 kB at 1.8 MB/s)
Downloaded from central: https://repo.maven.apache.org/maven2/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar (226 kB at 1.6 MB/s)
[INFO] Building jar: /home/hadoop/spark/java_example/target/simple-project-1.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  37.706 s
[INFO] Finished at: 2025-03-10T09:08:54Z
[INFO] ------------------------------------------------------------------------

Then run it.

/home/hadoop/spark/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
25/03/10 09:09:26 WARN Utils: Your hostname, bigdata2024 resolves to a loopback address: 127.0.1.1; using 10.3.132.216 instead (on interface ens3)
25/03/10 09:09:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/03/10 09:09:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
25/03/10 09:09:27 INFO SparkContext: Running Spark version 3.1.1
25/03/10 09:09:27 INFO ResourceUtils: ==============================================================
25/03/10 09:09:27 INFO ResourceUtils: No custom resources configured for spark.driver.
25/03/10 09:09:27 INFO ResourceUtils: ==============================================================
...

Lines with a: 2, lines with b: 2
25/03/10 09:09:30 INFO SparkContext: Invoking stop() from shutdown hook
25/03/10 09:09:30 INFO SparkUI: Stopped Spark web UI at http://10.3.132.216:4041
25/03/10 09:09:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
25/03/10 09:09:30 INFO MemoryStore: MemoryStore cleared
25/03/10 09:09:30 INFO BlockManager: BlockManager stopped
25/03/10 09:09:30 INFO BlockManagerMaster: BlockManagerMaster stopped
25/03/10 09:09:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
25/03/10 09:09:30 INFO SparkContext: Successfully stopped SparkContext
25/03/10 09:09:30 INFO ShutdownHookManager: Shutdown hook called
25/03/10 09:09:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-53b5b656-382a-4652-a49f-2f9da2f3eac2
25/03/10 09:09:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5ef13389-6ed0-4d39-979f-4ddc69515bc0

(See more java example. https://github.com/jgperrin/net.jgp.labs.spark)

Run an existing example:

./bin/run-example SparkPi 2 local

See the following code in the examples folder under the Spark installation folder:


import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
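The estimate works because a point drawn uniformly from the square [-1, 1] x [-1, 1] falls inside the unit circle with probability π/4, so 4.0 * count / n converges to π as n grows.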

To run another example, the Python word count, use spark-submit:

./bin/spark-submit  examples/src/main/python/wordcount.py ./README.md

... Results:

clean: 1

<class>: 1

spark://: 1

first: 1

core: 1

talk: 1

latest: 1

16/09/23 14:55:17 INFO SparkUI: Stopped Spark web UI at http://158.108.38.94:4040

16/09/23 14:55:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/09/23 14:55:17 INFO MemoryStore: MemoryStore cleared

16/09/23 14:55:17 INFO BlockManager: BlockManager stopped

16/09/23 14:55:17 INFO BlockManagerMaster: BlockManagerMaster stopped

16/09/23 14:55:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/09/23 14:55:17 INFO SparkContext: Successfully stopped SparkContext

16/09/23 14:55:18 INFO ShutdownHookManager: Shutdown hook called

16/09/23 14:55:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-36f8c485-f738-472e-a465-b50071a18b62/pyspark-62bf92b4-c65d-47f1-8726-a18acc8fd3b1

16/09/23 14:55:18 INFO ShutdownHookManager: Deleting directory /tmp/spark-36f8c485-f738-472e-a465-b50071a18b62

More examples: see file examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala

 cp data/mllib/kmeans_data.txt  .

 ./bin/run-example SparkKMeans kmeans_data.txt 3 0.01 local

See also the file examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala:

 cp data/mllib/pagerank_data.txt  .

./bin/run-example SparkPageRank pagerank_data.txt 10 local

References

Installation and test

(http://blog.prabeeshk.com/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/)

Testing

http://spark.apache.org/docs/1.1.1/quick-start.html

More about Spark API

http://spark.apache.org/docs/1.1.1/programming-guide.html#linking-with-spark

Basic Tutorial about Scala

http://www.scala-lang.org/old/node/166.html

https://www.cs.helsinki.fi/u/lagerspe/courses/scala.pdf

http://www.tutorialspoint.com/scala/

https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch11s23.html
