Hive
Apache Hive provides a SQL-like command-line interface (CLI) on top of Hadoop. It is included in the same module as Hadoop.
Load modules
The following modules are needed by myHadoop:
zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015
To load the modules:
module load zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015
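As a quick, optional check that everything loaded correctly, you can list the currently loaded modules:
$ module list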
Start Hadoop Cluster
Here is an example SLURM batch script that spins up a Hadoop cluster and runs a Hive job. It is based on code written by Dr. Lockwood, which we tailored to our cluster. This script uses persistent mode.
Usually, the data of the namenode and the datanodes are located at:
/scratch/${USER}_hadoop.$SLURM_JOBID
It is better to clean up the above folder after using Hadoop if you no longer need it. Otherwise, you may receive several warning emails saying that the scratch data will be automatically deleted.
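For example, after the job finishes you can remove the leftover data by hand (the job ID 24187 below is only illustrative; substitute your own):
$ rm -rf /scratch/${USER}_hadoop.24187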
The sections below walk through a complete example.
SQL command file
NOTE: Hive scripts use "--" to comment out lines. Save the following commands in a file named hive.sql; the submission script below runs it with "hive -f hive.sql".
-- set the configuration parameters
SET hive.exec.reducers.max=${env:SLURM_NNODES};
SET mapreduce.job.reduces=${env:SLURM_NNODES};

-- Create the table
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

-- Load the data
LOAD DATA LOCAL INPATH 'ml-100k/u.data' OVERWRITE INTO TABLE u_data;

-- Count the rows
SELECT COUNT(*) FROM u_data;

-- Analyze the data with weekday_mapper.py
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

ADD FILE weekday_mapper.py;

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;
Python code for Analysis
The script weekday_mapper.py performs a more complex analysis of the data, converting each record's Unix timestamp into a weekday:
import sys
import datetime

for line in sys.stdin:
    line = line.strip()
    userid, movieid, rating, unixtime = line.split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print '\t'.join([userid, movieid, rating, str(weekday)])
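As a quick sanity check, you can pipe the first record of the MovieLens file through the mapper outside of Hadoop. The output shown is what we would expect for the first line of ml-100k/u.data; the weekday value may differ on your system, since fromtimestamp uses the local timezone:
$ head -1 ml-100k/u.data | python weekday_mapper.py
196	242	3	4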
Submission Script
Then submit the SQL commands, together with the Python file, using the following script:
#!/bin/bash
################################################################################
# slurm.sbatch - A sample submit script for SLURM that illustrates how to
#   spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
#   Glenn K. Lockwood, San Diego Supercomputer Center       February 2014
# Revised:
#   Tingyang Xu                                             September 2015
################################################################################
## -p Westmere indicates we are using the Westmere partition.
#SBATCH -p Westmere
## -N 4 means we are going to use 4 nodes to run the Hadoop cluster.
#SBATCH -N 4
## -c 12 means we are going to use 12 cores on each node.
#SBATCH -c 12
## --ntasks-per-node=1 means each node runs a single datanode/namenode.
## When you write your own SLURM batch script, you DON'T need to change
## ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive

# Where to store the temporary configuration files.
# Make sure that the configuration files are GLOBALLY ACCESSIBLE.
export HADOOP_CONF_DIR=$PWD/hadoop-conf.$SLURM_JOBID

#################NO CHANGE############################
export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive
if [ "z$HADOOP_OLD_DIR" == "z" ]; then
    myhadoop-configure.sh
else
    myhadoop-configure.sh -p $HADOOP_OLD_DIR
fi

# Test whether HADOOP_CONF_DIR is globally accessible.
if ! srun ls -d $HADOOP_CONF_DIR; then
    echo "The configuration files are not globally accessible. Please consider
using the shared, home, or scratch directory for your HADOOP_CONF_DIR. For
example: export HADOOP_CONF_DIR=/scratch/\${USER}_hadoop-conf.\$SLURM_JOBID"
    myhadoop-cleanup.sh
    rm -rf $HADOOP_CONF_DIR
    exit 1
fi

start-all.sh

# Make the default dirs for Hive.
hdfs dfs -mkdir -p /tmp
hdfs dfs -chmod g+w /tmp
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /user/hive/warehouse
#################NO CHANGE END########################

# Download the example data for the later MapReduce job.
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip

# Run the SQL script.
hive -f hive.sql

#################NO CHANGE############################
stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################
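Assuming the script is saved as slurm.sbatch (the name used in its header) in the same directory as hive.sql and weekday_mapper.py, submit and monitor it with:
$ sbatch slurm.sbatch
$ squeue -u $USER   # watch for the job to start and finish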
Output
You can review the results in Hive by using the following section "Interactive Hive / Hadoop".
Or add the following line to the submission script after the command "hive -f hive.sql":
hive -e "SELECT * FROM u_data_new;" > Results.txt
!!! PLEASE DO NOT RUN Hive ON THE LOGIN NODE !!!
Interactive Hive / Hadoop (beta) use with SLURM (not recommended)
NOTE: *any* interruption to the network will cause your job to crash irrecoverably.
Command Line Interface
To run an interactive Hive / Hadoop session, you will want to do the following:
[NetID@cn65 ~]$ module load zlib/1.2.8 protobuf/2.5.0 openssl java/1.8.0_31 myhadoop/Sep012015
# You can change the number of nodes on which to run Hadoop.
# Please always KEEP "--exclusive --ntasks-per-node=1".
[NetID@cn65 ~]$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere -N 4
FISBATCH -- the maximum time for the interactive screen is limited to 6 hours. You can add QoS to overwrite it.
FISBATCH -- waiting for JOBID 24187 to start on cluster=cluster and partition=Westmere
...!
FISBATCH -- booting the Hadoop nodes
*******************!
FISBATCH -- Connecting to head node (cn43)
[NetID@cn43 ~]$ hive
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive

Logging initialized using configuration in file:/gpfs/gpfs1/apps2/myHadoop/Sep012015/scratch/tix11001_hadoop_conf.28527/hive/hive-log4j.properties
hive> CREATE TABLE pokes (foo INT, bar STRING);
OK
Time taken: 1.757 seconds
hive> exit;
[NetID@cn43 ~]$ exit
[screen is terminating]
Connection to cn43 closed.
FISBATCH -- exiting job

# This command is optional. See the following explanations.
[NetID@cn65 ~]$ sjobs
       JobID  Partition        QOS    JobName      User              Submit      State    Elapsed   NNodes      NCPUS        NodeList               Start
------------ ---------- ---------- ---------- --------- ------------------- ---------- ---------- -------- ---------- --------------- -------------------
       24187   Westmere   standard HADOOP.FI+     NetID 2015-09-10T21:54:33    RUNNING   00:32:32        4         48       cn[43-46] 2015-09-10T21:54:34
     24187.0                            screen           2015-09-10T21:55:03  COMPLETED   00:32:01        1         12            cn43 2015-09-10T21:55:03
       24194   Westmere   standard HADOOP.CL+  tix11001 2015-09-10T22:27:04    PENDING   00:00:00        2          0   None assigned             Unknown
Your job may still run for a while to shut down the Hadoop daemons and clean up the temporary files. Or you may find another job called "HADOOP.CLEANUP" running or pending in your job queue. Just let it run; the cleanup step should only take 20-60 seconds.
NOTE: please DO NOT FORGET to EXIT from the nodes so that other users can use them.
Run with Spark
You can run Spark with Hive:
$ spark-sql
spark-sql>
The syntax is the same as in Hive.
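For example, assuming the u_data_new table created by the earlier hive.sql still exists in the running session, the weekday counts can be recomputed at the Spark SQL prompt:
spark-sql> SELECT weekday, COUNT(*) FROM u_data_new GROUP BY weekday;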
Help to debug
This is a beta version of interactive Hive / Hadoop, so please contact us at hpchelp@engr.uconn.edu if you encounter any errors or find any bugs.
Start Hive with previous job
If you want to start a new job using the HDFS created in a previous job, follow these steps. First, find your Hadoop HDFS folder. By default, it is located at:
/scratch/${USER}_hadoop.$SLURM_JOB_ID
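If you no longer remember the job ID, you can list any leftover Hadoop folders under scratch (a sketch, assuming the default location above):
$ ls -d /scratch/${USER}_hadoop.*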
Submission Script
First, point to the old HDFS folder by issuing:
$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
Then, start your submission script:
$ sbatch slurm.sh
Interactive
Before issuing fisbatch.hadoop, you need to export HADOOP_OLD_DIR.
$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder  # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere -N 4
...
Known Issue
When the namenode is located on a different node than in the old HDFS session, Hive will complain:
hive> show tables;
OK
u_data
u_data_new
Time taken: 1.038 seconds, Fetched: 2 row(s)
hive> select count(*) from u_data;
...........
Job Submission failed with exception 'java.net.ConnectException(Call From cn44/192.168.100.44 to ib-cn48:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
That is because, in the Hive metastore records, the table still points to the old HDFS session, as shown in its LOCATION:
hive> describe formatted u_data;
OK
# col_name              data_type               comment

userid                  int
movieid                 int
rating                  int
unixtime                string

# Detailed Table Information
Database:               default
Owner:                  tix11001
CreateTime:             Wed Dec 30 17:01:08 EST 2015
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://ib-cn48:54310/user/hive/warehouse/u_data
Table Type:             MANAGED_TABLE
Table Parameters:
        COLUMN_STATS_ACCURATE   false
        last_modified_by        tix11001
        last_modified_time      1451524844
        numFiles                1
        numRows                 -1
        rawDataSize             -1
        totalSize               1979173
        transient_lastDdlTime   1451524844

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        field.delim             \t
        serialization.format    \t
Time taken: 0.062 seconds, Fetched: 37 row(s)
Solution 1
So the LOCATION of the table must be updated manually. For example, assume that the new namenode is cn44:
$ # hive --service metatool -updateLocation <new_fsDefaultFSValue> <old_fsDefaultFSValue>
$ hive --service metatool -updateLocation hdfs://ib-cn44:54310 hdfs://ib-cn48:54310
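To confirm the update, the metatool can also list the filesystem roots currently recorded in the metastore:
$ hive --service metatool -listFSRoot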
Solution 2
If the nodes that the old HDFS session ran on are idle, you can try to start the new session on the same nodelist by adding the following sbatch argument:
-w cn[xx-xx] # the old nodelist
Or in the submission script, add:
#SBATCH -w cn[xx-xx] # the old nodelist
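If you do not remember the old nodelist, SLURM's accounting records can recover it (replace <old_jobid> with the ID of the previous Hadoop job):
$ sacct -j <old_jobid> --format=JobID,NodeList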