Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}


ARAVIND SETHURATHNAM

Hi,

We have several Structured Streaming jobs (Spark 2.2.0) consuming from Kafka and writing to S3. They had been running fine for a month, but since yesterday a few jobs have started failing, and I see the exception below in the failed jobs' logs:

 

```Tried to fetch 473151075 but the returned record offset was 473151072```

```
GScheduler: ResultStage 0 (start at SparkStreamingTask.java:222) failed in 77.546 s due to Job aborted due to stage failure: Task 86 in stage 0.0 failed 4 times, most recent failure: Lost task 86.3 in stage 0.0 (TID 96, ip-10-120-12-52.ec2.internal, executor 11): java.lang.IllegalStateException: Tried to fetch 473151075 but the returned record offset was 473151072
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:234)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:158)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:149)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
```

 

Can someone provide some direction on what could suddenly be causing this when consuming from those topics?

 

regards


Aravind

 


Re: Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}

Cody Koeninger

Is this possibly related to the recent post on https://issues.apache.org/jira/browse/SPARK-18057 ?

On Mon, Apr 16, 2018 at 11:57 AM, ARAVIND SETHURATHNAM <[hidden email]> wrote:


Re: Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}

weand


On 2018/04/17 22:34:25, Cody Koeninger <[hidden email]> wrote:

> Is this possibly related to the recent post on
> https://issues.apache.org/jira/browse/SPARK-18057 ?

Facing the same issue here with Spark 2.3.1 and Kafka Client 0.10.2.1 (--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,org.apache.kafka:kafka-clients:0.10.2.1).

The Spark code here looks interesting (it seems some corner case is not yet handled in Spark):
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L295
```scala
      } else if (record.offset < offset) {
        // This should not happen. If it does happen, then we probably misunderstand Kafka internal
        // mechanism.
        throw new IllegalStateException(
          s"Tried to fetch $offset but the returned record offset was ${record.offset}")
      }
```
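To make the quoted branch concrete, here is a minimal, simplified Scala model of the comparison, written purely for illustration. `FetchOutcome`, `OffsetCheck`, `classify` and `check` are hypothetical names, not Spark classes, and the sketch only reproduces the `record.offset < offset` branch shown above; it does not model what Spark does when the returned offset is ahead of the requested one.

```scala
// Simplified, hypothetical model of the offset check quoted above.
// NOT Spark's KafkaDataConsumer: it only compares the offset that was
// requested with the offset of the record that actually came back.
sealed trait FetchOutcome
case object Exact extends FetchOutcome                        // returned == requested
final case class SkippedAhead(gap: Long) extends FetchOutcome // returned > requested (not modelled further)
final case class Behind(by: Long) extends FetchOutcome        // returned < requested

object OffsetCheck {
  def classify(requested: Long, returned: Long): FetchOutcome =
    if (returned == requested) Exact
    else if (returned > requested) SkippedAhead(returned - requested)
    else Behind(requested - returned)

  // Mirrors only the quoted branch: a record *behind* the requested offset
  // is treated as impossible and raises IllegalStateException.
  def check(requested: Long, returned: Long): FetchOutcome =
    classify(requested, returned) match {
      case Behind(_) =>
        throw new IllegalStateException(
          s"Tried to fetch $requested but the returned record offset was $returned")
      case other => other
    }

  def main(args: Array[String]): Unit = {
    // (requested, returned) pairs taken from the stack traces in this thread
    val reported = Seq((473151075L, 473151072L), (57354416L, 57354415L), (58121595L, 58121586L))
    // prints Behind(3), Behind(1), Behind(9) on separate lines
    reported.foreach { case (req, ret) => println(classify(req, ret)) }
  }
}
```

In all three reports in this thread the record handed back sits a few offsets *before* the one requested (3, 1 and 9 offsets behind), which is exactly the case the Spark comment says should not happen.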

Two interesting points in our case:

1) We started consuming again after a Kafka disaster in which the whole cluster went down (maybe Kafka skipped corrupted messages/offsets?).

2) After recovery, our Structured Streaming jobs were started with a specific startingOffsets option:
startingOffsets={"our_topic":{"0":58120761,"1":56140499,"2":57353702}}
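For reference, the `startingOffsets` value above is a JSON object mapping topic → partition → offset. The sketch below is a hypothetical helper (`StartingOffsets.toJson` is not a Spark API) that renders such a map deterministically. It assumes topic names need no JSON escaping, which holds for Kafka's allowed topic characters (ASCII letters, digits, `.`, `_`, `-`). Per the Spark Kafka integration guide, `-2` and `-1` may be used as per-partition offsets meaning earliest and latest.

```scala
// Hypothetical helper (not part of Spark): renders per-partition offsets into
// the {"topic":{"partition":offset,...}} shape used by "startingOffsets".
// Assumes topic names contain no characters that need JSON escaping.
object StartingOffsets {
  private def quote(s: String): String = "\"" + s + "\""

  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, parts) =>
      // Sort partitions so the output is stable regardless of Map ordering.
      val inner = parts.toSeq.sortBy(_._1)
        .map { case (partition, offset) => quote(partition.toString) + ":" + offset }
        .mkString("{", ",", "}")
      quote(topic) + ":" + inner
    }.mkString("{", ",", "}")

  def main(args: Array[String]): Unit = {
    val json = toJson(Map("our_topic" -> Map(0 -> 58120761L, 1 -> 56140499L, 2 -> 57353702L)))
    println(json) // {"our_topic":{"0":58120761,"1":56140499,"2":57353702}}
  }
}
```

The resulting string would be passed as `.option("startingOffsets", json)` on `spark.readStream.format("kafka")`. Note that this option only applies when a new query starts; a query resumed from its checkpoint continues from the checkpointed offsets instead.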

We then see two exceptions mentioning two different offsets (57354415 and 58121595), so it seems the consumer did not stop immediately, but shortly afterwards.
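A quick sanity check of the "shortly afterwards" observation, as a Scala sketch using only numbers from this post. The exceptions do not name partitions, so the sketch assumes (hypothetically) that each failure belongs to the partition whose starting offset is the largest one not exceeding the requested offset. Under that assumption, both failures occur a few hundred records after the configured starting offsets.

```scala
// Worked arithmetic on the offsets reported in this post.
object FailureDistance {
  // startingOffsets from the post (partition -> offset).
  val starting: Map[Int, Long] = Map(0 -> 58120761L, 1 -> 56140499L, 2 -> 57353702L)

  // (requested, returned) offset pairs copied from the two exceptions.
  val failures: Seq[(Long, Long)] = Seq((57354416L, 57354415L), (58121595L, 58121586L))

  // ASSUMPTION: the log does not name partitions, so each failure is attributed
  // to the partition with the largest starting offset <= the requested offset.
  def attribute(requested: Long): (Int, Long) =
    starting.filter { case (_, start) => start <= requested }.maxBy(_._2)

  // (partition, records past startingOffsets, how far behind the returned record was)
  def distances: Seq[(Int, Long, Long)] =
    failures.map { case (requested, returned) =>
      val (partition, start) = attribute(requested)
      (partition, requested - start, requested - returned)
    }

  def main(args: Array[String]): Unit =
    // prints partition 2: 714 records, 1 behind; partition 0: 834 records, 9 behind
    distances.foreach { case (p, past, behind) =>
      println(s"partition $p (assumed): failed $past records after startingOffsets; returned record was $behind behind")
    }
}
```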

Exceptions:

```
2018-08-16 13:15:18 WARN  TaskSetManager:66 - Lost task 1.0 in stage 22.0 (TID 269, sparkslave-sv-02.avm.de, executor 0): java.lang.IllegalStateException: Tried to fetch 57354416 but the returned record offset was 57354415
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2018-08-16 13:15:18 WARN  TaskSetManager:66 - Lost task 2.0 in stage 22.0 (TID 270, sparkslave-sv-02.avm.de, executor 0): java.lang.IllegalStateException: Tried to fetch 58121595 but the returned record offset was 58121586
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

