Spark structured streaming seems to work on local mode only


Spark structured streaming seems to work on local mode only

Mich Talebzadeh
Hi,

Spark 3.1.1

We have an instance of PyCharm on a Linux host that we use for testing Spark structured streaming.

By default it runs in a single JVM and writes to a Google BigQuery table. The batch interval is 2 seconds and obviously the performance is not good. However, it does write to the table and we have confirmed it.


However, when we run the same program on the cluster itself using PySpark in YARN client mode, it does not work and eventually crashes.


spark-submit --master yarn --deploy-mode client --driver-memory 8G --executor-memory 8G --num-executors 2 --executor-cores 2 MDStreamingBQ.py


2021-03-03 15:53:16,274 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@75b5dfcf{/static/sql,null,AVAILABLE,@Spark}

{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}

[]

None

2021-03-03 16:01:25,614 ERROR cluster.YarnClientSchedulerBackend: YARN application has exited unexpectedly with state FAILED! Check the YARN application logs for more details.

2021-03-03 16:01:25,614 ERROR cluster.YarnClientSchedulerBackend: Diagnostics message: Max number of executor failures (4) reached

2021-03-03 16:01:25,622 ERROR v2.WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@540c415c is aborting.

2021-03-03 16:01:25,622 ERROR v2.WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@540c415c aborted.

2021-03-03 16:01:25,628 ERROR streaming.MicroBatchExecution: Query [id = 69f102dd-8026-44ef-8122-4fd68bf6abc5, runId = 3abe5bb9-b874-4331-a125-25f1ca1517b0] terminated with error

org.apache.spark.SparkException: Writing job aborted.

        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)

        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)

        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:297)

        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:304)

        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)

        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)

        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)

        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)

        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2965)

        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)

        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)

        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)

        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)

        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2965)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:589)

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)

        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)

        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)

        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)

        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)

        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)

        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)

        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)

Caused by: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down

        at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1084)

        at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1082)

        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)

        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1082)

        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2458)

        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)

        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2364)

        at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2075)

        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)

        at org.apache.spark.SparkContext.stop(SparkContext.scala:2075)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:124)

        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)

        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)

        ... 37 more

2021-03-03 16:01:25,644 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /50.140.197.230:46358 is closed

2021-03-03 16:01:25,645 ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending RequestExecutors(Map(),Map(),Map(),Set()) to AM was unsuccessful

java.io.IOException: Connection reset by peer

        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

        at sun.nio.ch.IOUtil.read(IOUtil.java:192)

        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

        at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)

        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)

        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)

        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)

        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

        at java.lang.Thread.run(Thread.java:748)

2021-03-03 16:01:25,646 ERROR util.Utils: Uncaught exception in thread YARN application state monitor

org.apache.spark.SparkException: Exception thrown in awaitResult:

        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:742)

        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.stop(YarnSchedulerBackend.scala:114)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:168)

        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:881)

        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2365)

        at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2075)

        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)

        at org.apache.spark.SparkContext.stop(SparkContext.scala:2075)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:124)

Caused by: java.io.IOException: Connection reset by peer

        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

        at sun.nio.ch.IOUtil.read(IOUtil.java:192)

        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

        at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)

        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)

        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)

        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)

        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

        at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):

  File "/home/hduser/dba/bin/python/DSBQ/src/MDStreamingBQ.py", line 92, in <module>

    streamingDataFrame = mdstreaming.fetch_data()

  File "/home/hduser/dba/bin/python/DSBQ/src/MDStreamingBQ.py", line 84, in fetch_data

    result.awaitTermination()

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco

pyspark.sql.utils.StreamingQueryException: Writing job aborted.

=== Streaming Query ===

Identifier: [id = 69f102dd-8026-44ef-8122-4fd68bf6abc5, runId = 3abe5bb9-b874-4331-a125-25f1ca1517b0]

Current Committed Offsets: {}

Current Available Offsets: {KafkaV2[Subscribe[md]]: {"md":{"8":2126395,"2":2127495,"5":2125643,"4":2125577,"7":2128375,"1":2124913,"3":2126568,"6":2124793,"0":2124924}}}


Current State: ACTIVE

Thread State: RUNNABLE


Logical Plan:

WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]

+- Project [rowkey#26, ticker#27, timeissued#28, price#29, currency#38, op_type#44, current_timestamp() AS op_time#51]

   +- Project [rowkey#26, ticker#27, timeissued#28, price#29, currency#38, 1 AS op_type#44]

      +- Project [rowkey#26, ticker#27, timeissued#28, price#29, GBP AS currency#38]

         +- Project [parsed_value#24.rowkey AS rowkey#26, parsed_value#24.ticker AS ticker#27, parsed_value#24.timeissued AS timeissued#28, parsed_value#24.price AS price#29]

            +- Project [from_json(StructField(rowkey,StringType,true), StructField(ticker,StringType,true), StructField(timeissued,TimestampType,true), StructField(price,FloatType,true), cast(value#9 as string), Some(Europe/London)) AS parsed_value#24]

               +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@28673c68, KafkaV2[Subscribe[md]]


Is there some issue with running structured streaming with PySpark in client mode? We have done Spark streaming (not structured streaming) many times with Spark 2.4.3 and Scala, and it was always performant.
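As an aside, the first error above says to check the YARN application logs; assuming a standard Hadoop client and log aggregation enabled, they can be pulled as a sketch like this (the application id is a placeholder, not taken from the thread):

```shell
# Fetch the aggregated container logs for the failed application.
# <application_id> is a placeholder (e.g. application_16147..._0001);
# it appears in the ResourceManager UI and in the spark-submit output.
yarn logs -applicationId <application_id> | less
```

The executor containers' stderr in those logs usually shows why "Max number of executor failures (4) reached" was hit, which the driver-side trace here does not.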


Thanks


LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Spark structured streaming seems to work on local mode only

Sachit Murarka
Hi Mich,

I was facing the same error. I resolved it by increasing the driver memory.
Try 5 GB or more. I think you are using the default, which is quite low.
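This advice amounts to passing an explicit driver-memory flag at submission time; a sketch with illustrative values (note that the submission shown earlier in the thread already passed --driver-memory 8G, so in client mode it may also be worth raising the YARN application master's memory separately):

```shell
# Illustrative values only. In client mode the driver runs locally,
# so --driver-memory sizes the local JVM, while spark.yarn.am.memory
# sizes the separate YARN application master container.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 8G \
  --conf spark.yarn.am.memory=2G \
  MDStreamingBQ.py
```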

Hope this helps.

Thanks
Sachit

On Wed, 3 Mar 2021, 22:52 Mich Talebzadeh, <[hidden email]> wrote:


Re: Spark structured streaming seems to work on local mode only

Mich Talebzadeh
Thanks Sachit. I will try.

Regards,






On Wed, 3 Mar 2021 at 17:29, Sachit Murarka <[hidden email]> wrote:

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)

        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)

        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)

        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)

        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)

        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)

        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)

        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)

        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)

        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)

        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)

Caused by: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down

        at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1084)

        at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1082)

        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)

        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1082)

        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2458)

        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)

        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2364)

        at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2075)

        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)

        at org.apache.spark.SparkContext.stop(SparkContext.scala:2075)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:124)

        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)

        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)

        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)

        ... 37 more

2021-03-03 16:01:25,644 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /50.140.197.230:46358 is closed

2021-03-03 16:01:25,645 ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Sending RequestExecutors(Map(),Map(),Map(),Set()) to AM was unsuccessful

java.io.IOException: Connection reset by peer

        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

        at sun.nio.ch.IOUtil.read(IOUtil.java:192)

        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

        at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)

        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)

        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)

        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)

        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

        at java.lang.Thread.run(Thread.java:748)

2021-03-03 16:01:25,646 ERROR util.Utils: Uncaught exception in thread YARN application state monitor

org.apache.spark.SparkException: Exception thrown in awaitResult:

        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:742)

        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.stop(YarnSchedulerBackend.scala:114)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:168)

        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:881)

        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2365)

        at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2075)

        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)

        at org.apache.spark.SparkContext.stop(SparkContext.scala:2075)

        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:124)

Caused by: java.io.IOException: Connection reset by peer

        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

        at sun.nio.ch.IOUtil.read(IOUtil.java:192)

        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

        at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)

        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)

        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)

        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)

        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

        at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):

  File "/home/hduser/dba/bin/python/DSBQ/src/MDStreamingBQ.py", line 92, in <module>

    streamingDataFrame = mdstreaming.fetch_data()

  File "/home/hduser/dba/bin/python/DSBQ/src/MDStreamingBQ.py", line 84, in fetch_data

    result.awaitTermination()

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__

  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco

pyspark.sql.utils.StreamingQueryException: Writing job aborted.

=== Streaming Query ===

Identifier: [id = 69f102dd-8026-44ef-8122-4fd68bf6abc5, runId = 3abe5bb9-b874-4331-a125-25f1ca1517b0]

Current Committed Offsets: {}

Current Available Offsets: {KafkaV2[Subscribe[md]]: {"md":{"8":2126395,"2":2127495,"5":2125643,"4":2125577,"7":2128375,"1":2124913,"3":2126568,"6":2124793,"0":2124924}}}


Current State: ACTIVE

Thread State: RUNNABLE


Logical Plan:

WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]

+- Project [rowkey#26, ticker#27, timeissued#28, price#29, currency#38, op_type#44, current_timestamp() AS op_time#51]

   +- Project [rowkey#26, ticker#27, timeissued#28, price#29, currency#38, 1 AS op_type#44]

      +- Project [rowkey#26, ticker#27, timeissued#28, price#29, GBP AS currency#38]

         +- Project [parsed_value#24.rowkey AS rowkey#26, parsed_value#24.ticker AS ticker#27, parsed_value#24.timeissued AS timeissued#28, parsed_value#24.price AS price#29]

            +- Project [from_json(StructField(rowkey,StringType,true), StructField(ticker,StringType,true), StructField(timeissued,TimestampType,true), StructField(price,FloatType,true), cast(value#9 as string), Some(Europe/London)) AS parsed_value#24]

               +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@28673c68, KafkaV2[Subscribe[md]]


Is there some issue with running structured streaming with PySpark in client mode? We have run Spark streaming (not structured streaming) many times with Spark 2.4.3 etc. in Scala, and it was always performant.


Thanks


LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 




 


Re: Spark structured streaming seems to work on local mode only

Mich Talebzadeh
Hi all,

I managed to sort this one out so that it now works on the yarn cluster. As far as I can tell, PySpark runs in yarn client mode only. Please correct me if this is incorrect.


1) If you are connecting from on-premise to the Cloud, ensure that you increase the configuration parameter spark.network.timeout from its default of 120s to a larger value. It should be given in ms, not seconds, so I set ours to 800000 in $SPARK_HOME/conf:


spark.network.timeout              800000


2) In yarn mode, ensure that all nodes use the same Python virtual environment (for example, Python 3.7.9).
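
One way to guarantee this is to ship the virtual environment with the job itself rather than installing Python on every node. The sketch below follows that approach; the environment name pyspark_venv, the use of venv-pack, and the unpack alias "environment" are my assumptions, not necessarily what others use:

```shell
# Build a relocatable virtual environment once on the edge node
# (venv-pack for venv; conda-pack works the same way for conda).
python3.7 -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install venv-pack
venv-pack -o pyspark_venv.tar.gz

# Ship the archive with the job so every YARN container unpacks
# and uses the same Python interpreter and packages.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --master yarn --deploy-mode client \
  --archives pyspark_venv.tar.gz#environment \
  MDStreamingBQ.py
```

The "#environment" suffix tells YARN to extract the archive into a directory called environment in each container's working directory, which is why PYSPARK_PYTHON points there.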


3) Whenever you add new jar files to $SPARK_HOME/jars, ensure that they are available throughout the cluster. It is best to put them on HDFS somewhere.


3.1) Putting Spark Jar files on HDFS

In yarn mode, it is important that the Spark jar files are available throughout the cluster, so do the following:

3.2) Create the archive:

    jar cv0f spark-libs.jar -C $SPARK_HOME/jars/ . # don't forget the dot

3.3) Create a directory on HDFS for the jars accessible to the application

   hdfs dfs -mkdir /jars

3.4) Upload to HDFS:

   hdfs dfs -put spark-libs.jar /jars

3.5) In the $SPARK_HOME/conf/spark-defaults.conf file, set:

spark.yarn.archive=hdfs://<NAMENODE>:9000/jars/spark-libs.jar
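
Putting steps 3.2 to 3.5 together, the whole procedure is roughly as below (the namenode host and port stay as the <NAMENODE> placeholder; substitute your own, and adjust the HDFS path if you use a different one):

```shell
# 3.2) Archive the Spark jars (note the trailing dot)
jar cv0f spark-libs.jar -C $SPARK_HOME/jars/ .

# 3.3 / 3.4) Create the HDFS directory and upload the archive
hdfs dfs -mkdir -p /jars
hdfs dfs -put spark-libs.jar /jars

# 3.5) Point Spark at the archive so YARN localises it on every node
echo "spark.yarn.archive hdfs://<NAMENODE>:9000/jars/spark-libs.jar" \
  >> $SPARK_HOME/conf/spark-defaults.conf
```

With spark.yarn.archive set, YARN distributes the single archive to each node instead of uploading the local jars on every submission, which also speeds up application start-up.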


Then I ran it as below

spark-submit --master yarn --deploy-mode client --driver-memory 16G --executor-memory 8G --num-executors 4 --executor-cores 2 MDStreamingBQ.py


HTH


LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 




 



On Wed, 3 Mar 2021 at 17:34, Mich Talebzadeh <[hidden email]> wrote:
Thanks Sachit. I will try.

Regards,



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 




 



On Wed, 3 Mar 2021 at 17:29, Sachit Murarka <[hidden email]> wrote:
[quoted text trimmed]