Read and write data in S3 with Spark

Objective#

We will develop a sample Spark application in Scala that reads a JSON file from S3, performs a basic calculation, and writes the result back to S3 in CSV format.

About S3#

Amazon S3 is an AWS-managed, distributed object store that can be used for a wide variety of scenarios, such as video storage, static file hosting, data warehouse storage, and many more.

Configure dependencies#

Before we start writing the program, we will declare the dependencies required for the application to work. Here is the list of dependencies that need to be added to build.sbt.

build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.2.0",
  "org.apache.spark" %% "spark-sql" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.3.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.1"
)
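
Spark 3.2.0 artifacts are published for Scala 2.12 and 2.13, so the project's Scala version has to match one of them. Here is a minimal sketch of the remaining build settings; the project name and the exact patch version are assumptions, not taken from this article.

build.sbt
name := "spark-s3-read-write"
version := "0.1"
scalaVersion := "2.12.15" // must line up with the %% Spark artifacts above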

Program description#

We will create a basic Spark program that reads a JSON file containing flight schedule data and, using the Spark DataFrame API, calculates the total number of flights departing from each source city. The result of the program will be saved in CSV format.

Here is a sample record of the dataset in JSON format, which will be read using the spark.read.json API.

flight.json
{
  "id": 1,
  "source": "New York",
  "destination": "Dallas",
  "departureTime": "2021-05-02 21:00:00",
  "arrivalTime": "2021-05-03 00:00:00"
}
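
To sanity-check what Spark infers from records like this, a small sketch (using the SparkSession built in the next step; the bucket path is a placeholder, not from this article) can print the schema. Note that spark.read.json expects line-delimited JSON by default, so a pretty-printed file like the one above needs the multiLine option.

// A sketch, not part of the article's program: inspect the inferred schema.
// multiLine is only needed because the sample record spans multiple lines;
// line-delimited JSON works with the defaults. The path is a placeholder.
val flights = spark.read
  .option("multiLine", "true")
  .json("s3a://my-bucket/input/flight.json")
flights.printSchema() // id is inferred as long; the remaining fields as strings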

We will start by initializing the Spark session and injecting the AWS credentials via JVM system properties.

S3IO.scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .config("spark.hadoop.fs.s3a.access.key", System.getProperty("aws.key"))
  .config("spark.hadoop.fs.s3a.secret.key", System.getProperty("aws.secret"))
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .appName("spark-s3-read-write")
  .getOrCreate()
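
The aws.key and aws.secret names above are read from JVM system properties, so they can be supplied as -Daws.key=... -Daws.secret=... launch options. As an alternative, a small sketch (an assumption, not part of the original snippet) can copy them from the standard AWS environment variables before the session is built.

// Sketch: populate the system properties that the SparkSession config reads.
// AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are the standard AWS variable names.
sys.env.get("AWS_ACCESS_KEY_ID").foreach(System.setProperty("aws.key", _))
sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(System.setProperty("aws.secret", _))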

Next, parse the program arguments to obtain the input path and the output path where the result will be stored.

S3IO.scala
// The usage requires both flags and their values, i.e. at least four arguments.
if (args.length < 4 || !args.contains("-f") || !args.contains("-o")) {
  System.err.println("Usage: -f <file-path> -o <output-path>")
  sys.exit(1)
}
// Each value sits right after its flag in the argument array.
val fileArg = args.indexOf("-f") + 1
val outArg = args.indexOf("-o") + 1
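
For illustration, here is how those indices resolve for a typical invocation; the bucket and key names are hypothetical, not from the article.

// Worked example of the index arithmetic above (paths are placeholders):
val demoArgs = Array("-f", "s3a://my-bucket/input/flight.json", "-o", "s3a://my-bucket/output/")
demoArgs(demoArgs.indexOf("-f") + 1) // "s3a://my-bucket/input/flight.json"
demoArgs(demoArgs.indexOf("-o") + 1) // "s3a://my-bucket/output/"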

Implement the data processing pipeline using the DataFrame API as shown below.

S3IO.scala
import org.apache.spark.sql.functions.count

// Read the flight data, count flights per source city, and write the result as CSV.
spark.read.json(args(fileArg))
  .groupBy("source")
  .agg(count("id") as "flights_count")
  .select("source", "flights_count")
  .withColumnRenamed("source", "city")
  .write.csv(args(outArg))
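
A few optional writer settings, not part of the original snippet, are often handy here: coalescing to a single output file, adding a header row, and overwriting any previous output. A hedged variation of the same pipeline:

spark.read.json(args(fileArg))
  .groupBy("source")
  .agg(count("id") as "flights_count")
  .withColumnRenamed("source", "city")
  .coalesce(1)              // single CSV part file; fine for small results
  .write
  .mode("overwrite")        // replace output from previous runs
  .option("header", "true") // include a header row in the CSV
  .csv(args(outArg))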

Run the program#

You can run the program from IntelliJ using the local master by configuring the run options: the VM options carry the -Daws.key and -Daws.secret properties, and the program arguments carry the -f and -o paths.

(Screenshot: IntelliJ run configuration for the Spark application)

Source code#

Below is the entire code that we just developed. To get the complete project, head over to GitHub.
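
Here is a consolidated sketch assembled from the snippets above; the object name S3IO is taken from the snippet labels, and everything else mirrors the code shown in each step.

S3IO.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object S3IO {

  def main(args: Array[String]): Unit = {
    // Require both flags and their values.
    if (args.length < 4 || !args.contains("-f") || !args.contains("-o")) {
      System.err.println("Usage: -f <file-path> -o <output-path>")
      sys.exit(1)
    }
    val fileArg = args.indexOf("-f") + 1
    val outArg = args.indexOf("-o") + 1

    // Build the session with S3A credentials injected via system properties.
    val spark = SparkSession.builder()
      .master("local")
      .config("spark.hadoop.fs.s3a.access.key", System.getProperty("aws.key"))
      .config("spark.hadoop.fs.s3a.secret.key", System.getProperty("aws.secret"))
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .appName("spark-s3-read-write")
      .getOrCreate()

    // Count flights per source city and write the result as CSV.
    spark.read.json(args(fileArg))
      .groupBy("source")
      .agg(count("id") as "flights_count")
      .select("source", "flights_count")
      .withColumnRenamed("source", "city")
      .write.csv(args(outArg))

    spark.stop()
  }
}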