Configuring Spark and Running Spark Applications

Ravi Chamarthy
6 min readSep 17, 2020

This is a fourth part of the Apache Hadoop ecosystem setup as explained in Apache Hadoop Multi-Node Kerberized Cluster Setup, where in the previous stories we had had gone through the overall deployment architecture followed by setup of initial system with Kerberos, and then setup of multi-node Hadoop with HDFS and YARN. In this story, we will go through the steps to setup Spark and run applications.

Chapter 5. Configure Spark and Run Spark Applications

1. spark-defaults.conf

Create /home/Hadoop/spark/conf/spark-defaults.conf to specify the Spark communication with YARN over kerberos principal and the keytab file. Specify the same principal in all nodes.

[hadoop@turin1 logs]$ cd ../../spark/conf/
[hadoop@turin1 conf]$ pwd
/home/hadoop/spark/conf
[hadoop@turin1 conf]$ mv spark-defaults.conf.template spark-defaults.conf
[hadoop@turin1 conf]$ vi spark-defaults.conf
[hadoop@turin1 conf]$ cat spark-defaults.conf
spark.master yarn
spark.yarn.security.tokens.hive.enabled true
spark.yarn.principal yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL
spark.yarn.keytab /home/hadoop/hadoop/etc/hadoop/yarn.keytab

2. spark-env.sh

Create the /home/Hadoop/spark/conf/spark-env.sh file and specify the environment variables for Spark pointing to HADOOP HOME, SPARK HOME, and other configuration directories.

