Kafka with Spark Streaming works locally but not in Standalone mode

Davide Curcio
Hi,

I'm trying to use Spark Streaming with a very simple script like this:

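from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonSparkStreamingKafka")

# Micro-batch interval of 1 second
ssc = StreamingContext(sc, 1)

# Kafka 0.8 direct-stream parameters: the broker address, and start
# reading from the earliest available offset
kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
               "auto.offset.reset": "smallest"}

# Receiver-less direct stream on topic "test"; executors fetch from Kafka themselves
training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

# Print the first elements of each batch on the driver
training.pprint()

ssc.start()
ssc.awaitTermination()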
It works locally, but it crashes when I run it on the cluster in Standalone mode. My cluster has 4 machines:

1 machine with the Kafka producer, 1 broker and 1 ZooKeeper instance
1 machine for the driver
2 worker machines

The strange thing is that when the Kafka producer, broker and ZooKeeper were on the same machine as the driver, it worked both locally and on the cluster. For the sake of scalability and modularity, though, I'd like to use the current configuration.
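One way to narrow this down (a diagnostic sketch, not a confirmed fix): with the direct stream, the executors on the worker machines open their own connections to the Kafka broker, and the log below shows the tasks failing inside SimpleConsumer.fetch on the workers 172.31.69.185 and 172.31.79.221. The helper below, check_broker (a hypothetical name, standard library only), can be run with plain Python on each worker to see whether the broker's address resolves there and accepts a TCP connection. As far as I understand the 0-8 integration, executors connect to the host the broker reports in its metadata, so if the broker advertises a host name instead of 172.31.71.104 (Kafka's advertised.listeners broker setting), it is worth checking that name as well.

```python
import socket


def check_broker(host, port, timeout=3.0):
    """Check a Kafka broker endpoint from the machine this runs on.

    Returns (addresses, reachable): the IP addresses `host` resolves to
    on this machine, and whether a TCP connection to (host, port) opened.
    """
    try:
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        # The name does not resolve on this machine
        return [], False
    addresses = sorted({info[4][0] for info in infos})
    try:
        # Only opens and closes a TCP connection; no Kafka protocol is spoken
        with socket.create_connection((host, port), timeout=timeout):
            return addresses, True
    except OSError:
        # Refused, timed out, or unreachable from this machine
        return addresses, False
```

On each worker, check_broker("172.31.71.104", 9092) should return True as its second element for the executors to have a chance of fetching. A plain TCP check cannot rule out protocol-level problems, so a successful result only narrows the search.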

I'm using Spark 2.4.6 with the Kafka integration package "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm currently using is kafka_2.11-2.4.1.

The result is the following:

2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 1595584106000 ms
2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 1595584107000 ms
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 172.31.79.221, executor 1: java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
2020-07-24 09:48:27,174 INFO scheduler.DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:153) failed in 2.943 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
2020-07-24 09:48:27,179 INFO scheduler.DAGScheduler: Job 0 failed: runJob at PythonRDD.scala:153, took 3.010820 s
2020-07-24 09:48:27,190 INFO scheduler.JobScheduler: Finished job streaming job 1595584104000 ms.0 from job set of time 1595584104000 ms
2020-07-24 09:48:27,191 INFO scheduler.JobScheduler: Starting job streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms
2020-07-24 09:48:27,193 ERROR scheduler.JobScheduler: Error running job streaming job 1595584104000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,211 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:153
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Got job 1 (runJob at PythonRDD.scala:153) with 1 output partitions
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:153)
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Parents of final stage: List()
2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Missing parents: List()
2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53), which has no missing parents
2020-07-24 09:48:27,220 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.9 KB, free 366.3 MB)
2020-07-24 09:48:27,223 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.0 KB, free 366.3 MB)
2020-07-24 09:48:27,225 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-172-31-69-46.ec2.internal:41579 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,226 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1163
2020-07-24 09:48:27,227 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53) (first 15 tasks are for partitions Vector(0))
2020-07-24 09:48:27,229 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
2020-07-24 09:48:27,230 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:27,248 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.31.69.185:44675 (size: 4.0 KB, free: 366.3 MB)
Traceback (most recent call last):
  File "/home/ubuntu/./prova2.py", line 22, in <module>
    ssc.awaitTermination()
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
2020-07-24 09:48:27,315 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 85 ms on 172.31.69.185 (executor 0) (1/1)
2020-07-24 09:48:27,316 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
2020-07-24 09:48:27,321 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 34673
2020-07-24 09:48:27,324 INFO scheduler.DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:153) finished in 0.106 s
2020-07-24 09:48:27,325 INFO scheduler.DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:153, took 0.113169 s
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 20
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 13
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 3
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 8
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 7
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 10
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 4
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 6
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 11
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 5
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 25
py4j.protocol.Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2020-07-24 09:48:27,475 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,477 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 15
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 23
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 21
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 12
2020-07-24 09:48:27,511 INFO spark.ContextCleaner: Cleaned accumulator 2
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 18
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 1
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 14
2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
2020-07-24 09:48:27,511 INFO scheduler.JobScheduler: Finished job streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms
2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator immediately
2020-07-24 09:48:27,524 INFO scheduler.JobScheduler: Starting job streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,527 INFO scheduler.JobScheduler: Finished job streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,528 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,529 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.31.79.221:44371 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,530 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,531 INFO scheduler.JobScheduler: Starting job streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 INFO scheduler.JobScheduler: Finished job streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job streaming job 1595584105000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p2
call
L1595584105000
lo96
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job streaming job 1595584106000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p2
call
L1595584106000
lo113
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job streaming job 1595584107000 ms.0
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,534 INFO util.RecurringTimer: Stopped timer for JobGenerator after time 1595584107000
2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator
2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler
2020-07-24 09:48:27,556 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6ad44bd5{/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,557 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@68ac401f{/streaming/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 16
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 9
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 19
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 22
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 17
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 24
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@11792245{/streaming/batch,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2e5d35e4{/streaming/batch/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,560 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2dedce2c{/static/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext stopped successfully
2020-07-24 09:48:27,562 WARN streaming.StreamingContext: StreamingContext has already been stopped
2020-07-24 09:48:27,562 INFO spark.SparkContext: Invoking stop() from shutdown hook
2020-07-24 09:48:27,569 INFO server.AbstractConnector: Stopped Spark@4c8c11ce{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at http://ip-172-31-69-46.ec2.internal:4040
2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors
2020-07-24 09:48:27,576 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
2020-07-24 09:48:27,592 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2020-07-24 09:48:27,600 INFO memory.MemoryStore: MemoryStore cleared
2020-07-24 09:48:27,600 INFO storage.BlockManager: BlockManager stopped
2020-07-24 09:48:27,604 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2020-07-24 09:48:27,607 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped SparkContext
2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called
2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca
2020-07-24 09:48:27,617 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7
2020-07-24 09:48:27,620 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7/pyspark-c0db4268-2afd-4336-bf77-9dc7257213d2
I have debugged everything, but I have no idea how to solve this problem. Do you have any suggestions? Could it be a Kafka configuration problem?
Thanks in advance,

Davide
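The tasks fail with ClosedChannelException inside kafka.consumer.SimpleConsumer.fetch on both workers (172.31.69.185 and 172.31.79.221), even though the driver got far enough to schedule batches ("Added jobs for time ..."). One possible reading, which is an assumption and is not confirmed in this thread, is that the workers cannot open a connection to the broker address they are given. With the direct stream, each executor connects to the partition leader at the host:port the broker advertises. That address can differ from the one in metadata.broker.list (see advertised.listeners in the broker's server.properties). The sketch below tests reachability from the executors themselves. broker_reachable and check_from_executors are hypothetical helper names, and sc is the existing SparkContext:

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the name and tries each returned address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts and DNS failures all end up here
        return False


def check_from_executors(sc, host, port, num_tasks=4):
    """Run broker_reachable inside Spark tasks so the check executes on the
    worker machines; returns a list of (worker hostname, reachable) pairs.
    Scheduling is not guaranteed to hit every worker, so raise num_tasks
    if a worker is missing from the result."""

    def probe(_partition):
        # Runs on an executor, so gethostname() names the worker, not the driver
        yield (socket.gethostname(), broker_reachable(host, port))

    rdd = sc.parallelize(range(num_tasks), num_tasks)
    return rdd.mapPartitions(probe).collect()
```

For example, check_from_executors(sc, "172.31.71.104", 9092), repeated with whatever host the broker advertises. If any worker reports False while the driver machine can connect, the problem is more likely networking (security groups, firewall, or the advertised host) than the Spark code.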

Re: Kafka with Spark Streaming work on local but it doesn't work in Standalone mode

Gabor Somogyi
Hi Davide,

Please see the doc:
Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

Have you tried the same thing with Structured Streaming instead of DStreams?
If you need to stick with DStreams for some reason, you can use the spark-streaming-kafka-0-10 connector instead.

BR,
G
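The following is a minimal sketch of the Structured Streaming route suggested above. It assumes Spark 2.4.6 with Scala 2.11 and the spark-sql-kafka-0-10 package, for example spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 your_script.py. It reuses the broker address and topic from the original script. kafka_source_options and main are hypothetical names. From PySpark this is also the practical option, because the DStream spark-streaming-kafka-0-10 connector only offers Scala and Java APIs in Spark 2.4:

```python
def kafka_source_options(broker_list, topic, offset_reset="smallest"):
    """Translate the 0.8-style kafkaParams from the original script into
    options for the Structured Streaming Kafka source."""
    # The 0.8 consumer used "smallest"/"largest"; the new source uses "earliest"/"latest"
    starting = {"smallest": "earliest", "largest": "latest"}[offset_reset]
    return {
        "kafka.bootstrap.servers": broker_list,  # replaces metadata.broker.list
        "subscribe": topic,
        "startingOffsets": starting,
    }


def main():
    # Imported here so the helper above can be used without a Spark install
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("PythonStructuredStreamingKafka").getOrCreate()
    opts = kafka_source_options("172.31.71.104:9092", "test")
    df = spark.readStream.format("kafka").options(**opts).load()

    # Kafka keys and values arrive as binary columns; cast them for printing
    lines = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # Roughly the counterpart of training.pprint(): print each micro-batch to stdout
    query = lines.writeStream.format("console").outputMode("append").start()
    query.awaitTermination()
```

Call main() at the bottom of the submitted script. The executors still connect to the broker directly with this source, so if the ClosedChannelException is caused by workers being unable to reach the advertised broker address, switching APIs alone will not fix it.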


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio <[hidden email]> wrote:
2020-07-24 09:48:27,325 INFO scheduler.DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:153, took 0.113169 s
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 20
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 13
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 3
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 8
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 7
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 10
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 4
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 6
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 11
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 5
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 25
py4j.protocol.Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2020-07-24 09:48:27,475 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,477 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 15
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 23
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 21
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 12
2020-07-24 09:48:27,511 INFO spark.ContextCleaner: Cleaned accumulator 2
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 18
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 1
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 14
2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
2020-07-24 09:48:27,511 INFO scheduler.JobScheduler: Finished job streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms
2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator immediately
2020-07-24 09:48:27,524 INFO scheduler.JobScheduler: Starting job streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,527 INFO scheduler.JobScheduler: Finished job streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,528 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,529 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 172.31.79.221:44371 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,530 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,531 INFO scheduler.JobScheduler: Starting job streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 INFO scheduler.JobScheduler: Finished job streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job streaming job 1595584105000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p2
call
L1595584105000
lo96
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job streaming job 1595584106000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c
p2
call
L1595584106000
lo113
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job streaming job 1595584107000 ms.0
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,534 INFO util.RecurringTimer: Stopped timer for JobGenerator after time 1595584107000
2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator
2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler
2020-07-24 09:48:27,556 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6ad44bd5{/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,557 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@68ac401f{/streaming/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 16
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 9
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 19
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 22
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 17
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 24
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@11792245{/streaming/batch,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2e5d35e4{/streaming/batch/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,560 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2dedce2c{/static/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext stopped successfully
2020-07-24 09:48:27,562 WARN streaming.StreamingContext: StreamingContext has already been stopped
2020-07-24 09:48:27,562 INFO spark.SparkContext: Invoking stop() from shutdown hook
2020-07-24 09:48:27,569 INFO server.AbstractConnector: Stopped Spark@4c8c11ce{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at http://ip-172-31-69-46.ec2.internal:4040
2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors
2020-07-24 09:48:27,576 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
2020-07-24 09:48:27,592 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2020-07-24 09:48:27,600 INFO memory.MemoryStore: MemoryStore cleared
2020-07-24 09:48:27,600 INFO storage.BlockManager: BlockManager stopped
2020-07-24 09:48:27,604 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2020-07-24 09:48:27,607 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped SparkContext
2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called
2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca
2020-07-24 09:48:27,617 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7
2020-07-24 09:48:27,620 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7/pyspark-c0db4268-2afd-4336-bf77-9dc7257213d2
I've debugged everything, but I have no idea how to solve this problem. Do you have any suggestions? Could it be a Kafka configuration problem?
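One thing that might be worth ruling out first. This is an assumption, not a confirmed diagnosis. The `ClosedChannelException` is thrown on the executors (172.31.69.185 and 172.31.79.221) inside `SimpleConsumer.fetch`, while the driver apparently got far enough to plan the batch. With the 0.8 direct stream, each executor opens its own connection to the partition leader at the address the broker advertises. If that address is unreachable or cannot be resolved from the worker machines, the fetches could fail in exactly this way. A minimal standalone check to run on each worker is sketched below. The broker address comes from the `kafkaParams` above. The optional hostname argument is hypothetical: pass whatever host the broker actually advertises.

```python
import socket
import sys

# Broker address taken from the kafkaParams in the script above.
# Assumption: the broker may advertise a different host (e.g. an EC2-internal
# hostname); pass that host as the first CLI argument to test it instead.
BROKER_HOST = "172.31.71.104"
BROKER_PORT = 9092


def resolves(hostname):
    """Return the IPv4 address for hostname, or None if it cannot be resolved."""
    try:
        return socket.gethostbyname(hostname)
    except OSError:  # socket.gaierror is a subclass of OSError
        return None


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    host = sys.argv[1] if len(sys.argv) > 1 else BROKER_HOST
    addr = resolves(host)
    print(f"{socket.gethostname()}: {host} resolves to {addr}")
    if addr is not None:
        print(f"TCP connect to {host}:{BROKER_PORT} -> {can_connect(host, BROKER_PORT)}")
```

For example, run `python3 check_broker.py` (the file name is hypothetical) on each worker, and again with the broker's advertised hostname as the argument. If the driver machine prints `True` but a worker prints `False` or `None`, the problem is more likely network or listener configuration than Spark.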
Thanks in advance,

Davide