Write PySpark dataframe into KMS-encrypted S3 bucket


Devi P V

Hi All,

I am trying to write a PySpark dataframe into a KMS-encrypted S3 bucket. I am using spark-3.0.1-bin-hadoop3.2 and have set every configuration option I could find, as shown below:

sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
# credentials and connector settings
hadoop_conf.set("fs.s3a.access.key", "XXX")
hadoop_conf.set("fs.s3a.secret.key", "XXX")
hadoop_conf.set("fs.s3a.multipart.size", "104857600")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# server-side encryption with a KMS-managed key
hadoop_conf.setBoolean("fs.s3a.sse.enabled", True)
hadoop_conf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
hadoop_conf.set("fs.s3a.sse.kms.keyId", "XXXX")



df = spark.createDataFrame(
    [
        (1, 'one'), 
        (2, 'two'),
    ],
    ['id', 'txt'] 
)
df.write.csv('s3a://bucket_name/test_data', header='true')

This fails with the following exception:

: java.lang.IllegalArgumentException
        at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1314)
        at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1237)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:274)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:459)
        at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:559)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
        at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:953)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Any ideas on how to resolve this issue?

Thanks
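
(For comparison, the same S3A settings can also be supplied when the session is created, using Spark's spark.hadoop.* property prefix, rather than by mutating the JVM Hadoop configuration through the private _jsc handle. A minimal sketch, not a confirmed fix; note that fs.s3a.sse.enabled and fs.s3a.sse.kms.keyId do not appear in the Hadoop 3.2 S3A documentation, where the KMS key property is documented as fs.s3a.server-side-encryption.key:)

from pyspark.sql import SparkSession

# Sketch only: supply S3A options at session creation via spark.hadoop.*.
# Property names assume the Hadoop 3.2 S3A connector documentation.
spark = (
    SparkSession.builder
    .appName("sse-kms-write")
    .config("spark.hadoop.fs.s3a.access.key", "XXX")
    .config("spark.hadoop.fs.s3a.secret.key", "XXX")
    .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
    .config("spark.hadoop.fs.s3a.server-side-encryption.key", "XXXX")  # KMS key id or ARN
    .getOrCreate()
)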

Re: Write PySpark dataframe into KMS-encrypted S3 bucket

Hariharan
fs.s3a.multipart.size needs to be a long value, not a string, so you will need to use:

hadoop_conf.set("fs.s3a.multipart.size", 104857600L)

~ Hariharan
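
(As the reply below notes, the trailing L in 104857600L is Python 2 long-literal syntax; Python 3 has no L suffix. Through the py4j gateway the typed setter on org.apache.hadoop.conf.Configuration is called with a plain int. A sketch, not a confirmed fix:)

# Sketch: Python 3 has no trailing-L long literal; a plain int is passed
# through py4j to the Java long parameter of Configuration.setLong.
hadoop_conf.setLong("fs.s3a.multipart.size", 104857600)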


Re: Write PySpark dataframe into KMS-encrypted S3 bucket

Devi P V
hadoop_conf.set("fs.s3a.multipart.size", 104857600L)

.set only allows string values, and that line throws a syntax error (the trailing L is not valid in Python 3).

I also tried the following, but the issue is not fixed:

hadoop_conf.setLong("fs.s3a.multipart.size", 104857600)

Thanks 
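
(This outcome is consistent with how Hadoop's Configuration works: setLong(name, v) just converts the number to a string and stores it, so it is effectively identical to the original set call with "104857600". A small illustrative check, assuming the live hadoop_conf handle from the original snippet:)

# Illustrative check: Configuration.setLong(name, v) is implemented as
# set(name, Long.toString(v)), so both calls store the identical string.
hadoop_conf.set("fs.s3a.multipart.size", "104857600")
before = hadoop_conf.get("fs.s3a.multipart.size")
hadoop_conf.setLong("fs.s3a.multipart.size", 104857600)
assert hadoop_conf.get("fs.s3a.multipart.size") == before == "104857600"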


Re: Write PySpark dataframe into KMS-encrypted S3 bucket

Hariharan
Sorry, I meant setLong. If you know which version of the hadoop jars you're using, you can check the S3AFileSystem source for that version to find out exactly which line is throwing the error.

~ Hariharan
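
(Following up on that suggestion: per the JDK documentation, the ThreadPoolExecutor constructor throws IllegalArgumentException when corePoolSize is negative, maximumPoolSize is zero or negative, maximumPoolSize is smaller than corePoolSize, or keepAliveTime is negative. Since the trace shows the constructor itself throwing inside S3AFileSystem.initialize, printing the effective thread-pool settings is a cheap first check. A diagnostic sketch; the property names are the ones read by the S3A connector, and fs.s3a.threads.core is only read by older hadoop-aws releases, so behavior around it can hint at a jar mismatch:)

# Diagnostic sketch: dump the settings that size the thread pool built in
# S3AFileSystem.initialize. None just means the property is unset and the
# connector default applies.
for key in ("fs.s3a.threads.max",
            "fs.s3a.threads.core",          # read only by older hadoop-aws versions
            "fs.s3a.threads.keepalivetime",
            "fs.s3a.max.total.tasks"):
    print(key, "=", hadoop_conf.get(key))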

