Spark
Apache Spark is an open-source data analytics cluster computing framework.
Hadoop
Spark is not tied to the two-stage MapReduce paradigm and promises performance up to 100 times faster than Hadoop MapReduce for certain applications. Spark provides primitives for in-memory cluster computing that allow user programs to load data into a cluster's memory and query it repeatedly, which makes it well suited to machine learning algorithms.
Before submitting a Spark application to a YARN cluster, export the environment variables:
$ source /etc/default/hadoop
To submit a Spark application to a YARN cluster:
$ cd /apps/hathi/spark
$ ./bin/spark-submit --master yarn --deploy-mode cluster examples/src/main/python/pi.py 100
Please note that there are two deploy modes on YARN: cluster and client (also written as the masters yarn-cluster and yarn-client). In cluster mode, your driver program runs on one of the worker nodes; in client mode, your driver program runs within the spark-submit process, which runs on the hathi front end. We recommend that you always use cluster mode on hathi to avoid overloading the front-end nodes.
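The difference between the two modes comes down to one argument on the spark-submit command line. The following is a minimal sketch, assuming you want to assemble that command from Python; `spark_submit_args` is a hypothetical helper name, not part of Spark, and it only builds the same arguments shown in the command above.

```python
import shlex

VALID_DEPLOY_MODES = ("cluster", "client")

def spark_submit_args(app, app_args=(), deploy_mode="cluster"):
    """Build the argument list for ./bin/spark-submit on a YARN cluster.

    Hypothetical helper for illustration; it only assembles strings and
    does not launch anything.
    """
    if deploy_mode not in VALID_DEPLOY_MODES:
        raise ValueError("deploy_mode must be 'cluster' or 'client'")
    return ["./bin/spark-submit", "--master", "yarn",
            "--deploy-mode", deploy_mode, app] + [str(a) for a in app_args]

# Default to cluster mode, as recommended for the front end.
cmd = spark_submit_args("examples/src/main/python/pi.py", [100])
print(" ".join(shlex.quote(part) for part in cmd))
```

Defaulting `deploy_mode` to "cluster" mirrors the recommendation above; client mode has to be requested explicitly.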
To write your own Spark jobs, use the Spark Pi example (examples/src/main/python/pi.py) as a starting point.
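For orientation, the core of a Pi-style job can be sketched as follows. This is an illustrative reduction, not the shipped pi.py: the function names and the `samples_per_partition` parameter are assumptions, and `sc` is expected to be an existing SparkContext, such as the one the pyspark shell provides.

```python
import random
from operator import add

def inside_unit_circle(_):
    """Return 1 if a random point in the square [-1, 1] x [-1, 1] falls
    inside the unit circle, else 0. The argument is ignored."""
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x * x + y * y <= 1 else 0

def pi_from_hits(hits, samples):
    """The circle covers pi/4 of the square, so pi is about 4 * hits / samples."""
    return 4.0 * hits / samples

def estimate_pi(sc, partitions=2, samples_per_partition=100000):
    """Monte Carlo estimate of pi computed on the SparkContext `sc`."""
    samples = partitions * samples_per_partition
    # parallelize() distributes the sample indices, map() runs on the
    # executors, and reduce() is the action that triggers the job.
    hits = sc.parallelize(range(samples), partitions) \
             .map(inside_unit_circle) \
             .reduce(add)
    return pi_from_hits(hits, samples)
```

In the pyspark shell this could be tried with `estimate_pi(sc, 10)`; replacing `inside_unit_circle` with your own per-record function is the usual way to grow such a baseline into a real job.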
Spark can read input files from both HDFS and the local file system. After exporting the environment variables, the default is HDFS. To use input files on cluster storage (e.g., data depot), specify the path as file:///path/to/file.
Note: when reading input files from cluster storage, the files must be accessible from every node in the cluster.
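The two kinds of input path differ only in the URI scheme. A minimal sketch, assuming a hypothetical helper named `input_uri` (not a Spark function) that prepares the string later passed to `sc.textFile()`:

```python
def input_uri(path, on_cluster_storage=False):
    """Return the string to pass to sc.textFile().

    Hypothetical helper: HDFS paths are returned unchanged because HDFS is
    the default file system, while absolute paths on cluster storage get
    the file:// scheme.
    """
    if not on_cluster_storage:
        return path
    if not path.startswith("/"):
        raise ValueError("cluster storage paths must be absolute")
    return "file://" + path

print(input_uri("derby.log"))                                 # read from HDFS
print(input_uri("/path/to/file", on_cluster_storage=True))    # read from cluster storage
```

In the pyspark shell the result would be used as `sc.textFile(input_uri("/path/to/file", on_cluster_storage=True))`.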
To run an interactive analysis or to learn the API with the Spark shell (pyspark):
$ cd /apps/hathi/spark
$ ./bin/pyspark
Create a Resilient Distributed Dataset (RDD) from Hadoop InputFormats (such as HDFS files):
>>> textFile = sc.textFile("derby.log")
15/09/22 09:31:58 INFO storage.MemoryStore: ensureFreeSpace(67728) called with curMem=122343, maxMem=278302556
15/09/22 09:31:58 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 66.1 KB, free 265.2 MB)
15/09/22 09:31:58 INFO storage.MemoryStore: ensureFreeSpace(14729) called with curMem=190071, maxMem=278302556
15/09/22 09:31:58 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 14.4 KB, free 265.2 MB)
15/09/22 09:31:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:57813 (size: 14.4 KB, free: 265.4 MB)
15/09/22 09:31:58 INFO spark.SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2
Note: derby.log is a relative path, so it refers to the HDFS file hdfs://hathi-adm.rcac.purdue.edu:8020/user/myusername/derby.log
Call the count() action on the RDD:
>>> textFile.count()
15/09/22 09:32:01 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/22 09:32:01 INFO spark.SparkContext: Starting job: count at :1
15/09/22 09:32:01 INFO scheduler.DAGScheduler: Got job 0 (count at :1) with 2 output partitions (allowLocal=false)
15/09/22 09:32:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(count at :1)
......
15/09/22 09:32:03 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1870 bytes result sent to driver
15/09/22 09:32:04 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2254 ms on localhost (1/2)
15/09/22 09:32:04 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2220 ms on localhost (2/2)
15/09/22 09:32:04 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/09/22 09:32:04 INFO scheduler.DAGScheduler: ResultStage 0 (count at :1) finished in 2.317 s
15/09/22 09:32:04 INFO scheduler.DAGScheduler: Job 0 finished: count at :1, took 2.548350 s
93
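A common next step after count() is to filter the RDD first. The sketch below assumes `rdd` is a text RDD such as the `textFile` created above; the function names are illustrative and the search word is arbitrary.

```python
def contains(word):
    """Return a predicate that is true for lines containing `word`."""
    def predicate(line):
        return word in line
    return predicate

def count_lines_containing(rdd, word):
    """Count the lines of a text RDD that contain `word`.

    filter() is a lazy transformation; nothing runs on the cluster until
    the count() action is called.
    """
    return rdd.filter(contains(word)).count()
```

In the pyspark shell, `count_lines_containing(textFile, "derby")` would print the same kind of scheduler log lines as the plain count() call, followed by the number of matching lines.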
To learn programming in Spark, refer to the Spark Programming Guide.
To learn how to submit Spark applications, refer to Submitting Applications.
PBS
This section walks through how to submit and run a Spark job using PBS on the compute nodes of Hammer.
pbs-spark-submit launches an Apache Spark program within a PBS job: it starts the Spark master and worker processes in standalone mode, runs a user-supplied Spark job, and stops the Spark master and worker processes. The Spark program and its associated services are constrained by the resource limits of the job and are killed when the job ends. This effectively allows PBS to act as a Spark cluster manager.
The following steps assume that you have a Spark program that can run without errors.
To use Spark and pbs-spark-submit, load the following two modules, which set up the SPARK_HOME and PBS_SPARK_HOME environment variables.
module load spark
module load pbs-spark-submit
The following example submission script serves as a template for building your own, more complex Spark job submissions. This job requests 2 whole compute nodes for 10 minutes and submits to the standby queue.
#PBS -N spark-pi
#PBS -l nodes=2:ppn=20
#PBS -l walltime=00:10:00
#PBS -q standby
#PBS -o spark-pi.out
#PBS -e spark-pi.err
cd $PBS_O_WORKDIR
module load spark
module load pbs-spark-submit
pbs-spark-submit $SPARK_HOME/examples/src/main/python/pi.py 1000
In the submission script above, the following command submits the pi.py program to the nodes allocated to your job:
pbs-spark-submit $SPARK_HOME/examples/src/main/python/pi.py 1000
You can set various environment variables in your submission script to change the settings of the Spark program. For example, the following line sets SPARK_LOG_DIR to $HOME/log. The default value is the current working directory.
export SPARK_LOG_DIR=$HOME/log
The same settings can be changed via pbs-spark-submit command line arguments. For example, the following line sets SPARK_LOG_DIR to $HOME/log2.
pbs-spark-submit --log-dir $HOME/log2
Environment Variable | Default | Shell Export | Command Line Args |
---|---|---|---|
SPARK_CONF_DIR | $SPARK_HOME/conf | export SPARK_CONF_DIR=$HOME/conf | --conf-dir |
SPARK_LOG_DIR | Current Working Directory | export SPARK_LOG_DIR=$HOME/log | --log-dir |
SPARK_LOCAL_DIR | /tmp | export SPARK_LOCAL_DIR=$RCAC_SCRATCH/local | NA |
SCRATCHDIR | Current Working Directory | export SCRATCHDIR=$RCAC_SCRATCH/scratch | --work-dir |
SPARK_MASTER_PORT | 7077 | export SPARK_MASTER_PORT=7078 | NA |
SPARK_DAEMON_JAVA_OPTS | None | export SPARK_DAEMON_JAVA_OPTS="-Dkey=value" | -D key=value |
Note that SCRATCHDIR must be a shared scratch directory across all nodes of a job.
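The table can be read as a lookup from variable name to command line option. The following sketch, with the hypothetical helper `split_settings`, sorts a set of desired settings into those that can go on the pbs-spark-submit command line and those that must be exported in the submission script. Only the three directory options from the table are mapped; SPARK_DAEMON_JAVA_OPTS is left out because its `-D key=value` form is not a simple one-to-one option.

```python
# Mapping taken from the table above. Variables marked NA there have no
# command line equivalent and can only be set with `export`.
OPTION_FOR_VARIABLE = {
    "SPARK_CONF_DIR": "--conf-dir",
    "SPARK_LOG_DIR": "--log-dir",
    "SCRATCHDIR": "--work-dir",
}

def split_settings(settings):
    """Split settings into (command line options, shell export lines)."""
    options, exports = [], []
    for name, value in settings.items():
        if name in OPTION_FOR_VARIABLE:
            options += [OPTION_FOR_VARIABLE[name], value]
        else:
            exports.append("export {}={}".format(name, value))
    return options, exports

options, exports = split_settings({
    "SPARK_LOG_DIR": "$HOME/log2",
    "SPARK_MASTER_PORT": "7078",
})
print(" ".join(["pbs-spark-submit"] + options))
print("\n".join(exports))
```

Here the log directory ends up as a `--log-dir` option, while the master port, which has no option in the table, becomes an export line.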
In addition, pbs-spark-submit supports command line arguments that change the behavior of the Spark daemons and the Spark jobs. For example, the --no-stop argument tells pbs-spark-submit not to stop the master and worker daemons after the Spark application finishes, and the --no-init argument tells it not to initialize the Spark master and worker processes. Together, these arguments are intended for running a sequence of Spark programs within the same job.
pbs-spark-submit --no-stop $SPARK_HOME/examples/src/main/python/pi.py 800
pbs-spark-submit --no-init $SPARK_HOME/examples/src/main/python/pi.py 1000
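The pattern generalizes to longer sequences: the first program starts the daemons, the last one stops them. The sketch below generates such a chain; `chained_commands` is a hypothetical helper, and it assumes that --no-init and --no-stop can be combined for programs in the middle of the chain, which the two-program example above does not demonstrate.

```python
def chained_commands(programs):
    """Return one pbs-spark-submit command per program so that the Spark
    daemons start before the first program and stop after the last one.

    Assumption: --no-init and --no-stop may be given together for programs
    in the middle of the sequence.
    """
    commands = []
    last = len(programs) - 1
    for i, program in enumerate(programs):
        flags = []
        if i > 0:
            flags.append("--no-init")   # daemons are already running
        if i < last:
            flags.append("--no-stop")   # keep daemons for the next program
        commands.append(" ".join(["pbs-spark-submit"] + flags + [program]))
    return commands

for line in chained_commands(["pi.py 800", "pi.py 1000"]):
    print(line)
```

With two programs this reproduces the pair of commands shown above; with a single program no flag is added, so the daemons are started and stopped as usual.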
Use the following command to see the complete list of command line arguments.
pbs-spark-submit -h
To learn programming in Spark, refer to the Spark Programming Guide.
To learn how to submit Spark applications, refer to Submitting Applications.