Spark
This page describes running a Spark standalone cluster on top of Hadoop (HDFS). It is based on the myHadoop module.
Load modules
The following modules are needed by Spark:
zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015 spark/1.5.1
To load the modules:
module load zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015 spark/1.5.1
If you want to use pyspark with a different version of Python, you need to add the Python module to your ~/.bashrc. For example, if using python 2.7.6:
$ module initadd python/2.7.6
$ cat ~/.bashrc
# .bashrc
# Source global definitions
if [ -f /etc/bashrc ]; then
. /etc/bashrc
fi
if [ ! -f ~/.ssh/id_rsa ]; then
echo 'No public/private RSA keypair found.'
ssh-keygen -t rsa -b 2048 -f ~/.ssh/id_rsa -N ""
cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
chmod 644 ~/.ssh/authorized_keys
fi
# Load saved modules
module load null python/2.7.6
# User specific aliases and functions
$ module initremove python/2.7.6 # remove the module from the .bashrc file.
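If you added the Python module as shown above, a quick sanity check at your next login confirms which interpreter pyspark will pick up. This is only a sketch using standard commands; nothing cluster-specific is assumed:
$ module list       # python/2.7.6 should now appear among the loaded modules
$ which python      # should point at the python 2.7.6 installation
$ python --version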
Start Spark Cluster
Here is an example SLURM batch script that runs a word-count MapReduce job. It is based on code written by Dr. Lockwood, which we tailored to our cluster. This script uses persistent mode.
Usually, the data of the namenode and the datanodes are located at:
/scratch/${USER}_hadoop.$SLURM_JOBID
It is better to clean up the above folder after using Spark if you don't need the data in the future. Otherwise, you may receive several warning emails stating that the scratch data will be automatically deleted.
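As a minimal cleanup sketch (run only after the job has finished, and only if you are sure the HDFS data is no longer needed; <jobid> below is a placeholder for the SLURM job ID of that run):
$ squeue -u $USER                          # make sure the job and its cleanup step have finished
$ rm -rf /scratch/${USER}_hadoop.<jobid>   # <jobid>: the SLURM job ID of the finished run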
Scala Script
We need a Scala script, example.scala, to run:
// Example of Pi
val NUM_SAMPLES=100000000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
val x = Math.random()
val y = Math.random()
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
// Example of word count
val textFile = sc.textFile("hdfs://ib-"+sys.env("HOSTNAME")+":54310/data/pg2701.txt")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
counts.saveAsTextFile("hdfs://ib-"+sys.env("HOSTNAME")+":54310/scala_outputs")
exit
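Once the job has run, the sorted (count, word) pairs written by the last two lines can be inspected either in HDFS while the cluster is still up, or from the copy that the submission script below pulls back into your working directory; a rough sketch:
$ hdfs dfs -cat /scala_outputs/part-00000 | head   # while the Spark/Hadoop cluster is still running
$ head scala_outputs/part-00000                    # after the batch script has copied the results out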
Submission script
Then submit the Scala script using the following SLURM batch script. To customize it to your needs, change the marked areas; the sections between the "NO CHANGE" and "NO CHANGE END" markers should be left as they are:
#!/bin/bash
################################################################################
# slurm.sbatch - A sample submit script for SLURM that illustrates how to
# spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
# Glenn K. Lockwood, San Diego Supercomputer Center February 2014
# Revised:
# Tingyang Xu September 2015
################################################################################
## -N 4 means we are going to use 4 nodes to run Hadoop cluster
#SBATCH -N 4
## -c 24 means we are going to use 24 cores on each node.
#SBATCH -c 24
## --ntasks-per-node=1 means each node runs single datanode/namenode.
## When you write your own SLURM batch, you DON'T need to change ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive
# download example data to your folder for the later mapreduce job.
if [ ! -f ./pg2701.txt ]; then
echo "*** Retrieving some sample input data"
wget 'http://www.gutenberg.org/cache/epub/2701/pg2701.txt'
fi
# where to store the temporary configuration files.
# make sure that the configuration files are GLOBALLY ACCESSIBLE
export HADOOP_CONF_DIR=/apps2/myHadoop/Sep012015/scratch/${USER}_hadoop-conf.$SLURM_JOBID
# If you want to use Hive, please add the following line
#export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive
#################NO CHANGE############################
export SPARK_CONF_DIR=$HADOOP_CONF_DIR
if [ "z$HADOOP_OLD_DIR" == "z" ]; then
myhadoop-configure.sh
else
myhadoop-configure.sh -p $HADOOP_OLD_DIR
fi
# test if the HADOOP_CONF_DIR is globally accessible
if ! srun ls -d $HADOOP_CONF_DIR; then
echo "The configure files are not globally accessible.
Please consider the the shared, home, or scratch directory to put your HADOOP_CONF_DIR.
For example, export HADOOP_CONF_DIR=/scratch/$USER_hadoop-conf.$SLURM_JOBID"
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
exit 1
fi
$HADOOP_HOME/sbin/start-all.sh
sleep 5
hdfs dfs -ls /
$SPARK_HOME/sbin/start-all.sh
hdfs dfs -mkdir -p /tmp/hive/$USER
hdfs dfs -chmod -R 777 /tmp
#################NO CHANGE END########################
# make the data dir. Here we need a directory directly under the HDFS root.
hdfs dfs -mkdir /data
hdfs dfs -put ./pg2701.txt /data
hdfs dfs -ls /data
# run spark
spark-shell -i example.scala
# copy out the results
hdfs dfs -ls /scala_outputs
hdfs dfs -get /scala_outputs ./
#################NO CHANGE############################
$SPARK_HOME/sbin/stop-all.sh
$HADOOP_HOME/sbin/stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################
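A typical workflow with this script, assuming it is saved as slurm.sbatch in the same directory as example.scala and pg2701.txt, would look roughly like:
$ sbatch slurm.sbatch              # submit the job
$ squeue -u $USER                  # watch it start and finish
$ less slurm-<jobid>.out           # spark-shell output, including the Pi estimate
$ head scala_outputs/part-00000    # word counts copied back from HDFS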
Interactive Spark (beta) use with SLURM (not recommended)
NOTE: *any* interruption to the network will cause your job to crash irrecoverably.
To run an interactive Spark session on the Hadoop cluster, you will want to do the following:
[NetID@cn65 ~]$ module load zlib/1.2.8 protobuf/2.5.0 openssl java/1.8.0_31 myhadoop/Sep012015 spark/1.5.1
# You can change the number of nodes you want to run Hadoop on.
# Please always KEEP "--exclusive --ntasks-per-node=1".
[NetID@cn65 ~]$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c24 -N 4
FISBATCH -- the maximum time for the interactive screen is limited to 6 hours. You can add QoS to overwrite it.
FISBATCH -- waiting for JOBID 24187 to start on cluster=cluster and partition=Westmere ...!
FISBATCH -- booting the Hadoop nodes *******************!
FISBATCH -- Connecting to head node (cn43)
[NetID@cn43 ~]$ hdfs dfs -put pg2701.txt /tmp/
[NetID@cn43 ~]$ pyspark
Python 2.6.6 (r266:84292, May 22 2015, 08:34:51)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-15)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.6.6 (r266:84292, May 22 2015 08:34:51)
SparkContext available as sc, HiveContext available as sqlContext.
>>> # compute the pi
...
>>> import random
>>> def sample(p):
...     x, y = random.random(), random.random()
...     return 1 if x*x + y*y < 1 else 0
...
>>> count = sc.parallelize(xrange(0, 100000000)).map(sample).reduce(lambda a, b: a + b)
....................................
15/12/30 10:51:55 INFO DAGScheduler: ResultStage 0 (reduce at <stdin>:1) finished in 2.341 s
15/12/30 10:51:55 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/12/30 10:51:55 INFO DAGScheduler: Job 0 finished: reduce at <stdin>:1, took 2.489716 s
>>> print "Pi is roughly %f" % (4.0 * count / 100000000)
Pi is roughly 3.142013
>>>
>>> # word count example
...
>>> import os
>>> text_file = sc.textFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/pg2701.txt")  # hostname: ib-$HOSTNAME. port: 54310
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(110576) called with curMem=22983, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 108.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(10273) called with curMem=133559, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.100.8:46081 (size: 10.0 KB, free: 2.1 GB)
15/12/30 10:55:26 INFO SparkContext: Created broadcast 3 from textFile at NativeMethodAccessorImpl.java:-2
>>> counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).map(lambda (word, count): (count, word)).sortByKey(False)
.....................................
15/12/30 10:59:59 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/30 10:59:59 INFO DAGScheduler: ResultStage 6 (sortByKey at <stdin>:1) finished in 0.127 s
15/12/30 10:59:59 INFO DAGScheduler: Job 4 finished: sortByKey at <stdin>:1, took 0.137471 s
>>> counts.saveAsTextFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/outputs")
.....................................
15/12/30 11:01:14 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
15/12/30 11:01:14 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) finished in 1.021 s
15/12/30 11:01:14 INFO DAGScheduler: Job 5 finished: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 1.214476 s
>>> exit()
[NetID@cn43 ~]$ hdfs dfs -get /tmp/outputs
[NetID@cn43 ~]$ head outputs/part-00000
(13765, u'the')
(6587, u'of')
(5951, u'and')
(4533, u'a')
(4510, u'to')
(4347, u'')
(3879, u'in')
(2693, u'that')
(2415, u'his')
(1724, u'I')
[NetID@cn43 ~]$ exit
[screen is terminating]
Connection to cn43 closed.
FISBATCH -- exiting job
# This command is optional. See the following explanations.
[NetID@cn65 ~]$ sjobs
       JobID  Partition        QOS    JobName      User              Submit      State    Elapsed   NNodes      NCPUS        NodeList               Start
------------ ---------- ---------- ---------- --------- ------------------- ---------- ---------- -------- ---------- --------------- -------------------
24187         Westmere   standard HADOOP.FI+     NetID 2015-09-10T21:54:33    RUNNING   00:32:32        4         48       cn[43-46] 2015-09-10T21:54:34
24187.0                              screen           2015-09-10T21:55:03  COMPLETED   00:32:01        1         12            cn43 2015-09-10T21:55:03
24194         Westmere   standard HADOOP.CL+  tix11001 2015-09-10T22:27:04    PENDING   00:00:00        2          0   None assigned             Unknown
Your job may still run for a while to shut down the Hadoop daemons and clean up the temporary files. Or you may find another job called "HADOOP.CLEANUP" running/pending in your job queue. Just let it run. The cleanup step should only take 20-60 seconds.
NOTE: please DO NOT FORGET to EXIT from the nodes so that other users can use them.
Help to debug
This is a beta version of the interactive Spark / Hadoop mode, so please contact us at hpchelp@engr.uconn.edu if you encounter any errors or find any bugs.
Start Spark with a previous job
If you want to start a new job using the HDFS created in a previous job, you need to do the following steps. First, find out your Hadoop HDFS folder. By default, it is located at: /scratch/${USER}_hadoop.$SLURM_JOB_ID
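A quick way to locate the folders left behind by previous jobs (a hedged sketch; the job ID in the directory name comes from the run that created the HDFS) is:
$ ls -d /scratch/${USER}_hadoop.*    # one directory per previous Hadoop/Spark job
$ du -sh /scratch/${USER}_hadoop.*   # optional: check how much data each one holds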
Submission Script
First, you need to point to the old HDFS folder by issuing:
$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
Then, start your submission script:
$ sbatch slurm.sh
Interactive
Before issuing fisbatch.hadoop, you need to export HADOOP_OLD_DIR.
$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere -N 4
...