Spark Streaming Kinesis Missing Records



Richard Moorhead


 

I have a Spark Streaming job that reads from several Kinesis streams and unions them into a single streaming context.

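import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// One receiver-based DStream per Kinesis stream. Each stream gets its own KCL
// application name, so its shard leases and checkpoints are tracked separately.
val streams = ingestionStreams.map { streamName =>
  KinesisInputDStream.builder
    .checkpointAppName(s"${jobName}_$streamName")
    .streamName(streamName)
    .streamingContext(ssc)
    .endpointUrl(endpointUrl)
    .regionName(regionName)
    // When no KCL checkpoint exists yet, start from the oldest retained record.
    .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .checkpointInterval(kinesisCheckpointInterval)
    .storageLevel(StorageLevel.MEMORY_ONLY)
    .buildWithMessageHandler(KinesisRecordHandler.recordHandler)
}

import spark.sqlContext.implicits._

// Merge all per-stream DStreams into one and process each micro-batch.
ssc.union(streams)
  .checkpoint(batchInterval)
  .foreachRDD(jsonRdd => ...)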

The Streaming tab of the Spark UI shows the expected number of records, but foreachRDD actually processes fewer records than that.
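
One way to narrow down where records go missing is to compare, batch by batch, the record count Spark reports with the count actually seen inside foreachRDD. The sketch below assumes both counts have already been collected into maps keyed by batch time in milliseconds. In Spark, the reported side could plausibly come from a StreamingListener's onBatchCompleted (batchInfo.numRecords, batchInfo.batchTime.milliseconds) and the processed side from rdd.count() in the foreachRDD((rdd, time) => ...) overload; that wiring is not shown. BatchReconciler and Gap are hypothetical names used only for illustration.

```scala
// Hypothetical diagnostic helper: given per-batch record counts reported by
// Spark and per-batch counts actually processed in foreachRDD (both keyed by
// batch time in ms), list the batches where processing saw fewer records.
object BatchReconciler {
  final case class Gap(batchTimeMs: Long, reported: Long, processed: Long) {
    def missing: Long = reported - processed
  }

  // Batches absent from `processed` count as 0 processed records.
  // Batches that appear only in `processed` are ignored.
  def gaps(reported: Map[Long, Long], processed: Map[Long, Long]): Seq[Gap] =
    reported.toSeq.sortBy(_._1).flatMap { case (batchTime, reportedCount) =>
      val processedCount = processed.getOrElse(batchTime, 0L)
      if (processedCount < reportedCount) Some(Gap(batchTime, reportedCount, processedCount))
      else None
    }

  def main(args: Array[String]): Unit = {
    // Made-up example counts, not measurements from the job above.
    val reported  = Map(1000L -> 500L, 2000L -> 480L, 3000L -> 510L)
    val processed = Map(1000L -> 500L, 2000L -> 300L)
    gaps(reported, processed).foreach { g =>
      println(s"batch ${g.batchTimeMs}: reported ${g.reported}, processed ${g.processed}, missing ${g.missing}")
    }
    // prints:
    // batch 2000: reported 480, processed 300, missing 180
    // batch 3000: reported 510, processed 0, missing 510
  }
}
```

Whether the gaps cluster in particular batches or are spread evenly gives a hint about whether whole blocks or individual records are being lost.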

The executor logs contain many ProvisionedThroughputExceededException errors. I expected these to be benign, since the KCL should retry the throttled reads.
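
The expectation that throttling is benign rests on the throttled call being retried rather than dropped. A minimal sketch of that pattern, retry with capped exponential backoff, follows. ThrottledException and Backoff.retry are hypothetical names, not KCL or AWS SDK APIs, and the KCL's own retry behavior and configuration may differ from this.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for a throttling error such as Kinesis's
// ProvisionedThroughputExceededException; this is NOT the AWS SDK class.
final class ThrottledException(msg: String) extends RuntimeException(msg)

object Backoff {
  /** Runs `op`, retrying only on ThrottledException with capped exponential
    * backoff: the wait before retry n (0-based) is baseDelayMs * 2^n, capped
    * at maxDelayMs. After maxRetries retries the exception is rethrown, so a
    * persistent throttle surfaces instead of being silently swallowed. */
  def retry[T](maxRetries: Int, baseDelayMs: Long, maxDelayMs: Long,
               sleep: Long => Unit = (ms: Long) => Thread.sleep(ms))(op: => T): T = {
    @tailrec
    def loop(attempt: Int): T = {
      val result: Either[ThrottledException, T] =
        try Right(op)
        catch { case e: ThrottledException if attempt < maxRetries => Left(e) }
      result match {
        case Right(value) => value
        case Left(_) =>
          // Shift amount is bounded so the doubling stays small for realistic base delays.
          val delay = math.min(maxDelayMs, baseDelayMs << math.min(attempt, 30))
          sleep(delay)
          loop(attempt + 1)
      }
    }
    loop(0)
  }
}
```

With maxRetries = 5, baseDelayMs = 100 and a large enough maxDelayMs, the waits would be 100, 200, 400, 800 and 1600 ms before the exception is finally rethrown. Injecting `sleep` keeps the helper testable without real delays.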

Unfortunately, the missing records are never processed later. Where should I look next?


. . . . . . . . . . . . . . . . . . . . . . . . . . .

Richard Moorhead
Software Engineer

C2FO: The World's Market for Working Capital®

     
