Problem in Spark-Kafka Connector

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Problem in Spark-Kafka Connector

skmishra
Hi,

I am trying to connect my Spark cluster to a single Kafka Topic which running as a separate process in a machine. While submitting the spark application, I am getting the following error.



17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.156:55168; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 129.82.44.156, executor 9, partition 21, PROCESS_LOCAL, 4706 bytes)
17/12/25 16:56:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 129.82.44.156, executor 9): java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.164:45988; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.142:56136; closing connection



I looked over the web and I found only the following relevant link "https://stackoverflow.com/questions/29781489/apache-spark-network-errors-between-executors?noredirect=1&lq=1". I tried with the suggestion given in the discussion as below.


val conf = new SparkConf().setAppName("KafkaInput").set("spark.shuffle.blockTransferService", "nio")


But still it does not work. I am using "spark-2.2.0-bin-hadoop2.7" version of spark. Please help me with this issue and let me know if you need any other information from my side.



Thanks and Regards,
Sitakanta Mishra
Reply | Threaded
Open this post in threaded view
|

Re: Problem in Spark-Kafka Connector

skmishra
Hi,

Kindly help me with this problem, for which I will be grateful.

Thanks and Regards,
Sitakanta Mishra

On Tue, Dec 26, 2017 at 12:34 PM, Sitakant Mishra <[hidden email]> wrote:
Hi,

I am trying to connect my Spark cluster to a single Kafka Topic which running as a separate process in a machine. While submitting the spark application, I am getting the following error.



17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.156:55168; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 129.82.44.156, executor 9, partition 21, PROCESS_LOCAL, 4706 bytes)
17/12/25 16:56:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 129.82.44.156, executor 9): java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.164:45988; closing connection
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.142:56136; closing connection



I looked over the web and I found only the following relevant link "https://stackoverflow.com/questions/29781489/apache-spark-network-errors-between-executors?noredirect=1&lq=1". I tried with the suggestion given in the discussion as below.


val conf = new SparkConf().setAppName("KafkaInput").set("spark.shuffle.blockTransferService", "nio")


But still it does not work. I am using "spark-2.2.0-bin-hadoop2.7" version of spark. Please help me with this issue and let me know if you need any other information from my side.



Thanks and Regards,
Sitakanta Mishra