Access Denied When Writing to an S3 Bucket Using RDDs

Problem

Writing to an S3 bucket with the RDD API fails. The driver node can write to the bucket, but the worker (executor) nodes return an access denied error. Writing with the DataFrame API, however, works fine.

For example, let’s say you run the following code:

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
result.foreachRDD { rdd =>
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  val count = rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
  println(s"Count is $count")
}

ssc.start()

The following error is returned:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure:
Lost task 3.3 in stage 0.0 (TID 7, 10.205.244.228, executor 0): java.rmi.RemoteException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied; Request ID: F81ADFACBCDFE626,
Extended Request ID: 1DNcBUHsmUFFI9a1lz0yGt4dnRjdY5V3C+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3BCep,

Cause

The Hadoop Configuration class is not serializable. Declaring val config = new Configuration(sc.hadoopConfiguration) with Serializable creates an anonymous subclass that Java serialization accepts, but the configuration's properties, including the credentials the cluster needs for S3 access, are not reliably carried over to the executors. The workers then attempt the write with an incomplete configuration, and S3 denies the request.
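The SerializableConfiguration wrapper used in Option 2 below avoids this by serializing the configuration's contents explicitly rather than relying on default Java serialization. The following is a simplified sketch of that idea, not the exact Spark source:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Sketch of the SerializableConfiguration pattern: the wrapped Configuration
// is marked @transient so default Java serialization skips it, and custom
// writeObject/readObject delegate to Hadoop's own Writable-style
// write/readFields, which copy every key/value pair explicitly.
class ConfigurationWrapper(@transient var value: Configuration)
    extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // writes all configuration properties to the stream
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in) // rebuilds the full property set on the executor
  }
}
```

Because the properties are rewritten on deserialization, every executor receives a complete configuration, credentials included.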

Solution

There are two ways to solve this problem:

Option 1: Use DataFrames instead

// Write a test file to the bucket, then copy it using the DataFrame API.
dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")
val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
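If your data already lives in an RDD, you can convert it to a DataFrame and let the DataFrame writer handle the configuration on the executors. A minimal sketch, reusing the bucket from the examples above (the output path is illustrative):

```scala
import spark.implicits._

// Convert an existing RDD to a DataFrame, then write it with the
// DataFrame API, which propagates the Hadoop configuration correctly.
val rdd = sc.parallelize(Seq(1, 2, 3, 4))
val df  = rdd.map(_.toString).toDF("value")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/from-rdd")
```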

Option 2: Use SerializableConfiguration

If you want to keep using RDDs, broadcast a SerializableConfiguration instead:

val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

For example:

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
result.foreachRDD { rdd =>
  // val config = new Configuration(sc.hadoopConfiguration) with Serializable
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  val count = rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config.value.value)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
  println(s"Count is $count")
}

ssc.start()