Hive
Apache Hive provides a SQL-like command-line interface (CLI) to Hadoop. It is provided by the same module as Hadoop.

Logging in to the HPC Cluster

Please read How to connect to the HPC Cluster (http://www.becat.uconn.edu/wiki/index.php/HPC_Getting_Started) if you are not familiar with connecting.

Load modules

The following modules are needed by myHadoop:

zlib/1.2.8
openssl/1.0.1e
java/1.8.0_31
protobuf/2.5.0
myhadoop/Sep012015

To load the modules:

module load zlib/1.2.8 openssl/1.0.1e java/1.8.0_31 protobuf/2.5.0 myhadoop/Sep012015
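
To confirm that everything loaded cleanly, you can list the currently loaded modules (a quick sanity check; the version strings shown should match the list above):

module list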

Start Hadoop Cluster

Here is an example SLURM batch script that spins up a Hadoop cluster and runs a Hive job. It is based on code written by Dr. Glenn K. Lockwood and tailored to our cluster. The script can also run in persistent mode, reusing an existing HDFS directory when HADOOP_OLD_DIR is set (see "Start Hive with previous job" below).

Usually, the data for the namenode and the datanodes is located at:

/scratch/${USER}_hadoop.$SLURM_JOBID

It is best to clean up the above folder after using Hadoop if you no longer need the data. Otherwise, you may receive several warning emails that the scratch data will be automatically deleted.
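
For example, once a job has finished and you no longer need its HDFS data, you could remove the directory by hand (a sketch; replace <jobid> with the SLURM job ID of the finished job):

rm -rf /scratch/${USER}_hadoop.<jobid>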

The example consists of three parts: a Hive SQL command file, a Python script for the analysis, and a SLURM submission script.

SQL command file

NOTE: Hive scripts use "--" to comment out lines.

-- set the configuration parameters
SET hive.exec.reducers.max=${env:SLURM_NNODES};
SET mapreduce.job.reduces=${env:SLURM_NNODES};

-- Create the table
CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

-- Load the data
LOAD DATA LOCAL INPATH 'ml-100k/u.data'
OVERWRITE INTO TABLE u_data;

-- Count the rows
SELECT COUNT(*) FROM u_data;

-- Analyze the data with weekday_mapper.py
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

add FILE weekday_mapper.py;

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;

Python code for Analysis

The weekday_mapper.py script allows more complex analysis of the data; here it converts each record's Unix timestamp into a weekday number:

import sys
import datetime

# Read tab-separated records from standard input, convert the Unix
# timestamp to a weekday, and write the transformed record back out.
for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print('\t'.join([userid, movieid, rating, str(weekday)]))
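
Before running the full Hive job, you can sanity-check the mapper locally against the downloaded MovieLens data (a sketch, assuming the ml-100k dataset has already been unzipped in the current directory):

head ml-100k/u.data | python weekday_mapper.py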

Submission Script

Then submit the SQL script together with the Python file using the following SLURM batch script:

#!/bin/bash
################################################################################
#  slurm.sbatch - A sample submit script for SLURM that illustrates how to
#  spin up a Hadoop cluster for a map/reduce task using myHadoop
#
# Created:
#  Glenn K. Lockwood, San Diego Supercomputer Center             February 2014
# Revised:
#  Tingyang Xu, Administrator of the HORNET cluster, BECAT      September 2015
################################################################################
## -p Westmere indicates we are using the Westmere partition.
#SBATCH -p Westmere
## -N 4 means we are going to use 4 nodes to run the Hadoop cluster.
#SBATCH -N 4
## -c 12 means we are going to use 12 cores on each node.
#SBATCH -c 12
## --ntasks-per-node=1 means each node runs a single datanode/namenode.
## When you write your own SLURM batch script, you DON'T need to change ntasks-per-node or exclusive.
#SBATCH --ntasks-per-node=1
#SBATCH --exclusive

# The example data for the Hive job is downloaded later in this script.
# Where to store the temporary configuration files.
# Make sure that the configuration files are GLOBALLY ACCESSIBLE.
export HADOOP_CONF_DIR=$PWD/hadoop-conf.$SLURM_JOBID

#################NO CHANGE############################
export HIVE_CONF_DIR=$HADOOP_CONF_DIR/hive

if [ "z$HADOOP_OLD_DIR" == "z" ]; then
  myhadoop-configure.sh
else
  myhadoop-configure.sh -p $HADOOP_OLD_DIR
fi

# test if the HADOOP_CONF_DIR is globally accessible
if ! srun ls -d $HADOOP_CONF_DIR; then
  echo "The configure files are not globally accessible.
       Please consider the the shared, home, or scratch directory to put your HADOOP_CONF_DIR.
       For example, export HADOOP_CONF_DIR=/scratch/$USER_hadoop-conf.$SLURM_JOBID"
  myhadoop-cleanup.sh
  rm -rf $HADOOP_CONF_DIR
  exit 1
fi
start-all.sh
# make the default dirs for Hive
hdfs dfs -mkdir -p /tmp
hdfs dfs -chmod g+w /tmp
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /user/hive/warehouse
#################NO CHANGE END########################

# download the data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip

# run the SQL script
hive -f hive.sql

#################NO CHANGE############################
stop-all.sh
myhadoop-cleanup.sh
rm -rf $HADOOP_CONF_DIR
#################NO CHANGE END########################

Output

You can review the results in Hive by following the section "Interactive Hive / Hadoop" below.

Or add the following line to the submission script after the command "hive -f hive.sql":

hive -e "SELECT * FROM u_data_new;" > Results.txt

!!! PLEASE DO NOT RUN Hive ON THE LOGIN NODE !!!

Interactive Hive / Hadoop (beta) use with SLURM (not recommended)

NOTE: *any* interruption to the network will cause your job to crash irrecoverably.

Command Line Interface

To run an interactive Hive / Hadoop session, you will want to do the following:

[NetID@cn65 ~]$ module load zlib/1.2.8 protobuf/2.5.0 openssl java/1.8.0_31 myhadoop/Sep012015
# You can change number of nodes you want to run the Hadoop.
# Please always KEEP "--exclusive --ntasks-per-node=1". 
[NetID@cn65 ~]$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere  -N 4
FISBATCH -- the maximum time for the interactive screen is limited to 6 hours. You can add QoS to overwrite it.
FISBATCH -- waiting for JOBID 24187 to start on cluster=cluster and partition=Westmere
...!
FISBATCH -- booting the Hadoop nodes
*******************!
FISBATCH -- Connecting to head node (cn43)
[NetID@cn43 ~]$ hive
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/10/03 14:03:08 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive

Logging initialized using configuration in file:/gpfs/gpfs1/apps2/myHadoop/Sep012015/scratch/tix11001_hadoop_conf.28527/hive/hive-log4j.properties
hive> CREATE TABLE pokes (foo INT, bar STRING);
OK
Time taken: 1.757 seconds
hive> exit;
[NetID@cn43 ~]$ exit
[screen is terminating]
Connection to cn43 closed.
FISBATCH -- exiting job
# This command is optional. See the following explanations.
[NetID@cn65 ~]$ sjobs
       JobID  Partition        QOS    JobName      User              Submit      State    Elapsed   NNodes      NCPUS        NodeList               Start
------------ ---------- ---------- ---------- --------- ------------------- ---------- ---------- -------- ---------- --------------- -------------------
24187          Westmere   standard HADOOP.FI+     NetID 2015-09-10T21:54:33    RUNNING   00:32:32        4         48       cn[43-46] 2015-09-10T21:54:34
24187.0                                screen           2015-09-10T21:55:03  COMPLETED   00:32:01        1         12            cn43 2015-09-10T21:55:03
24194          Westmere   standard HADOOP.CL+  tix11001 2015-09-10T22:27:04    PENDING   00:00:00        2          0   None assigned             Unknown

Your job may still run for a while to shut down the Hadoop daemons and clean up the temporary files. You may also find another job called "HADOOP.CLEANUP" running or pending in your job queue. Just let it run; the cleanup step should only take 20-60 seconds.

NOTE: please DO NOT FORGET to EXIT from the nodes so that other users can use them.

Run with Spark

You can run Spark with Hive:

$ spark-sql
spark-sql>

The syntax is the same as Hive's.
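
For example, the count query from the hive.sql script above can be run the same way in spark-sql (a sketch, assuming the u_data table from the earlier example already exists in the warehouse):

spark-sql> SELECT COUNT(*) FROM u_data;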

Help to debug

This is a beta version of interactive Hive / Hadoop, so please contact us at hpchelp@engr.uconn.edu if you encounter any errors or bugs.

Start Hive with previous job

If you want to start a new job that reuses the HDFS created in a previous job, you need to do the following. First, find your Hadoop HDFS folder. By default, it is located at:

/scratch/${USER}_hadoop.$SLURM_JOB_ID
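
If you are not sure of the exact job ID, you can list the Hadoop scratch directories left by your previous jobs (a quick sketch; the trailing number in each directory name is the SLURM job ID):

ls -d /scratch/${USER}_hadoop.*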

Submission Script

First, point to the old HDFS folder by issuing:

$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID

Then submit the job with your submission script:

$ sbatch slurm.sh

Interactive

Before issuing fisbatch.hadoop, you need to export HADOOP_OLD_DIR.

$ export HADOOP_OLD_DIR=/path/to/old/hadoop/folder # by default, it is /scratch/${USER}_hadoop.$SLURM_JOB_ID
$ fisbatch.hadoop --exclusive --ntasks-per-node=1 -c12 -p Westmere  -N 4
...

Known Issue

When the namenode is located on a different node than in the old HDFS session, Hive will complain:

hive> show tables;
OK
u_data
u_data_new
Time taken: 1.038 seconds, Fetched: 2 row(s)
hive> select count(*) from u_data;
...........
Job Submission failed with exception 'java.net.ConnectException(Call From cn44/192.168.100.44 to ib-cn48:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

That is because the Hive metastore still records the table as located in the old HDFS session, as shown in the Location field:

hive> describe formatted  u_data;
OK
# col_name              data_type               comment

userid                  int
movieid                 int
rating                  int
unixtime                string 

# Detailed Table Information
Database:               default
Owner:                  tix11001
CreateTime:             Wed Dec 30 17:01:08 EST 2015
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://ib-cn48:54310/user/hive/warehouse/u_data
Table Type:             MANAGED_TABLE
Table Parameters:
        COLUMN_STATS_ACCURATE   false
        last_modified_by        tix11001
        last_modified_time      1451524844
        numFiles                1
        numRows                 -1
        rawDataSize             -1
        totalSize               1979173
        transient_lastDdlTime   1451524844

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        field.delim             \t
        serialization.format    \t
Time taken: 0.062 seconds, Fetched: 37 row(s)

Solution 1

The LOCATION of the table therefore has to be changed manually. For example, assume that the new namenode is cn44:

$ # hive --service metatool -updateLocation <newfsDefaultFSValue> <old_fsDefaultFSValue>
$ hive --service metatool -updateLocation hdfs://ib-cn44:54310 hdfs://ib-cn48:54310
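
Afterwards you can check that the table metadata now points at the new namenode (a sketch; only the Location line of the describe output is of interest here):

$ hive -e "DESCRIBE FORMATTED u_data;" | grep Location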

Solution 2

If the nodes that the old HDFS session ran on are idle, you can try to start the new session on the same nodelist by adding the following sbatch argument:

-w cn[xx-xx] # the old nodelist

Or in the submission script, add:

#SBATCH -w cn[xx-xx] # the old nodelist
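
If you no longer remember which nodes the old session used, one way to recover the nodelist is to ask SLURM's accounting database about the finished job (a sketch, assuming job accounting is enabled; replace <old_jobid> with the job ID of the previous Hadoop session):

$ sacct -j <old_jobid> --format=JobID,NodeList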