Day 03 - Package your application

Now that we've written a basic word count program using the Apache Spark™ APIs, let's package the application and deploy it to the cluster that we created in the first step.

Hey! Hold on. Why do we need to package it?

Good question! The main reason is that your application is going to be run at a specific interval defined by a scheduler such as Apache Airflow, not interactively as you just ran it in the REPL. For this purpose, Apache Spark™ exposes the spark-submit command, which allows us to run the application from the command line and schedule it using a cron job or some other scheduler.
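For example, assuming Spark is installed under /opt/spark and the packaged job lives at /home/gigahex/jobs (both hypothetical paths), a cron entry that runs it every day at 2 AM could look like this:

0 2 * * * /opt/spark/bin/spark-submit --master local /home/gigahex/jobs/word_count.py >> /var/log/word_count.log 2>&1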

If you want to list all the Spark commands available, type spark and press the Tab key.

$ spark
spark-beeline spark-shell.cmd spark-submit sparkR2.cmd
spark-class spark-shell2.cmd spark-submit.cmd
spark-class.cmd spark-sql spark-submit2.cmd
spark-class2.cmd spark-sql.cmd sparkR
spark-shell spark-sql2.cmd sparkR.cmd

spark-submit command options#

Before we package the application, let's understand the frequently used options that spark-submit provides to configure the deployment.

  • --name : Specifies the name of the Spark application.

  • --master : Specifies the cluster manager used when submitting the Spark application. A cluster manager is responsible for distributing the job across multiple workers in the cluster. Specifying local results in a single executor running in the same JVM as the driver.

    For this example, we will use local, as this doesn't require a Spark cluster running on your machine.

Whoa, whoa, whoa! Now what are workers, executors and drivers?

When running in standalone mode, Spark manages the cluster resources using a master-worker architecture. Each worker is responsible for taking work from the master and scheduling it on that node using executors. Each executor is a separate JVM process that is assigned a specific amount of heap memory and number of cores when it is launched.

The driver is the process that runs your application's main method and is responsible for interacting with the cluster manager through the SparkContext object. Once you've created a SparkSession in the program, you can connect to data sources and implement your data pipeline. Internally, the SparkSession uses a SparkContext to request resources from the configured cluster manager to run the job. When local is used as the Spark master, the driver also acts as the executor responsible for running the data pipeline. This is ideal for quick testing with a small dataset.
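To make this concrete, here's a minimal Scala sketch (separate from the word count program, with hypothetical names) that creates a local SparkSession and peeks at the SparkContext the driver uses behind the scenes:

import org.apache.spark.sql.SparkSession

object DriverDemo {
  def main(args: Array[String]): Unit = {
    // With master "local[2]" the driver JVM also runs the tasks on 2 threads,
    // so no separate worker or executor process is started.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("driver-demo")
      .getOrCreate()

    // The SparkSession wraps a SparkContext, which holds the connection
    // to the cluster manager and the resource configuration.
    val sc = spark.sparkContext
    println(s"Master: ${sc.master}")                          // local[2]
    println(s"Default parallelism: ${sc.defaultParallelism}") // 2 in this case

    spark.stop()
  }
}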

Now let's get back to the other spark-submit options.

  • --py-files : Lets you include additional Python files when submitting the application, as a comma-separated list.
  • --conf : Allows you to configure the runtime of your Spark application, which we'll look at in greater detail tomorrow. A quick example is --conf spark.executor.memory=2g, which instructs the cluster manager to allocate 2 GB of heap memory to each executor (see the submission example after this list).
  • --class : Specifies the entry point of the application in the jar.
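Putting these options together, a submission for a Python application could look roughly like the following; the script and helper file names are placeholders, and for a jar you would pass --class instead of --py-files.

spark-submit --name count-words \
  --master local \
  --py-files helpers.py \
  --conf spark.executor.memory=2g \
  /path/to/word_count.py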

Setup development environment for Python#

For this guide, we'll go with VS Code, although there are other alternatives like PyCharm. VS Code supports multiple languages through extensions. Refer to the official guide to set it up and install the Python extension.

Once you've installed the extension, create a project with the directory spark-samples/src/main/py and paste the following into a requirements.txt file.

pyspark == 3.2.0
pandas

Assuming you're in the spark-samples directory, create a virtual environment and activate it.

python3 -m venv venv
source venv/bin/activate

You can now install the above dependencies.

pip3 install -r requirements.txt
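To quickly verify that PySpark landed in the virtual environment, print its version; it should report 3.2.0.

python3 -c "import pyspark; print(pyspark.__version__)"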

Make sure you've selected the correct interpreter in VS Code by pressing Ctrl+Shift+P on Windows or Cmd+Shift+P on Mac and choosing the interpreter located in venv.

(Screenshot: Select Python interpreter)

Setup development environment for Scala or Java#

Follow the steps below to start writing and debugging code from your IDE.

  • Download and install IntelliJ
  • Set up the Scala Plugin
  • The next step depends on which build tool you're more familiar with.

Maven is a commonly used build tool for Java and other JVM-language projects. To set it up, download and install Maven. Once you're done with the installation, you can create a Maven project using this guide.

Now, paste the following content into your pom.xml

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.gigahex.spark.samples</groupId>
  <artifactId>spark-samples</artifactId>
  <version>0.1.0-SNAPSHOT</version>

  <properties>
    <spark.version>3.2.0</spark.version>
    <scala.version>2.12.11</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <jdk.version>1.8</jdk.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Test Scopes -->
    <dependency>
      <groupId>org.scalactic</groupId>
      <artifactId>scalactic_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <!-- see https://davidb.github.io/scala-maven-plugin/plugin-info.html -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.5.6</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- enable scalatest -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <skipTests>true</skipTests>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Use the shade plugin to remove all the provided artifacts (such as spark itself) from the jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <!-- Remove signed keys to prevent security exceptions on uber jar -->
          <!-- See https://stackoverflow.com/a/6743609/7245239 -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <transformers>
            <transformer
                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.gigahex.spark.dataframes.DFtoDag</mainClass>
            </transformer>
          </transformers>
          <artifactSet>
            <excludes>
              <exclude>javax.servlet:*</exclude>
              <exclude>org.apache.hadoop:*</exclude>
              <exclude>org.apache.maven.plugins:*</exclude>
              <exclude>org.apache.spark:*</exclude>
              <exclude>org.apache.avro:*</exclude>
              <exclude>org.apache.parquet:*</exclude>
              <exclude>org.scala-lang:*</exclude>
            </excludes>
          </artifactSet>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
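With the shade plugin bound to the package phase as above, building the uber jar with Maven later is a single command; by default the jar lands under target/ with the artifactId-version name (here, spark-samples-0.1.0-SNAPSHOT.jar).

mvn clean package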

Complete program#

Now we're ready to implement the program, which reads a text file and collects the words that are longer than two characters.

package com.gigahex.samples

import org.apache.spark.sql.SparkSession

object HelloWord {

  def main(args: Array[String]): Unit = {
    /**
     * Set up the Spark session
     */
    val spark = SparkSession.builder()
      .master("local")
      .appName("count-words-scala")
      .getOrCreate()

    import spark.implicits._

    // Read the input text file (placeholder path - point this at your own file)
    val text = spark.read.textFile("/path/to/input.txt")

    // Split each line into words and keep only the words longer than two characters
    val words = text.flatMap(x => x.split(" "))
    val largeWords = words.filter(w => w.length > 2)

    // Write the filtered words to the output directory
    largeWords.write.text("/path/to/large-words")

    spark.stop()
  }
}
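If you'd rather run the Python project set up earlier, a rough PySpark equivalent of the same logic is sketched below using the DataFrame API; the input and output paths are placeholders, and the file name word_count.py is just a suggestion.

word_count.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, length, split

if __name__ == "__main__":
    # Set up the Spark session, mirroring the Scala program above
    spark = (SparkSession.builder
             .master("local")
             .appName("count-words-python")
             .getOrCreate())

    # Read the input file; spark.read.text yields one row per line in a "value" column
    text = spark.read.text("/path/to/input.txt")

    # Split each line into words and keep only the words longer than two characters
    words = text.select(explode(split(col("value"), " ")).alias("word"))
    large_words = words.filter(length(col("word")) > 2)

    # Write the filtered words to the output directory
    large_words.write.text("/path/to/large-words")

    spark.stop()

Since this is a plain script, it can be submitted directly with spark-submit --master local word_count.py without any packaging step.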

Package and deploy the application#

As a last step, let's package the application using the respective build tool.

We can use either SBT or Maven to build the project (with Maven, mvn clean package produces the shaded jar as noted earlier). In this case, let's go ahead with SBT and the sbt-assembly plugin.

sbt assembly

The above command should produce a jar named spark-samples.jar under the target directory. Let's deploy this jar locally.

spark-submit --name hello-world \
--master local \
--class com.gigahex.samples.HelloWord \
/Users/gigahex/spark-samples/target/scala-2.12/spark-samples.jar

Summary#

Today we've looked at how to develop, package and deploy the program in Java, Scala and Python. If everything works, you should be able to view the list of words generated in the output directory specified in the program.

The entire source code is available on GitHub.

For any queries or issues that you face, feel free to discuss them in the Slack workspace.

Tomorrow, we'll explore how to deploy to the Spark cluster in cluster mode and customize the runtime with the different configuration options available.