Spark Streaming fails with unable to get records after polling for 512 ms


Spark Streaming fails with unable to get records after polling for 512 ms

jkagitala
Hi,

I'm trying to add Spark Streaming as a consumer for our Kafka topic, but I keep
getting this error:

java.lang.AssertionError: assertion failed: Failed to get record after
polling for 512 ms.

I tried setting different parameters in kafkaParams, such as max.poll.interval.ms
and spark.streaming.kafka.consumer.poll.ms, to 10000 ms, but I still get "failed
to get records after 512 ms". Adding those parameters doesn't seem to change the
polling timeout at all.

Without Spark Streaming I'm able to fetch the records; I only get this error
with the Spark Streaming consumer.

Any help is greatly appreciated. Below is the code I'm using.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

// hosts, groupid, and topics are defined elsewhere in our code.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", hosts);
kafkaParams.put("group.id", groupid);
kafkaParams.put("enable.auto.commit", false); // new-consumer property name ("auto.commit.enable" is the old one)
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", BytesDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
//kafkaParams.put("max.poll.interval.ms", 12000);
//kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
//kafkaParams.put("request.timeout.ms", 12000);

// BytesDeserializer yields org.apache.kafka.common.utils.Bytes values.
JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
        KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

messages.foreachRDD(rdd -> {
    List<ConsumerRecord<String, Bytes>> input = rdd.collect();
    System.out.println("count is " + input.size());
});

ssc.start();
ssc.awaitTermination();

Thanks
Jagadish






Re: Spark Streaming fails with unable to get records after polling for 512 ms

Cody Koeninger
spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
a Kafka parameter.

see http://spark.apache.org/docs/latest/configuration.html
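
For example, something like this (a sketch; Spark config values are strings,
and 10000 is just an example timeout):

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]")
        // Poll timeout for the Kafka consumer, set on the SparkConf,
        // not in kafkaParams.
        .set("spark.streaming.kafka.consumer.poll.ms", "10000");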




Re: Spark Streaming fails with unable to get records after polling for 512 ms

jkagitala
Hi Cody,

It worked: after moving the parameter to the SparkConf, I no longer see that error.
But now the count for each RDD returns 0, even though there are records in the
topic I'm reading.

Do you see anything wrong with how I'm creating the direct stream?

Thanks
Jagadish



Re: Spark Streaming fails with unable to get records after polling for 512 ms

Cody Koeninger
I don't see anything obvious; you'd need to do more troubleshooting.
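
One thing worth checking is what offsets each batch actually covers. A sketch
using HasOffsetRanges from the 0-10 integration (if untilOffset equals
fromOffset for every partition, the batches really are empty):

import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

messages.foreachRDD(rdd -> {
    // Cast down to the underlying RDD to read the Kafka offset ranges for this batch.
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange r : ranges) {
        System.out.println(r.topic() + " partition " + r.partition()
                + ": from " + r.fromOffset() + " until " + r.untilOffset());
    }
});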

You could also try creating a single RDD for a known range of offsets:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd
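
A minimal sketch of that, following the linked doc (the topic name, partition,
and offset bounds are placeholders; kafkaParams is the same map as above):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Read a fixed, known range of offsets as a batch RDD, no streaming involved.
// OffsetRange.create(topic, partition, fromOffset inclusive, untilOffset exclusive).
OffsetRange[] offsetRanges = {
    OffsetRange.create("test-topic", 0, 0, 100)
};

JavaSparkContext jsc = new JavaSparkContext(sparkConf); // batch context, no streaming context needed
JavaRDD<ConsumerRecord<String, Bytes>> rdd = KafkaUtils.createRDD(
        jsc,
        kafkaParams,
        offsetRanges,
        LocationStrategies.PreferConsistent()
);
System.out.println("count is " + rdd.count());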

