Read and write data in S3 with Spark


We will develop a sample Spark application in Scala that reads a JSON file from S3, performs a basic calculation, and writes the result back to S3 in CSV format.

About S3#

S3 is an AWS-managed distributed object store that can be used in a wide variety of scenarios, such as video storage, static file hosting, data warehouse storage, and many more.

Configure dependencies#

Before we start writing the program, we will declare the dependencies required for the application to work. Here is the list of dependencies that need to be added.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.2.0",
  "org.apache.spark" %% "spark-sql" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.3.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.1"
)

Program description#

We will create a basic Spark program that reads a JSON file containing flight schedule data and, using the Spark DataFrame API, calculates the total number of flights departing from each source city. The result of the program will be saved in CSV format.

Here is a sample record of the dataset, in JSON format, that will be read using the DataFrame API:

"id": 1,
"source": "New York",
"destination": "Dallas",
"departureTime": "2021-05-02 21:00:00",
"arrivalTime": "2021-05-02 24:00:00"

We will start by initializing the Spark session and injecting the AWS credentials using system properties.

val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.access.key", System.getProperty("aws.key"))
  .config("spark.hadoop.fs.s3a.secret.key", System.getProperty("aws.secret"))
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .getOrCreate()

Accept the program arguments for the input path and the output path where the result will be stored.

if (args.length < 4) {
  System.err.println("Usage: -f <file-path> -o <output-path>")
  System.exit(1)
}
val fileArg = args.indexOf("-f") + 1
val outArg = args.indexOf("-o") + 1
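This `indexOf`-based lookup is order-independent, so `-o` may appear before `-f`. The same idea can be packaged as a small helper; the function name here is ours, not from the article:

```scala
// Resolve the value that follows a flag (e.g. "-f") in the argument array.
// Returns None if the flag is absent or has no value after it.
def flagValue(args: Array[String], flag: String): Option[String] = {
  val i = args.indexOf(flag)
  if (i >= 0 && i + 1 < args.length) Some(args(i + 1)) else None
}

val demoArgs = Array("-o", "s3a://bucket/out", "-f", "s3a://bucket/in.json")
println(flagValue(demoArgs, "-f")) // prints Some(s3a://bucket/in.json)
```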

Implement the data processing pipeline using the DataFrame API as shown below.

.agg(count("id") as "flights_count")
.withColumnRenamed("source", "city")

Run the program#

You can run the program from IntelliJ with the local executor by configuring the run options.


Source code#

Below is the entire code we just developed. To get the complete project, head over to GitHub.