export HADOOP_HOME=/home/hadoop/hadoop
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/
export SPARK_HOME=/home/hadoop/spark
export SPARK_DIST_CLASSPATH=/home/hadoop/hadoop/etc/hadoop:/home/hadoop/hadoop/share/hadoop/common/lib/*:/home/hadoop/hadoop/share/hadoop/common/*:/home/hadoop/hadoop/share/hadoop/hdfs:/home/hadoop/hadoop/share/hadoop/hdfs/lib/*:/home/hadoop/hadoop/share/hadoop/hdfs/*:/home/hadoop/hadoop/share/hadoop/yarn:/home/hadoop/hadoop/share/hadoop/yarn/lib/*:/home/hadoop/hadoop/share/hadoop/yarn/*:/home/hadoop/hadoop/share/hadoop/mapreduce/lib/*:/home/hadoop/hadoop/share/hadoop/mapreduce/*:/home/hadoop/hadoop/contrib/capacity-scheduler/*.jar
export YARN_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/

3. Run sample Java based Spark application

[hadoop@sicily1 conf]$ spark-submit — deploy-mode cluster — class org.apache.spark.examples.SparkPi /home/hadoop/spark/examples/jars/spark-examples_2.11–2.4.6.jar 10
20/09/05 22:07:59 INFO yarn.Client: Kerberos credentials: principal = yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL, keytab = /home/hadoop/hadoop/etc/hadoop/yarn.keytab

20/09/05 22:08:01 INFO yarn.Client: Uploading resource file:/home/hadoop/hadoop/etc/hadoop/yarn.keytab -> hdfs://sicily1.wsdm.ami.com:9000/user/yarn/.sparkStaging/application_1599364548601_0001/yarn.keytab

20/09/05 22:08:05 INFO yarn.Client: Uploading resource file:/tmp/spark-4478492a-cb58–40a6-b263–25ef0e691241/__spark_libs__2673407158396606380.zip -> hdfs://sicily1.wsdm.ami.com:9000/user/yarn/.sparkStaging/application_1599364548601_0001/__spark_libs__2673407158396606380.zip
20/09/05 22:08:06 INFO yarn.Client: Uploading resource file:/home/hadoop/spark/examples/jars/spark-examples_2.11–2.4.6.jar -> hdfs://sicily1.wsdm.ami.com:9000/user/yarn/.sparkStaging/application_1599364548601_0001/spark-examples_2.11–2.4.6.jar
20/09/05 22:08:07 INFO yarn.Client: Uploading resource file:/tmp/spark-4478492a-cb58–40a6-b263–25ef0e691241/__spark_conf__4573311811447328838.zip -> hdfs://sicily1.wsdm.ami.com:9000/user/yarn/.sparkStaging/application_1599364548601_0001/__spark_conf__.zip

20/09/05 22:08:32 INFO yarn.Client: Application report for application_1599364548601_0001 (state: RUNNING)
20/09/05 22:08:33 INFO yarn.Client: Application report for application_1599364548601_0001 (state: FINISHED)

final status: SUCCEEDED
tracking URL: http://sicily1.wsdm.ami.com:8088/proxy/application_1599364548601_0001/

Confirmation

[root@sicily1 container_1599364548601_0001_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599364548601_0001/container_1599364548601_0001_01_000001
[root@sicily1 container_1599364548601_0001_01_000001]# cat stdout
Pi is roughly 3.143231143231143

4. Run sample Python based Spark application

This time we shall run as a root user. Should not matter, which user, as long as the user has a ticket to communicate we should be good.

[root@sicily1 ~]# spark-submit — deploy-mode cluster /home/hadoop/spark/examples/src/main/python/pi.py 10
20/09/05 22:38:15 INFO yarn.Client: Kerberos credentials: principal = yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL, keytab = /home/hadoop/hadoop/etc/hadoop/yarn.keytab

20/09/05 22:38:50 INFO yarn.Client: Application report for application_1599364548601_0002 (state: RUNNING)
20/09/05 22:38:51 INFO yarn.Client: Application report for application_1599364548601_0002 (state: FINISHED)

Confirmation

[root@sicily1 container_1599364548601_0002_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599364548601_0002/container_1599364548601_0002_01_000001
[root@sicily1 container_1599364548601_0002_01_000001]# cat stdout
Pi is roughly 3.143340

5. Some WebHDFS commands, for confirmation

  • List directories
[root@sicily1 ~]# curl — negotiate -u : http://sicily1.wsdm.ami.com:50070/webhdfs/v1/?op=LISTSTATUS
{“FileStatuses”:{“FileStatus”:[
{“accessTime”:0,”blockSize”:0,”childrenNum”:1,”fileId”:16386,”group”:”supergroup”,”length”:0,”modificationTime”:1599363769329,”owner”:”HTTP”,”pathSuffix”:”testing_data”,”permission”:”755",”replication”:0,”storagePolicy”:0,”type”:”DIRECTORY”},
{“accessTime”:0,”blockSize”:0,”childrenNum”:1,”fileId”:16389,”group”:”supergroup”,”length”:0,”modificationTime”:1599368881798,”owner”:”yarn”,”pathSuffix”:”user”,”permission”:”755",”replication”:0,”storagePolicy”:0,”type”:”DIRECTORY”}
]}}
  • List file content
[root@sicily1 ~]# curl — negotiate -u : -L http://sicily1.wsdm.ami.com:50070/webhdfs/v1/testing_data/airports.csv?op=OPEN

9540,”Deer Harbor Seaplane”,”Deer Harbor”,”United States”,”DHB”,\N,48.618397,-123.00596,0,-8,”A”,”America/Los_Angeles”
9541,”San Diego Old Town Transit Center”,”San Diego”,”United States”,”OLT”,\N,32.7552,-117.1995,0,-8,”A”,”America/Los_Angeles”
  • Create a directory
[root@sicily1 python]# curl — negotiate -u : -X PUT “http://sicily1.wsdm.ami.com:50070/webhdfs/v1/examples?op=MKDIRS"
{“boolean”:true}
  • Upload and confirm file content

Create a placeholder for the file. This would provide a “Location” of the resource to be uploaded in the response.

[root@sicily1 python]# curl -i — negotiate -u : -X PUT “http://sicily1.wsdm.ami.com:50070/webhdfs/v1/examples/pi.py?op=CREATE&overwrite=true"

Location: http://sicily1.wsdm.ami.com:1006/webhdfs/v1/examples/pi.py?op=CREATE&delegation=HAAESFRUUARIVFRQAIoBdGIGtzaKAXSGEzs2CgoUToTmsnt97-UVu7nq5SNOl6xhbIoSV0VCSERGUyBkZWxlZ2F0aW9uEDEwLjQxLjYuMTc5OjkwMDA&namenoderpcaddress=sicily1.wsdm.ami.com:9000&createflag=&createparent=true&overwrite=true
  • Use the location and upload the file to this location.
[root@sicily1 python]# curl -i — negotiate -u : -X PUT -T pi.py “http://sicily1.wsdm.ami.com:1006/webhdfs/v1/examples/pi.py?op=CREATE&delegation=HAAESFRUUARIVFRQAIoBdGIGtzaKAXSGEzs2CgoUToTmsnt97-UVu7nq5SNOl6xhbIoSV0VCSERGUyBkZWxlZ2F0aW9uEDEwLjQxLjYuMTc5OjkwMDA&namenoderpcaddress=sicily1.wsdm.ami.com:9000&createflag=&createparent=true&overwrite=true"
  • Confirm file upload
[root@sicily1 python]# curl — negotiate -u : -L http://sicily1.wsdm.ami.com:50070/webhdfs/v1/examples/pi.py?op=OPEN

count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print(“Pi is roughly %f” % (4.0 * count / n))
spark.stop()
  • Running a spark application with the application in HDFS
[root@sicily1 python]# spark-submit — deploy-mode cluster hdfs://sicily1.wsdm.ami.com:9000/examples/pi.py

ApplicationMaster host: turin1.wsdm.ami.com
20/09/05 23:14:21 INFO yarn.Client: Application report for application_1599364548601_0003 (state: RUNNING)
20/09/05 23:14:22 INFO yarn.Client: Application report for application_1599364548601_0003 (state: FINISHED)

Confirmation

[root@turin1 container_1599364548601_0003_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599364548601_0003/container_1599364548601_0003_01_000001
[root@turin1 container_1599364548601_0003_01_000001]# cat stdout
Pi is roughly 3.139260

Chapter 6. Configuring Edge Node and Run Spark Applications

Before configuring the Edge node, it is expected that you have followed the steps mentioned in Chapter 2. Kerberos Installation and configuration#4. Configure Kerberos on all other node

1. Copy the configuration files to Edge

Copy the required Spark configuration files from Sicily (Resource Manager) to the Florence (Edge Node)

[root@sicily1 conf]# su — hadoop
[hadoop@sicily1 ~]$ cd spark/conf/
[hadoop@sicily1 conf]$ scp spark-env.sh spark-defaults.conf hadoop@florence1.wsdm.ami.com:/home/hadoop/spark/conf
spark-env.sh 100% 4943 681.4KB/s 00:00
spark-defaults.conf 100% 1618 303.8KB/s 00:00

2. Run sample Python based Spark application

[root@florence1 conf]# spark-submit — deploy-mode cluster — class org.apache.spark.examples.SparkPi /home/hadoop/spark/examples/jars/spark-examples_2.11–2.4.6.jar 10

20/09/05 23:22:28 INFO yarn.Client: Application report for application_1599364548601_0004 (state: RUNNING)
20/09/05 23:22:29 INFO yarn.Client: Application report for application_1599364548601_0004 (state: FINISHED)

ApplicationMaster host: turin1.wsdm.ami.com

Confirmation

[root@turin1 container_1599364548601_0004_01_000001]# pwd /home/hadoop/hadoop/logs/userlogs/application_1599364548601_0004/container_1599364548601_0004_01_000001
[root@turin1 container_1599364548601_0004_01_000001]# cat stdout
Pi is roughly 3.1445111445111444

3. Running the python application being in HDFS

[root@florence1 conf]# spark-submit — deploy-mode cluster hdfs://sicily1.wsdm.ami.com:9000/examples/pi.py
20/09/05 23:23:43 INFO yarn.Client: Kerberos credentials: principal = yarn/sicily1.wsdm.ami.com@HADOOPCLUSTER.LOCAL, keytab = /home/hadoop/hadoop/etc/hadoop/yarn.keytab

20/09/05 23:23:48 INFO yarn.Client: Source and destination file systems are the same. Not copying hdfs://sicily1.wsdm.ami.com:9000/examples/pi.py

20/09/05 23:24:13 INFO yarn.Client: Application report for application_1599364548601_0005 (state: RUNNING)
20/09/05 23:24:14 INFO yarn.Client: Application report for application_1599364548601_0005 (state: FINISHED)

ApplicationMaster host: sicily1.wsdm.ami.com

Confirmation

[root@sicily1 container_1599364548601_0005_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599364548601_0005/container_1599364548601_0005_01_000001
[root@sicily1 container_1599364548601_0005_01_000001]# cat stdout
Pi is roughly 3.144260

4. Configure Livy in the Edge node (florence) to submit spark applications

Create the logs folder for livy

[hadoop@florence1 livy]$ pwd
/home/hadoop/livy
[hadoop@florence1 conf]$ mkdir logs

Add the following attributes to livy.conf file.

[hadoop@florence1 conf]$ pwd
/home/hadoop/livy/conf
[hadoop@florence1 conf]$ cat livy.conf

livy.server.port = 8998
livy.spark.master = yarn
livy.spark.deploy-mode = cluster
livy.environment= production
livy.server.session.timeout = 1h
livy.impersonation.enabled = false
livy.server.csrf-protection.enabled = false
livy.server.recovery.mode = off
livy.server.access-control.enabled = true
livy.server.access-control.allowed-users = *
livy.server.access-control.modify-users = *
livy.server.access-control.view-users = *
livy.server.auth.type = kerberos
livy.server.auth.kerberos.principal = HTTP/florence1.wsdm.ami.com@HADOOPCLUSTER.LOCAL
livy.server.auth.kerberos.keytab = /home/hadoop/hadoop/etc/hadoop/HTTP.keytab
livy.server.launch.kerberos.keytab = /home/hadoop/hadoop/etc/hadoop/hdfs.keytab
livy.server.launch.kerberos.principal = hdfs/florence1.wsdm.ami.com@HADOOPCLUSTER.LOCAL

Start the livy-server

[hadoop@florence1 livy]$ livy-server
20/09/06 00:07:47 INFO server.AccessManager: AccessControlManager acls enabled;users with view permission: *;users with modify permission: *;users with super permission: ;other allowed users: *

20/09/06 00:07:49 INFO client.RMProxy: Connecting to ResourceManager at sicily1.wsdm.ami.com/10.41.6.179:8032

20/09/06 00:07:50 INFO server.LivyServer: SPNEGO auth enabled (principal = HTTP/florence1.wsdm.ami.com@HADOOPCLUSTER.LOCAL)
20/09/06 00:07:50 INFO server.LivyServer: Access control is enabled

20/09/06 00:07:50 INFO server.KerberosAuthenticationHandler: Login using keytab /home/hadoop/hadoop/etc/hadoop/HTTP.keytab, for principal HTTP/florence1.wsdm.ami.com@HADOOPCLUSTER.LOCAL

20/09/06 00:07:50 INFO server.WebServer: Starting server on http://florence1.wsdm.ami.com:8998

Submitting a spark job remotely

Open another terminal for Florence node and then submit the spark job which is there in HDFS to Livy Server.

[root@florence1 ~]# curl -i — negotiate -u : -X POST -d ‘{“file”:”hdfs://sicily1.wsdm.ami.com:9000/examples/pi.py”}’ -H “Content-Type: application/json” “http://florence1.wsdm.ami.com:8998/batches"

{“id”:0,”name”:null,”owner”:”HTTP”,”proxyUser”:null,”state”:”starting”,”appId”:null,”appInfo”:{“driverLogUrl”:null,”sparkUiUrl”:null},”log”:[“stdout: “,”\nstderr: “,”\nYARN Diagnostics: “]}

Get the job status

[root@florence1 ~]# curl — negotiate -u : -X GET “http://florence1.wsdm.ami.com:8998/batches/0/state"
{“id”:0,”state”:”starting”}
[root@florence1 ~]# curl — negotiate -u : -X GET “http://florence1.wsdm.ami.com:8998/batches/0/state"
{“id”:0,”state”:”running”}
[root@florence1 ~]# curl — negotiate -u : -X GET “http://florence1.wsdm.ami.com:8998/batches/0/state"
{“id”:0,”state”:”success”}

Confirmation

[root@sicily1 container_1599364548601_0007_01_000001]# pwd
/home/hadoop/hadoop/logs/userlogs/application_1599364548601_0007/container_1599364548601_0007_01_000001
[root@sicily1 container_1599364548601_0007_01_000001]# cat stdout
Pi is roughly 3.141040

Running WebHDFS commands from Edge Node

Run HDFS commands from edge node to make sure they run against NameNode

[root@florence1 ~]# hdfs dfs -cat /examples/pi.py | tail
def f(_):
x = random() * 2–1
y = random() * 2–1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print(“Pi is roughly %f” % (4.0 * count / n))
spark.stop()

To summarise what we have done so far

* Configure Kerberos Admin Server and KDC on the master node
* Configure Kerberos Client on all nodes
* Create Kerberos principals and make sure they can get tickets for communication
* Configure Hadoop HDFS, YARN for Kerberos
* Build JSVC so that Hadoop/YARN can make remote communications securely.
* Ran sample HDFS Commands
* Configure Spark/YARN for Kerberos
* Ran sample spark-submit jobs for both Java and Python based applications
* Ran sample spark-submit job with the python based job in HDFS
* Configure the Edge node to submit Spark jobs remotely using spark-submit
* Configure Livy with Spark and Kerberos to submit Spark jobs remotely using Livy Batches API.

Next we shall proceed with Hive Configuration in the chapter <<Apache Hive Configuration with MySQL metastore>>

--

--

Ravi Chamarthy

Software Architect, watsonx.governance - Monitoring & IBM Master Inventor