Spark


This page describes how to run a Spark standalone cluster with Hadoop. It is based on the myHadoop module.

Modules

Prerequisite Modules

The following modules are needed by Spark:

zlib/1.2.8
openssl/1.0.1e
java/1.8.0_31
protobuf/2.5.0
myhadoop/Sep012015

Available Spark Modules

$ module avail spark
-------------------------------- /apps2/Modules/3.2.6/modulefiles --------------------------------
spark/1.5.1 spark/1.6.1 spark/2.1.0

Load Spark Module

To load the modules:

module load zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015 spark/2.1.0
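
After loading the modules, a quick sanity check (assuming the Spark module puts spark-shell on your PATH) is:

$ module list        # confirm the required modules are loaded
$ which spark-shell  # should point into the Spark module's installation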

If you want to use pyspark with a different version of Python, you need to add the python module to your ~/.bashrc. For example, to use python/2.7.6:

$ module initadd python/2.7.6
$ cat ~/.bashrc
# .bashrc

# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi


if [ ! -f ~/.ssh/id_rsa ]; then
    echo 'No public/private RSA keypair found.'
    ssh-keygen -t rsa -b 2048 -f ~/.ssh/id_rsa -N ""
    cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
    chmod 644 ~/.ssh/authorized_keys
fi

# Load saved modules
module load null python/2.7.6

# User specific aliases and functions
$ module initremove python/2.7.6 # remove the module from the .bashrc file.
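
After logging in again (so the updated ~/.bashrc takes effect), you can check which Python interpreter pyspark will pick up. This is only a sanity check, assuming pyspark launches the python found on your PATH unless the PYSPARK_PYTHON environment variable overrides it:

$ module list      # python/2.7.6 should now be loaded automatically
$ which python     # pyspark is assumed to launch this interpreter unless PYSPARK_PYTHON is set
$ python --version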

Start Spark Cluster

Here is an example SLURM batch script that runs a word-count MapReduce job. It is based on code written by Dr. Lockwood and has been tailored to our cluster. This script uses persistent mode.

Usually, the namenode and datanode data are located at:

/scratch/${USER}_hadoop.$SLURM_JOBID

It is better to clean up the above folder after using Spark if you no longer need the data. Otherwise, you may receive several warning emails saying that the scratch data will be automatically deleted.
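
For example, once a job has finished and you no longer need its HDFS data, you can remove the directory by hand (replace <JOBID> with the SLURM job ID of the finished job):

$ rm -rf /scratch/${USER}_hadoop.<JOBID>   # <JOBID> is a placeholder for your finished job's ID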

Scala Script

We need a Scala script, example.scala, to run:

// Example of Pi
val NUM_SAMPLES=100000000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

// Example of word count
val textFile = sc.textFile("hdfs://ib-"+sys.env("HOSTNAME")+":54310/data/pg2701.txt")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
counts.saveAsTextFile("hdfs://ib-"+sys.env("HOSTNAME")+":54310/scala_outputs")

exit

Submission script

Then submit the Scala script with the following SLURM batch script. To customize it to your needs, change only the parts outside the "NO CHANGE" blocks:

#!/bin/bash
################################################################################
#  slurm.sbatch - A sample submit script for SLURM that illustrates how to
#  spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
#  Glenn K. Lockwood, San Diego Supercomputer Center             February 2014
# Revised:
#  Tingyang Xu      September 2015
################################################################################
## -N 4 means we are going to use 4 nodes to run Hadoop cluster
#SBATCH -N 4
## -c 24 means we are going to use 24 cores on each node.
#SBATCH -c 24
## --ntasks-per-node=1 means each node runs single datanode/namenode.
## When you write your own SLURM batch, you DON'T need to change ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive

# Download the example data to your folder for the MapReduce job below.
if [ ! -f ./pg2701.txt ]; then
  echo "*** Retrieving some sample input data"
  wget 'http://www.gutenberg.org/cache/epub/2701/pg2701.txt'
fi

# Where to store the temporary configuration files.
# Make sure that the configuration files are GLOBALLY ACCESSIBLE.
export HADOOP_CONF_DIR=/apps2/myHadoop/Sep012015/scratch/${USER}_hadoop-conf.$SLURM_JOBID

# If you want to use Hive, please add the following line
#export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive

#################NO CHANGE############################
# Set up the cleanup job in case this job is canceled while running.
sbatch -N $SLURM_JOB_NUM_NODES -c $SLURM_CPUS_PER_TASK  --ntasks-per-node=1 --exclusive --job-name=HADOOP.CLEANUP --dependency=afternotok:$SLURM_JOBID --nodelist=$SLURM_JOB_NODELIST --time=00:10:00 --output=/dev/null --error=/dev/null /apps2/myHadoop/Sep012015/bin/fisbatch_helpers/_epilog

export SPARK_CONF_DIR=$HADOOP_CONF_DIR

if [ "z$HADOOP_OLD_DIR" == "z" ]; then
  myhadoop-configure.sh
else
  myhadoop-configure.sh -p $HADOOP_OLD_DIR
fi

# test if the HADOOP_CONF_DIR is globally accessible
if ! srun ls -d $HADOOP_CONF_DIR; then
  echo "The configure files are not globally accessible.
       Please consider the the shared, home, or scratch directory to put your HADOOP_CONF_DIR.
       For example, export HADOOP_CONF_DIR=/scratch/$USER_hadoop-conf.$SLURM_JOBID"
  myhadoop-cleanup.sh
  rm -rf $HADOOP_CONF_DIR
  exit 1
fi
$HADOOP_HOME/sbin/start-all.sh
sleep 5
hdfs dfs -ls /
$SPARK_HOME/sbin/start-all.sh
hdfs dfs -mkdir -p /tmp/hive/$USER
hdfs dfs -chmod -R 777 /tmp
#################NO CHANGE END########################

# Make the data directory. Here we need a directory directly under the HDFS root.
hdfs dfs -mkdir /data
hdfs dfs -put ./pg2701.txt /data
hdfs dfs -ls /data
# run spark
spark-shell -i example.scala
# copy out the results
hdfs dfs -ls /scala_outputs
hdfs dfs -get /scala_outputs ./

#################NO CHANGE############################
$SPARK_HOME/sbin/stop-all.sh
$HADOOP_HOME/sbin/stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################
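
A minimal way to run the example, assuming the batch script above is saved as slurm.sh and example.scala sits in the same directory (SLURM writes the batch output to slurm-<JOBID>.out by default):

$ sbatch slurm.sh
$ sjobs                   # watch the job until it finishes
$ ls scala_outputs/       # word-count results copied back from HDFS
$ cat slurm-<JOBID>.out   # driver output, including the Pi estimate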

Interactive Spark (beta) use with SLURM (not recommended)

NOTE: *any* interruption to the network will cause your job to crash irrecoverably.

To run an interactive Spark / Hadoop session, you will want to do the following:

[NetID@cn65 ~]$ module load zlib/1.2.8 protobuf/2.5.0 openssl java/1.8.0_31 myhadoop/Sep012015 spark/1.5.1
# You can change the number of nodes on which to run the Hadoop cluster.
# Please always KEEP "--exclusive --ntasks-per-node=1". 
[NetID@cn65 ~]$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c24 -N 4
FISBATCH -- the maximum time for the interactive screen is limited to 6 hours. You can add QoS to overwrite it.
FISBATCH -- waiting for JOBID 24187 to start on cluster=cluster and partition=Westmere
...!
FISBATCH -- booting the Hadoop nodes
*******************!
FISBATCH -- Connecting to head node (cn43)
[NetID@cn43 ~]$ hdfs dfs -put pg2701.txt /tmp/
[NetID@cn43 ~]$ pyspark
Python 2.6.6 (r266:84292, May 22 2015, 08:34:51)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-15)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.6.6 (r266:84292, May 22 2015 08:34:51)
SparkContext available as sc, HiveContext available as sqlContext.
>>> # compute the pi
...
>>> import random
>>> def sample(p):
...     x,y=random.random(), random.random()
...     return 1 if x*x+y*y<1 else 0
...
>>> count=sc.parallelize(xrange(0,100000000)).map(sample).reduce(lambda a,b:a+b)
....................................
15/12/30 10:51:55 INFO DAGScheduler: ResultStage 0 (reduce at <stdin>:1) finished in 2.341 s
15/12/30 10:51:55 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/12/30 10:51:55 INFO DAGScheduler: Job 0 finished: reduce at <stdin>:1, took 2.489716 s
>>> print "Pi is roughly %f" % (4.0 * count / 100000000)
Pi is roughly 3.142013
>>>
>>>
>>>
>>>
>>> # word count example
...
>>> import os
>>> text_file=sc.textFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/pg2701.txt") # hostname: ib-$HOSTNAME. port: 54310
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(110576) called with curMem=22983, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 108.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(10273) called with curMem=133559, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.100.8:46081 (size: 10.0 KB, free: 2.1 GB)
15/12/30 10:55:26 INFO SparkContext: Created broadcast 3 from textFile at NativeMethodAccessorImpl.java:-2
>>> counts=text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b:a+b).map(lambda (word,count):(count,word)).sortByKey(False)
.....................................
15/12/30 10:59:59 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/30 10:59:59 INFO DAGScheduler: ResultStage 6 (sortByKey at <stdin>:1) finished in 0.127 s
15/12/30 10:59:59 INFO DAGScheduler: Job 4 finished: sortByKey at <stdin>:1, took 0.137471 s
>>> counts.saveAsTextFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/outputs")
.....................................
15/12/30 11:01:14 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
15/12/30 11:01:14 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) finished in 1.021 s
15/12/30 11:01:14 INFO DAGScheduler: Job 5 finished: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 1.214476 s
>>> exit()

[NetID@cn43 ~]$ hdfs dfs -get /tmp/outputs
[NetID@cn43 ~]$ head outputs/part-00000
(13765, u'the')
(6587, u'of')
(5951, u'and')
(4533, u'a')
(4510, u'to')
(4347, u'')
(3879, u'in')
(2693, u'that')
(2415, u'his')
(1724, u'I')
[NetID@cn43 ~]$ exit
[screen is terminating]
Connection to cn43 closed.
FISBATCH -- exiting job
# This command is optional. See the explanation below.
[NetID@cn65 ~]$ sjobs
       JobID  Partition        QOS    JobName      User              Submit      State    Elapsed   NNodes      NCPUS        NodeList               Start
------------ ---------- ---------- ---------- --------- ------------------- ---------- ---------- -------- ---------- --------------- -------------------
24187          Westmere   standard HADOOP.FI+     NetID 2015-09-10T21:54:33    RUNNING   00:32:32        4         48       cn[43-46] 2015-09-10T21:54:34
24187.0                                screen           2015-09-10T21:55:03  COMPLETED   00:32:01        1         12            cn43 2015-09-10T21:55:03
24194          Westmere   standard HADOOP.CL+  tix11001 2015-09-10T22:27:04    PENDING   00:00:00        2          0   None assigned             Unknown

Your job may still run for a while to shut down the Hadoop daemons and clean up the temporary files. You may also find another job called "HADOOP.CLEANUP" running or pending in your job queue. Just let it run. The cleanup steps should only take 20-60 seconds.

NOTE: please DO NOT FORGET to EXIT from the nodes so that other users can use them.

Help to debug

This is a beta version of interactive Spark / Hadoop. Please contact us at hpchelp@engr.uconn.edu if you encounter any errors or find any bugs.

Start Spark with previous job

If you want to start a new job that reuses the HDFS created in a previous job, follow these steps. First, find your Hadoop HDFS folder. By default, it is located at: /scratch/${USER}_hadoop.$SLURM_JOB_ID
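
If you no longer remember the job ID, a quick way to list any HDFS directories left over from previous jobs (assuming the default location above) is:

$ ls -d /scratch/${USER}_hadoop.*   # each entry corresponds to one previous job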

Submission Script

First, point to the old HDFS folder by issuing:

$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID

Then, start your submission script:

$ sbatch slurm.sh

Interactive

Before issuing fisbatch.hadoop, you need to export HADOOP_OLD_DIR.

$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c24 -N 4
...