Spark

This page describes the Spark standalone cluster with Hadoop. It is built on top of the myHadoop module.

Log in to the HORNET cluster

Please read How to connect to the HORNET cluster if you are not familiar with the login process.
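
In short, you connect with SSH from a terminal on your own machine. The hostname below is a placeholder, not the real address; use the login node name given in that guide:

ssh NetID@<hornet-login-node>   # replace <hornet-login-node> with the address from the connection guide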

Load modules

The following modules are needed by Spark:

zlib/1.2.8
openssl/1.0.1e
java/1.8.0_31
protobuf/2.5.0
myhadoop/Sep012015
spark/1.5.1

To load the modules:

module load zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015 spark/1.5.1
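
As an optional sanity check, you can confirm that the environment is set up before submitting anything:

module list      # should list the six modules above
which pyspark    # should resolve to a path inside the spark/1.5.1 module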

Start Spark Cluster

Here is an example SLURM batch script that spins up a Hadoop cluster and runs a Hive MapReduce job over the MovieLens data. It is based on code written by Dr. Lockwood and has been tailored to our cluster. This script uses persistent mode.

Usually, the namenode and datanode data are located at:

/scratch/scratch0/${USER}_hadoop.$SLURM_JOBID

It is better to clean up the above folder after using Spark if you no longer need the data. Otherwise, you may receive several warning emails saying that the scratch data will be automatically deleted.
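
For example, after the job has finished (a sketch; 24187 is just a sample job ID, use your own):

ls -d /scratch/scratch0/${USER}_hadoop.*        # list leftover HDFS scratch directories
rm -rf /scratch/scratch0/${USER}_hadoop.24187   # remove the data of a finished job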

Submission Script (under construction)

Then submit the Hive SQL commands and the accompanying Python transform file using the following script:

#!/bin/bash
################################################################################
#  slurm.sbatch - A sample submit script for SLURM that illustrates how to
#  spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
#  Glenn K. Lockwood, San Diego Supercomputer Center             February 2014
# Revised:
#  Tingyang Xu, Administrator of the Hornet cluster, BECAT      September 2015
################################################################################
## -p Westmere indicates we are using the Westmere partition.
#SBATCH -p Westmere
## -N 4 means we are going to use 4 nodes to run the Hadoop cluster.
#SBATCH -N 4
## -c 12 means we are going to use 12 cores on each node.
#SBATCH -c 12
## --ntasks-per-node=1 means each node runs a single datanode/namenode.
## When you write your own SLURM batch script, you DON'T need to change ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive

# download example data to your folder for the later MapReduce job.
# where to store the temp configure files.
# make sure that the configure files are GLOBALLY ACCESSIBLE
export HADOOP_CONF_DIR=$PWD/hadoop-conf.$SLURM_JOBID

#################NO CHANGE############################
export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive
myhadoop-configure.sh
# test if the HADOOP_CONF_DIR is globally accessible
if ! srun ls -d $HADOOP_CONF_DIR; then
  echo "The configure files are not globally accessible.
       Please consider the the shared, home, or scratch directory to put your HADOOP_CONF_DIR.
       For example, export HADOOP_CONF_DIR=/scratch/scratch0/$USER_hadoop-conf.$SLURM_JOBID"
  myhadoop-cleanup.sh
  rm -rf $HADOOP_CONF_DIR
  exit 1
fi
start-all.sh
# make the default dirs for Hive
hdfs dfs -mkdir -p /tmp
hdfs dfs -chmod g+w /tmp
hdfs dfs -mkdir -p /usr/hive/warehouse
hdfs dfs -chmod g+w /usr/hive/warehouse
#################NO CHANGE END########################

# download the data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip

# run the SQL script
hive -f hive.sql

#################NO CHANGE############################
stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################
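
The script runs "hive -f hive.sql", but hive.sql itself is not shown on this page. The u_data_new table and the weekday query used later match the MovieLens example from the Apache Hive "Getting Started" guide, so a minimal sketch along those lines could look like the following; the exact table layout and the helper script name weekday_mapper.py are assumptions, not files provided by the wiki:

# sketch of hive.sql: load the MovieLens ratings, then derive a table with the weekday of each rating
cat > hive.sql <<'EOF'
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH 'ml-100k/u.data' OVERWRITE INTO TABLE u_data;

CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

ADD FILE weekday_mapper.py;

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;
EOF

# sketch of weekday_mapper.py (assumed helper, Python 2): converts the unix timestamp column into a weekday number
cat > weekday_mapper.py <<'EOF'
import sys
import datetime

for line in sys.stdin:
    userid, movieid, rating, unixtime = line.strip().split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print '\t'.join([userid, movieid, rating, str(weekday)])
EOF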

Output

You can review the results in Hive by following the section "Interactive Hive / Hadoop (beta)" below.

Or add the following line to the submission script after the command "hive -f hive.sql":

hive -e "SELECT * FROM u_data_new;" > Results.txt

!!! PLEASE DO NOT RUN Hive ON THE LOGIN NODE !!!

Interactive Hive / Hadoop (beta) use with SLURM (not recommended)

NOTE: *any* interruption to the network will cause your job to crash irrecoverably.

To run an interactive Hive / Hadoop session, you will want to do the following:

[NetID@cn65 ~]$ module load zlib/1.2.8 protobuf/2.5.0 openssl java/1.8.0_31 myhadoop/Sep012015 spark/1.5.1
# You can change the number of nodes on which you want to run Hadoop.
# Please always KEEP "--exclusive --ntasks-per-node=1". 
[NetID@cn65 ~]$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere  -N 4
FISBATCH -- the maximum time for the interactive screen is limited to 6 hours. You can add QoS to overwrite it.
FISBATCH -- waiting for JOBID 24187 to start on cluster=cluster and partition=Westmere
...!
FISBATCH -- booting the Hadoop nodes
*******************!
FISBATCH -- Connecting to head node (cn43)
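# (assumed step) pg2701.txt is the Moby-Dick text from Project Gutenberg; fetch it first if you do not already have it, e.g.:
[NetID@cn43 ~]$ wget http://www.gutenberg.org/cache/epub/2701/pg2701.txt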
[NetID@cn43 ~]$ hdfs dfs -put pg2701.txt /tmp/
[NetID@cn43 ~]$ module load python/2.7.6 # if you are going to use pyspark
[NetID@cn43 ~]$ pyspark
Python 2.6.6 (r266:84292, May 22 2015, 08:34:51)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-15)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.6.6 (r266:84292, May 22 2015 08:34:51)
SparkContext available as sc, HiveContext available as sqlContext.
>>> # compute the pi
...
>>> import random
>>> def sample(p):
...     x,y=random.random(), random.random()
...     return 1 if x*x+y*y<1 else 0
...
>>> count=sc.parallelize(xrange(0,100000000)).map(sample).reduce(lambda a,b:a+b)
....................................
15/12/30 10:51:55 INFO DAGScheduler: ResultStage 0 (reduce at <stdin>:1) finished in 2.341 s
15/12/30 10:51:55 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/12/30 10:51:55 INFO DAGScheduler: Job 0 finished: reduce at <stdin>:1, took 2.489716 s
>>> print "Pi is roughly %f" % (4.0 * count / 100000000)
Pi is roughly 3.142013
>>> # word count example
...
>>> import os  # os.environ is used below to build the HDFS URL
>>> text_file=sc.textFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/pg2701.txt") # hostname: ib-$HOSTNAME. port: 54310
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(110576) called with curMem=22983, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 108.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO MemoryStore: ensureFreeSpace(10273) called with curMem=133559, maxMem=2222739947
15/12/30 10:55:26 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.0 KB, free 2.1 GB)
15/12/30 10:55:26 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.100.8:46081 (size: 10.0 KB, free: 2.1 GB)
15/12/30 10:55:26 INFO SparkContext: Created broadcast 3 from textFile at NativeMethodAccessorImpl.java:-2
>>> counts=text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b:a+b).map(lambda (word,count):(count,word)).sortByKey(False)
.....................................
15/12/30 10:59:59 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/30 10:59:59 INFO DAGScheduler: ResultStage 6 (sortByKey at <stdin>:1) finished in 0.127 s
15/12/30 10:59:59 INFO DAGScheduler: Job 4 finished: sortByKey at <stdin>:1, took 0.137471 s
>>> counts.saveAsTextFile("hdfs://ib-"+os.environ.get('HOSTNAME')+":54310/tmp/outputs")
.....................................
15/12/30 11:01:14 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
15/12/30 11:01:14 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) finished in 1.021 s
15/12/30 11:01:14 INFO DAGScheduler: Job 5 finished: saveAsTextFile at NativeMethodAccessorImpl.java:-2, took 1.214476 s
>>> exit()

[NetID@cn43 ~]$ hdfs dfs -get /tmp/outputs
[NetID@cn43 ~]$ head outputs/part-00000
(13765, u'the')
(6587, u'of')
(5951, u'and')
(4533, u'a')
(4510, u'to')
(4347, u'')
(3879, u'in')
(2693, u'that')
(2415, u'his')
(1724, u'I')
[NetID@cn43 ~]$ exit
[screen is terminating]
Connection to cn43 closed.
FISBATCH -- exiting job
# This command is optional. See the following explanations.
[NetID@cn65 ~]$ sjobs
       JobID  Partition        QOS    JobName      User              Submit      State    Elapsed   NNodes      NCPUS        NodeList               Start
------------ ---------- ---------- ---------- --------- ------------------- ---------- ---------- -------- ---------- --------------- -------------------
24187          Westmere   standard HADOOP.FI+     NetID 2015-09-10T21:54:33    RUNNING   00:32:32        4         48       cn[43-46] 2015-09-10T21:54:34
24187.0                                screen           2015-09-10T21:55:03  COMPLETED   00:32:01        1         12            cn43 2015-09-10T21:55:03
24194          Westmere   standard HADOOP.CL+  tix11001 2015-09-10T22:27:04    PENDING   00:00:00        2          0   None assigned             Unknown

Your job may still run for a while to shut down the Hadoop daemons and clean up the temporary files. You may also find another job called "HADOOP.CLEANUP" running or pending in your job queue. Just let it run; the cleanup step should only take 20-60 seconds.

NOTE: please DO NOT FORGET to EXIT from the nodes so that other users can use them.

Help to debug

This is a beta version of interactive Hive / Hadoop, so please contact us at mailto:hpchelp@engr.uconn.edu if you encounter any errors or find any bugs.

Start Hive with previous job

If you want to start a new job using the HDFS data created in a previous job, follow these steps. First, find your Hadoop HDFS folder. By default, it is located at:

/scratch/scratch2/${USER}_hadoop.$SLURM_JOB_ID
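
A quick way to locate the leftover HDFS directories from earlier jobs (a sketch):

ls -d /scratch/scratch2/${USER}_hadoop.*   # each directory name ends with the SLURM job ID that created it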

Then, according to different purposes:

Script (under construction)

You need to revise the line marked "CHANGE THIS LINE" below.

#!/bin/bash
################################################################################
#  slurm.sbatch - A sample submit script for SLURM that illustrates how to
#  spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
#  Glenn K. Lockwood, San Diego Supercomputer Center             February 2014
# Revised:
#  Tingyang Xu, Administrator of the Hornet cluster, BECAT      September 2015
################################################################################
## -p Westmere indicates we are using the Westmere partition.
#SBATCH -p Westmere
## -N 4 means we are going to use 4 nodes to run the Hadoop cluster.
#SBATCH -N 4
## -c 12 means we are going to use 12 cores on each node.
#SBATCH -c 12
## --ntasks-per-node=1 means each node runs a single datanode/namenode.
## When you write your own SLURM batch script, you DON'T need to change ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive

# download example data to your folder for the later MapReduce job.
# where to store the temp configure files.
# make sure that the configure files are GLOBALLY ACCESSIBLE
export HADOOP_CONF_DIR=$PWD/hadoop-conf.$SLURM_JOBID 

#################NO CHANGE############################
export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive
#################NO CHANGE END######################## 

###############CHANGE THIS LINE#######################
myhadoop-configure.sh -p /path/to/old/hadoop/data/folder # by default, it is /scratch/scratch2/${USER}_hadoop.$SLURM_JOB_ID
###############CHANGE THIS LINE#######################

#################NO CHANGE############################
# test if the HADOOP_CONF_DIR is globally accessible
if ! srun ls -d $HADOOP_CONF_DIR; then
  echo "The configure files are not globally accessible.
       Please consider the the shared, home, or scratch directory to put your HADOOP_CONF_DIR.
       For example, export HADOOP_CONF_DIR=/scratch/scratch0/$USER_hadoop-conf.$SLURM_JOBID"
  myhadoop-cleanup.sh
  rm -rf $HADOOP_CONF_DIR
  exit 1
fi
start-all.sh
#################NO CHANGE END########################

# run the SQL script
hive -e "SELECT weekday, COUNT(*) FROM u_data_new GROUP BY weekday;"

#################NO CHANGE############################
stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################

Interactive

Before issuing fisbatch.hadoop, you need to export HADOOP_OLD_DIR.

$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/scratch2/${USER}_hadoop.$SLURM_JOB_ID
$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere  -N 4
...