[apache-spark] [spark-r] 503 Error - Cannot Connect to S3


Khatri, Faysal

Hello-

 

I am attempting to use SparkR to read in a parquet file from S3.

The exact same operation succeeds using PySpark – but I get a 503 error using SparkR.

 

In fact, I get the 503 even if I use a bad endpoint or bad credentials. It’s as if Spark isn’t even trying to make the HTTP request. It’s the same machine and same cluster on which PySpark works flawlessly, though.
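
In case it helps narrow things down, here is the kind of sanity check I was planning to run from the same session, just to see whether the settings are even visible on the driver. It leans on SparkR internals (callJMethod), so treat it as a sketch; sc and hConf are the objects created in the code below.

# sketch, untested: read the values back after setting them
SparkR:::callJMethod(hConf, "get", "fs.s3a.endpoint")

# and ask the driver-side Hadoop configuration the same question
jsc  <- SparkR:::callJMethod(sc, "sparkContext")
hcfg <- SparkR:::callJMethod(jsc, "hadoopConfiguration")
SparkR:::callJMethod(hcfg, "get", "fs.s3a.endpoint")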

I get this warning after about 2 minutes - WARN streaming.FileStreamSink: Error while looking for metadata directory.

And the 503 appears after another minute or so.

No executors are being started, so there are no executor logs to check.

I am trying to connect to a custom endpoint, if that’s relevant.
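
One variation I have not tried yet (so this is only a sketch) is passing the S3A settings through sparkConfig with the spark.hadoop. prefix, so they are copied into the Hadoop configuration at session startup instead of being set on the conf afterwards. It reuses spark_config_default and jars_list from the code below; the bracketed values are placeholders.

library(SparkR)

# sketch, untested: spark.hadoop.* entries are copied into the Hadoop configuration
s3_config <- list(spark.hadoop.fs.s3a.endpoint   = "[custom_endpoint]",
                  spark.hadoop.fs.s3a.access.key = "[access_key]",
                  spark.hadoop.fs.s3a.secret.key = "[secret_key]",
                  spark.hadoop.fs.s3a.impl       = "org.apache.hadoop.fs.s3a.S3AFileSystem")

sc <- sparkR.session(master = "yarn", deployMode = "client",
                     sparkConfig = c(spark_config_default, s3_config),
                     sparkJars = jars_list)

sdf <- read.parquet("s3a://bucket/file.parquet")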

 

Any ideas what could be going wrong?

Below is the code and full output.

 

Thanks in advance.

Faysal

 

CODE-

library(SparkR)

 

spark_config_default <- list(spark.dynamicAllocation.enabled='true',
                             spark.shuffle.service.enabled='true',
                             spark.sql.parquet.binaryAsString='true',
                             spark.dynamicAllocation.initialExecutors='3',
                             spark.dynamicAllocation.maxExecutors='30',
                             spark.driver.memory='2g',
                             spark.executor.memory='4g',
                             spark.executor.cores='3')

jars_list <- c("aws-java-sdk-1.7.4.2.jar",
               "hadoop-aws-2.6.0.jar",
               "aws-s3-1.7.1.jar",
               "hadoop-common-2.6.0.jar")

sc <- sparkR.session(master = "yarn", deployMode = "client",
                     sparkConfig = spark_config_default, sparkJars = jars_list)

hConf <- SparkR:::callJMethod(sc, "conf")
SparkR:::callJMethod(hConf, "set", "fs.s3a.access.key", [access_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.secret.key", [secret_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.endpoint", [custom_endpoint])
SparkR:::callJMethod(hConf, "set", "com.amazonaws.services.s3a.enableV4", "true")
SparkR:::callJMethod(hConf, "set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

sdf <- read.parquet("s3a://bucket/file.parquet")

sparkR.stop()

OUTPUT-

20/10/05 17:54:17 WARN streaming.FileStreamSink: Error while looking for metadata directory.

20/10/05 17:55:58 ERROR r.RBackendHandler: parquet on 10 failed

java.lang.reflect.InvocationTargetException

     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

     at java.lang.reflect.Method.invoke(Method.java:498)

     at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)

     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)

     at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)

     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)

     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)

     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)

     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)

     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)

     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)

     at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)

     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)

     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)

     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)

     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)

     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)

     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)

     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)

     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)

     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)

     at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)

     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

     at java.lang.Thread.run(Thread.java:748)

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 503, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Service Unavailable, S3 Extended Request ID: null

     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)

     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)

     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)

     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3556)

     at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1036)

     at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:999)

     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)

     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)

     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)

     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)

     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

     at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)

     at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)

     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

     at scala.collection.immutable.List.foreach(List.scala:392)

     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)

     at scala.collection.immutable.List.flatMap(List.scala:355)

     at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)

     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)

     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)

     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)

     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:645)

     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)

     ... 37 more

Error: Error in parquet : com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 503, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Service Unavailable, S3 Extended Request ID: null

     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)

     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)

     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)

     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3556)

     at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1036)

     at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:999)

     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)

     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)

     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(File