Submit Spark Applications using Hive from Edge Node and Client Node
This is the sixth and final part of the Apache Hadoop ecosystem setup explained in Apache Hadoop Multi-Node Kerberized Cluster Setup. The previous stories covered the following chapters:
Chapter 1. Users Creation and initial setup
Chapter 2. Kerberos Installation and configuration
Chapter 3. Unpacking Hadoop Distributions
Chapter 4. Configuring HDFS and YARN
Chapter 5. Configure Spark and Run Spark Applications
Chapter 6. Configuring Edge Node and Run Spark Applications
Chapter 7. Hive Configuration
In this story we shall submit Spark applications using Hive as the data source from the Edge node.
Chapter 8. Integrating Hive with Spark in Resource Manager and in Edge Node
Copy the hive-site.xml from the Hive node to the Spark conf folder on all nodes.
[root@verona1 ~]# su - hadoop
[hadoop@verona1 ~]$ cd hive/conf/
[hadoop@verona1 conf]$ pwd
/home/hadoop/hive/conf
[hadoop@verona1 conf]$ scp hive-site.xml hadoop@florence1.wsdm.ami.com:/home/hadoop/spark/conf
[hadoop@verona1 conf]$ scp hive-site.xml hadoop@sicily1.wsdm.ami.com:/home/hadoop/spark/conf
[hadoop@verona1 conf]$ scp hive-site.xml hadoop@turin1.wsdm.ami.com:/home/hadoop/spark/conf
[hadoop@verona1 conf]$ scp hive-site.xml hadoop@tuscany1.wsdm.ami.com:/home/hadoop/spark/conf
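The four scp commands above differ only in the target host, so they can be generated from a host list. The following is a minimal sketch that only builds and prints the command lines; it does not copy anything. The hosts and paths come from the transcript, while the helper name build_scp_commands is hypothetical and used only for illustration.

```python
# Hosts and paths are taken from the transcript above; build_scp_commands is a
# hypothetical helper name used only for this illustration.
HOSTS = ["florence1", "sicily1", "turin1", "tuscany1"]
DOMAIN = "wsdm.ami.com"


def build_scp_commands(source_file, target_dir, hosts=HOSTS, user="hadoop"):
    """Return one scp argument list per target host (nothing is executed)."""
    return [
        ["scp", source_file, "{}@{}.{}:{}".format(user, host, DOMAIN, target_dir)]
        for host in hosts
    ]


commands = build_scp_commands("hive-site.xml", "/home/hadoop/spark/conf")
for command in commands:
    # Printed for review only; each list could be passed to
    # subprocess.run(command, check=True) to perform the copy.
    print(" ".join(command))
```

The same helper could be reused for the later copy steps by changing the source file and target directory.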
Configure Spark classpath with Hive libraries
The vanilla Spark build used so far does not come with the Hive libraries, so we need to download the Spark distribution with Hadoop integrated and add some of its libraries to the Spark classpath.
Download the spark-2.4.6-bin-hadoop2.7.tgz library
[root@sicily1 tmp]# pwd
/tmp
[root@sicily1 tmp]# wget http://apachemirror.wuchna.com/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
[root@sicily1 tmp]# chown hadoop:hadoop spark-2.4.6-bin-hadoop2.7.tgz
[root@sicily1 tmp]# su - hadoop
[hadoop@sicily1 tmp]$ tar xvzf spark-2.4.6-bin-hadoop2.7.tgz
[hadoop@sicily1 tmp]$ cd spark-2.4.6-bin-hadoop2.7/
[hadoop@sicily1 spark-2.4.6-bin-hadoop2.7]$ cp -R jars/ /home/hadoop/spark/jars.all
Copy the same archive to the other nodes (Florence, Turin, Tuscany) and repeat the above steps there to place the libraries in the /home/hadoop/spark/jars.all folder.
[hadoop@sicily1 tmp]$ scp spark-2.4.6-bin-hadoop2.7.tgz hadoop@florence1.wsdm.ami.com:/tmp
[hadoop@sicily1 tmp]$ scp spark-2.4.6-bin-hadoop2.7.tgz hadoop@turin1.wsdm.ami.com:/tmp
[hadoop@sicily1 tmp]$ scp spark-2.4.6-bin-hadoop2.7.tgz hadoop@tuscany1.wsdm.ami.com:/tmp
Hive JDBC Driver
Copy the Hive JDBC driver to all nodes from verona
[hadoop@verona1 jdbc]$ pwd
/home/hadoop/hive/jdbc
[hadoop@verona1 jdbc]$ ls -alt
…
-rw-r--r-- 1 hadoop hadoop 59767390 Apr 7 19:46 hive-jdbc-2.3.7-standalone.jar
[hadoop@verona1 jdbc]$ scp hive-jdbc-2.3.7-standalone.jar hadoop@florence1.wsdm.ami.com:/home/hadoop/spark/jars.all
[hadoop@verona1 jdbc]$ scp hive-jdbc-2.3.7-standalone.jar hadoop@sicily1.wsdm.ami.com:/home/hadoop/spark/jars.all
[hadoop@verona1 jdbc]$ scp hive-jdbc-2.3.7-standalone.jar hadoop@turin1.wsdm.ami.com:/home/hadoop/spark/jars.all
[hadoop@verona1 jdbc]$ scp hive-jdbc-2.3.7-standalone.jar hadoop@tuscany1.wsdm.ami.com:/home/hadoop/spark/jars.all
Spark Classpath to reflect Hive libraries
Change spark-env.sh on all nodes by updating SPARK_DIST_CLASSPATH to point to the Hive libraries. After the update, these are the environment variables in the spark-env.sh of the sicily node.
export HADOOP_HOME=/home/hadoop/hadoop
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/
export SPARK_HOME=/home/hadoop/spark
export HIVE_HOME=/home/hadoop/hive
export SPARK_DIST_CLASSPATH=/home/hadoop/hadoop/etc/hadoop:/home/hadoop/hadoop/share/hadoop/common/lib/*:/home/hadoop/hadoop/share/hadoop/common/*:/home/hadoop/hadoop/share/hadoop/hdfs:/home/hadoop/hadoop/share/hadoop/hdfs/lib/*:/home/hadoop/hadoop/share/hadoop/hdfs/*:/home/hadoop/hadoop/share/hadoop/yarn:/home/hadoop/hadoop/share/hadoop/yarn/lib/*:/home/hadoop/hadoop/share/hadoop/yarn/*:/home/hadoop/hadoop/share/hadoop/mapreduce/lib/*:/home/hadoop/hadoop/share/hadoop/mapreduce/*:/home/hadoop/hadoop/contrib/capacity-scheduler/*.jar:/home/hadoop/spark/jars.all/apacheds-kerberos-codec-2.0.0-M15.jar:/home/hadoop/spark/jars.all/commons-logging-1.1.3.jar:/home/hadoop/spark/jars.all/guava-14.0.1.jar:/home/hadoop/spark/jars.all/hadoop-auth-2.7.3.jar:/home/hadoop/spark/jars.all/hadoop-common-2.7.3.jar:/home/hadoop/spark/jars.all/hive-beeline-1.2.1.spark2.jar:/home/hadoop/spark/jars.all/hive-cli-1.2.1.spark2.jar:/home/hadoop/spark/jars.all/hive-exec-1.2.1.spark2.jar:/home/hadoop/spark/jars.all/hive-metastore-1.2.1.spark2.jar:/home/hadoop/spark/jars.all/htrace-core-3.1.0-incubating.jar:/home/hadoop/spark/jars.all/httpclient-4.5.6.jar:/home/hadoop/spark/jars.all/httpcore-4.4.10.jar:/home/hadoop/spark/jars.all/libfb303-0.9.3.jar:/home/hadoop/spark/jars.all/libthrift-0.9.3.jar:/home/hadoop/spark/jars.all/spark-hive_2.11-2.4.6.jar:/home/hadoop/spark/jars.all/spark-hive-thriftserver_2.11-2.4.6.jar
export YARN_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/
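Because SPARK_DIST_CLASSPATH is one long colon-separated string, a single mistyped jar name is easy to miss. The sketch below assembles such a string from a list of jar names and reports any that are absent from the jars.all folder. It uses only a subset of the jars from the export line for brevity, and the helper names build_classpath and missing_jars are hypothetical.

```python
import os
import posixpath

JARS_DIR = "/home/hadoop/spark/jars.all"

# A subset of the jar names from the export line above, for brevity.
HIVE_JARS = [
    "hive-exec-1.2.1.spark2.jar",
    "hive-metastore-1.2.1.spark2.jar",
    "libthrift-0.9.3.jar",
    "spark-hive_2.11-2.4.6.jar",
]


def build_classpath(base_entries, jars_dir, jar_names):
    """Join the base entries and the full jar paths with ':' separators."""
    jar_paths = [posixpath.join(jars_dir, name) for name in jar_names]
    return ":".join(list(base_entries) + jar_paths)


def missing_jars(jars_dir, jar_names):
    """Return the jar names that do not exist as files under jars_dir."""
    return [
        name for name in jar_names
        if not os.path.isfile(os.path.join(jars_dir, name))
    ]


classpath = build_classpath(["/home/hadoop/hadoop/etc/hadoop"], JARS_DIR, HIVE_JARS)
print("export SPARK_DIST_CLASSPATH=" + classpath)
print("missing jars:", missing_jars(JARS_DIR, HIVE_JARS))
```

Run on a cluster node, an empty "missing jars" list suggests that every listed jar is present where the classpath expects it.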
Update spark-defaults.conf with Hive details
Add the following Hive-related settings to spark-defaults.conf:
spark.master yarn
spark.yarn.security.tokens.hive.enabled true
spark.yarn.principal yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL
spark.yarn.keytab /home/hadoop/hadoop/etc/hadoop/yarn.keytab
spark.executor.extraJavaOptions -Djavax.security.auth.useSubjectCredsOnly=false
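Since these settings have to be present on every node, a quick check of the file can catch a missing key before a job fails at submission time. The sketch below parses the listing above and reports missing keys. It assumes the whitespace-separated "key value" layout used in this listing, and the helper names parse_spark_defaults and missing_keys are hypothetical.

```python
# The text below mirrors the spark-defaults.conf listing above.
SPARK_DEFAULTS = """\
spark.master yarn
spark.yarn.security.tokens.hive.enabled true
spark.yarn.principal yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL
spark.yarn.keytab /home/hadoop/hadoop/etc/hadoop/yarn.keytab
spark.executor.extraJavaOptions -Djavax.security.auth.useSubjectCredsOnly=false
"""

REQUIRED_KEYS = [
    "spark.master",
    "spark.yarn.security.tokens.hive.enabled",
    "spark.yarn.principal",
    "spark.yarn.keytab",
    "spark.executor.extraJavaOptions",
]


def parse_spark_defaults(text):
    """Parse whitespace-separated 'key value' lines, skipping blanks and comments."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        settings[parts[0]] = parts[1] if len(parts) > 1 else ""
    return settings


def missing_keys(settings, required=REQUIRED_KEYS):
    """Return the required keys that are absent from the parsed settings."""
    return [key for key in required if key not in settings]


settings = parse_spark_defaults(SPARK_DEFAULTS)
print("missing keys:", missing_keys(settings))
```

To check a real node, the file content could be read from /home/hadoop/spark/conf/spark-defaults.conf instead of the inline string.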
Chapter 9. Running Spark application communicating with Kerberized Hive
We have now reached the main purpose of this manual: running a Spark application that communicates with Kerberized Hive in a Kerberized environment.
Submit the Spark job using spark-submit, with the Python application stored in HDFS.
- This application explicitly specifies the hive.metastore.uris
[hadoop@sicily1 ~]$ hdfs dfs -cat /examples/spark.hive.testing.py
from __future__ import print_function
import sys
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pandas as pd

if __name__ == "__main__":
    # Builder.config() ignores key/value when conf= is passed in the same call,
    # so the SparkConf and the metastore URI are set in two separate calls.
    sparkSession = (SparkSession.builder
                    .appName('Spark Hive Testing 1')
                    .config(conf=SparkConf())
                    .config("hive.metastore.uris", "thrift://verona1.wsdm.ami.com:9083")
                    .enableHiveSupport()
                    .getOrCreate())

    # List all Hive databases visible through the metastore.
    all_databases_df = sparkSession.sql("show databases")
    all_databases_pd = all_databases_df.toPandas()
    print('all databases')
    print(all_databases_pd)

    # Count the rows of a Hive table.
    print('number of airports')
    rows_count_df = sparkSession.sql("select count(*) from testing_data.airports")
    rows_count_pd = rows_count_df.toPandas()
    print(rows_count_pd)

    sparkSession.stop()
[hadoop@sicily1 ~]$
Spark job and its output
[root@florence1 conf]# spark-submit --deploy-mode cluster hdfs://sicily1.wsdm.ami.com:9000/examples/spark.hive.testing.py
…
20/09/07 08:13:57 INFO yarn.Client: Application report for application_1599490510093_0004 (state: RUNNING)
20/09/07 08:13:58 INFO yarn.Client: Application report for application_1599490510093_0004 (state: FINISHED)
…
final status: SUCCEEDED
…
[root@florence1 conf]# ssh root@sicily1.wsdm.ami.com
[root@sicily1 container_1599490510093_0004_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599490510093_0004/container_1599490510093_0004_01_000001
[root@sicily1 container_1599490510093_0004_01_000001]# cat stdout
all databases
databaseName
0 default
1 payload_data
2 testing_data
number of airports
count(1)
0 8108
- This application does not explicitly specify hive.metastore.uris.
It uses the Hive details from the hive-site.xml file configured with Spark.
[root@florence1 conf]# hdfs dfs -cat /examples/spark.hive.kerberized.testing.py
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # No metastore URI here: it is read from the hive-site.xml in the Spark conf folder.
    sparkSession = SparkSession.builder.appName('Spark Hive Kerberized Testing').enableHiveSupport().getOrCreate()

    all_databases_df = sparkSession.sql("show databases")
    all_databases_pd = all_databases_df.toPandas()
    print('all databases')
    print(all_databases_pd)

    print('number of airports')
    rows_count_df = sparkSession.sql("select count(*) from testing_data.airports")
    rows_count_pd = rows_count_df.toPandas()
    print(rows_count_pd)

    sparkSession.stop()
[root@florence1 conf]# spark-submit --deploy-mode cluster hdfs://sicily1.wsdm.ami.com:9000/examples/spark.hive.kerberized.testing.py
…
20/09/07 08:22:17 INFO yarn.Client: Application report for application_1599490510093_0005 (state: RUNNING)
20/09/07 08:22:18 INFO yarn.Client: Application report for application_1599490510093_0005 (state: FINISHED)
[root@sicily1 container_1599490510093_0005_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599490510093_0005/container_1599490510093_0005_01_000001
[root@sicily1 container_1599490510093_0005_01_000001]# cat stdout
all databases
databaseName
0 default
1 payload_data
2 testing_data
number of airports
count(1)
0 8108
- Submit the spark application against Livy
[root@florence1 conf]# curl -i --negotiate -u : -X POST -d '{"file":"hdfs://sicily1.wsdm.ami.com:9000/examples/spark.hive.kerberized.testing.py"}' -H "Content-Type: application/json" "http://florence1.wsdm.ami.com:8998/batches"
…
{"id":0,"name":null,"owner":"HTTP","proxyUser":null,"state":"starting","appId":null,"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}
[root@florence1 conf]# curl --negotiate -u : -X GET "http://florence1.wsdm.ami.com:8998/batches/0/state"
{"id":0,"state":"starting"}
[root@florence1 ~]# curl --negotiate -u : -X GET "http://florence1.wsdm.ami.com:8998/batches/0/state"
{"id":0,"state":"running"}
[root@florence1 conf]# curl --negotiate -u : -X GET "http://florence1.wsdm.ami.com:8998/batches/0/state"
{"id":0,"state":"success"}
[root@florence1 conf]# curl --negotiate -u : -X GET "http://florence1.wsdm.ami.com:8998/batches/0"
{"id":0,"name":null,"owner":"HTTP","proxyUser":null,"state":"success","appId":"application_1599490510093_0006","appInfo":{"driverLogUrl":null,"sparkUiUrl":"http://sicily1.wsdm.ami.com:8088/proxy/application_1599490510093_0006/"},"log":["\t queue: default","\t start time: 1599492330712","\t final status: UNDEFINED","\t tracking URL: http://sicily1.wsdm.ami.com:8088/proxy/application_1599490510093_0006/","\t user: yarn","20/09/07 08:25:31 INFO util.ShutdownHookManager: Shutdown hook called","20/09/07 08:25:31 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e0939a94-f9f4-4ea7-bb0f-45081757c870","20/09/07 08:25:31 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3dfc4f9-eeeb-47e2-a669-6b9c42412d07","\nstderr: ","\nYARN Diagnostics: "]}
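The submit-then-poll sequence above can also be scripted. The following is a minimal sketch of the polling logic only: the HTTP call is injected as a function, so the example runs without a cluster. The names build_batch_payload and wait_for_batch are hypothetical, and the set of terminal states is an assumption based on the states seen in the transcript plus the failure states Livy is generally documented to report. A real fetch_state would issue the same GET request with a SPNEGO-capable HTTP client.

```python
import json
import time

# Assumed terminal states for a Livy batch; only "success" appears in the
# transcript above, the others are assumptions for failure cases.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def build_batch_payload(file_uri):
    """Build the JSON body sent with POST /batches."""
    return json.dumps({"file": file_uri})


def wait_for_batch(fetch_state, batch_id, poll_seconds=1.0, max_polls=60, sleep=time.sleep):
    """Poll fetch_state(batch_id) until a terminal state is reached.

    fetch_state must return a dict such as {"id": 0, "state": "running"}.
    Returns the final state and the list of distinct states seen in order.
    """
    history = []
    for _ in range(max_polls):
        state = fetch_state(batch_id)["state"]
        if not history or history[-1] != state:
            history.append(state)
        if state in TERMINAL_STATES:
            return state, history
        sleep(poll_seconds)
    raise TimeoutError("batch {} did not finish after {} polls".format(batch_id, max_polls))


# Demo with canned responses that mirror the transcript (no network access).
responses = iter([
    {"id": 0, "state": "starting"},
    {"id": 0, "state": "running"},
    {"id": 0, "state": "success"},
])
payload = build_batch_payload("hdfs://sicily1.wsdm.ami.com:9000/examples/spark.hive.kerberized.testing.py")
final_state, history = wait_for_batch(lambda batch_id: next(responses), 0, sleep=lambda seconds: None)
print(payload)
print(final_state, history)
```

Injecting the fetch function keeps the polling loop testable and leaves the choice of Kerberos-aware HTTP client open.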
Spark job output
[root@sicily1 container_1599490510093_0006_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599490510093_0006/container_1599490510093_0006_01_000001
[root@sicily1 container_1599490510093_0006_01_000001]# cat stdout
all databases
databaseName
0 default
1 payload_data
2 testing_data
number of airports
count(1)
0 8108
[root@sicily1 container_1599490510093_0006_01_000001]#
To summarize, so far we have seen how to configure the Edge node, submit jobs, and retrieve their output from this node.
Next, in the concluding topic of this series, we shall configure a client system and send cURL requests to the Livy Batches API to submit Spark jobs.
Submit Spark Applications to Livy Batches API from Client System
To complete the setup, we shall validate it by configuring a client node with a Kerberos client and using cURL commands to submit a batch job to Apache Livy and fetch its output.
Chapter 10. Configuring a Client Node
Install Kerberos Client
Install Kerberos Client using the following command
[root@rimini1 ~]# yum install krb5-workstation krb5-libs
krb5.conf file and keytab file
Copy the krb5.conf file and the hdfs.keytab file from the master node to the client system
[root@sicily1 ~]# scp /etc/krb5.conf root@rimini1.fyre.ibm.com:/tmp
[root@sicily1 ~]# scp hdfs.keytab root@rimini1.fyre.ibm.com:/tmp
Authenticate and submit the jobs from the client system
- For demo purposes, destroy any existing tickets
[root@rimini1 ~]# kdestroy
- Send a cURL request with --negotiate to WebHDFS and make sure you get an HTTP 401 with an "Authentication required" message, since we do not have a valid ticket on the client system
[root@rimini1 ~]# curl --negotiate -u : -L http://sicily1.fyre.ibm.com:50070/webhdfs/v1/examples/spark.hive.hdfs.py?op=OPEN
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"/>
<title>Error 401 Authentication required</title>
</head>
<body><h2>HTTP ERROR 401</h2>
<p>Problem accessing /webhdfs/v1/examples/spark.hive.hdfs.py. Reason:
<pre> Authentication required</pre></p><hr /><i><small>Powered by Jetty://</small></i><br/>
<br/>
<br/>
</body>
</html>
[root@rimini1 ~]#
- Get a ticket
[root@rimini1 ~]# kinit -kt /tmp/hdfs.keytab hdfs/sicily1.fyre.ibm.com@HADOOPCLUSTER.LOCAL
[root@rimini1 ~]#
- Then send the same cURL request to WebHDFS, which should now print the content of the Python file stored in HDFS.
[root@rimini1 ~]# curl --negotiate -u : -L http://sicily1.fyre.ibm.com:50070/webhdfs/v1/examples/spark.hive.hdfs.py?op=OPEN
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, FloatType, ArrayType
import uuid
...
- Submit a batches API cURL to Livy and get the Job Response
[root@rimini1 ~]# curl --negotiate -u : -X POST -d '{"file":"hdfs://sicily1.fyre.ibm.com:9000/examples/spark.hive.hdfs.py"}' -H "Content-Type: application/json" "http://florence1.fyre.ibm.com:8998/batches"
{"id":0,"name":null,"owner":"hdfs","proxyUser":null,"state":"starting","appId":null,"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}
[root@rimini1 ~]# curl --negotiate -u : -X GET "http://florence1.fyre.ibm.com:8998/batches/0/state"
{"id":0,"state":"starting"}
[root@rimini1 ~]# curl --negotiate -u : -X GET "http://florence1.fyre.ibm.com:8998/batches/0/state"
{"id":0,"state":"running"}
[root@rimini1 ~]# curl --negotiate -u : -X GET "http://florence1.fyre.ibm.com:8998/batches/0/state"
{"id":0,"state":"success"}
[root@rimini1 ~]# curl --negotiate -u : -L http://sicily1.fyre.ibm.com:50070/webhdfs/v1//examples/428419b3-0e72-46d9-8d9f-02179a06e4d7/part-00000-19c73b71-0191-4d49-9613-6f5b1dc982e1-c000.json?op=OPEN
{"rows_count":8108,"accuracy":0.805,"true_positive_rate":0.576,"recall":0.576,"precision":0.742268,"false_positive_rate":0.09090909,"area_under_roc":0.7425454,"area_under_pr":0.6511572,"f1_measure":0.6486486,"confusion_matrix":[[250.0,25.0],[53.0,72.0]]}
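As a sanity check on the job output, the reported metrics can be recomputed from the confusion matrix in the same JSON document. The sketch below does this under one assumption that the source does not state: the matrix layout is [[TN, FP], [FN, TP]], with rows as actual labels and columns as predicted labels. The helper name metrics_from_confusion_matrix is hypothetical.

```python
import json

# The JSON line returned by WebHDFS above.
RESULT_JSON = (
    '{"rows_count":8108,"accuracy":0.805,"true_positive_rate":0.576,'
    '"recall":0.576,"precision":0.742268,"false_positive_rate":0.09090909,'
    '"area_under_roc":0.7425454,"area_under_pr":0.6511572,'
    '"f1_measure":0.6486486,"confusion_matrix":[[250.0,25.0],[53.0,72.0]]}'
)


def metrics_from_confusion_matrix(matrix):
    """Derive binary classification metrics.

    Assumed layout (not stated in the source): [[TN, FP], [FN, TP]].
    """
    (tn, fp), (fn, tp) = matrix
    total = tn + fp + fn + tp
    return {
        "accuracy": (tp + tn) / total,
        "recall": tp / (tp + fn),
        "precision": tp / (tp + fp),
        "false_positive_rate": fp / (fp + tn),
        "f1_measure": 2 * tp / (2 * tp + fp + fn),
    }


reported = json.loads(RESULT_JSON)
derived = metrics_from_confusion_matrix(reported["confusion_matrix"])
for name in sorted(derived):
    # Print the recomputed value next to the value reported by the job.
    print("{:20s} derived={:.7f} reported={}".format(name, derived[name], reported[name]))
```

Under that layout assumption the recomputed values agree with the reported ones to the printed precision, which suggests the output file was read back intact.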
Well, that concludes this setup folks!
Hope this helps.