[Spark streaming] No assigned partition error during seek

[Spark streaming] No assigned partition error during seek

venks61176
Version: Spark 2.2 with the spark-streaming-kafka-0-10 integration

Hi,

We are running Spark Streaming on AWS to process incoming messages on Kafka
topics, and all was well.
Recently we wanted to migrate from the 0.8 to the 0.11 version of the Kafka
library used by Spark, and to Kafka 0.11 on the server side.

With the new versions we intermittently hit a 'No current assignment for
partition' error for a topic. I construct four DStreams with different
group.ids, as suggested.
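
For context, each stream is built roughly along the lines of the sketch below
(a minimal illustration assuming an existing StreamingContext ssc; the broker
address, group.id and kafkaParams values are placeholders, not our exact
configuration):

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// one of the four streams; each stream gets its own group.id in kafkaParams
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",                 // placeholder
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "genericEvents-group",          // placeholder
  "auto.offset.reset"  -> "none",   // we supply explicit starting offsets instead
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// starting offsets computed up front (see getCommittedOffsets below)
val startOffsets: Map[TopicPartition, Long] = Map.empty   // placeholder

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("genericEvents"), kafkaParams, startOffsets)
)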

The code that triggers the issue is this block from Spark's ConsumerStrategy.scala (Subscribe.onStart):

if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
          consumer.seek(topicPartition, offset)   // <- the seek that fails
      }
    }

At the start of the job, I also make sure we are supplying all required
offsets correctly:

private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
    Map<TopicPartition, Long> offsets = new HashMap<>();
    List<TopicPartition> topicPartitions =
        consumer.partitionsFor(topic).stream()
            .map(partitionInfo ->
                new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
            .collect(Collectors.toList());
    Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
    // pick committed offsets
    for (TopicPartition topicAndPartition : topicPartitions) {
      final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
      Long earliestOffset = earliestOffsets.get(topicAndPartition);
      if (committed != null && committed.offset() > earliestOffset) {
        logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
            topicAndPartition, committed.offset());
        offsets.put(topicAndPartition, committed.offset());
      } else {
        logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
            topicAndPartition, earliestOffset);
        offsets.put(topicAndPartition, earliestOffset);
      }
    }
    return offsets;
  }

The actual stack trace:

Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)






Re: [Spark streaming] No assigned partition error during seek

Cody Koeninger
You mentioned the 0.11 version; the latest version of the org.apache.kafka
kafka-clients artifact supported by DStreams is 0.10.0.1, for which it has
an appropriate dependency.

Are you manually depending on a different version of the kafka-clients artifact?
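
For illustration, a build that relies on the transitive kafka-clients
dependency rather than pinning its own might look like this (an sbt sketch;
the versions shown are assumptions for a Spark 2.2 build):

// build.sbt (sketch) -- no explicit kafka-clients dependency
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
  // kafka-clients 0.10.0.1 comes in transitively from the line above
)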



Re: [Spark streaming] No assigned partition error during seek

venks61176
Yes, I use the latest Kafka clients (0.11) to determine beginning offsets without seek, and I also commit Kafka offsets externally.
I don't find the Spark async commit useful for our needs.

Thanks
Venkat


Re: [Spark streaming] No assigned partition error during seek

venks61176
After your query I noticed the note in the documentation: 'Do not manually add dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard to diagnose ways.'

Does this imply that we should not be adding kafka-clients to our jars?

Thanks
Venkat


Re: [Spark streaming] No assigned partition error during seek

Qiao, Richard

In your case, it looks like two versions of the Kafka client exist in the same JVM at runtime, so there is a version conflict.

 

About "I don't find the Spark async commit useful for our needs": do you mean code like the line below?

kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
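
That is, the usual pattern from the spark-streaming-kafka-0-10 docs, roughly
like this sketch (stream construction omitted; the processing step is a
placeholder):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // offset ranges covered by this micro-batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // commit the consumed offsets back to Kafka asynchronously,
  // through the DStream that produced the RDD
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}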

 

 

Best Regards

Richard

 

 



Re: [Spark streaming] No assigned partition error during seek

Cody Koeninger
Yeah, don't mix multiple versions of kafka clients.  That's not 100%
certain to be the cause of your problem, but it can't be helping.
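
One quick way to confirm which kafka-clients jar actually wins on the
classpath is a diagnostic one-liner along these lines (just a sketch, run on
the driver):

// prints the jar that the KafkaConsumer class was loaded from
println(
  classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
    .getProtectionDomain.getCodeSource.getLocation
)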

As for your comments about async commits, read

https://issues.apache.org/jira/browse/SPARK-22486

and if you think your use case is still relevant to others given those
constraints, then share it.

