Streaming AVRO data in console: java.lang.ArrayIndexOutOfBoundsException

dwgw

Hi,
I am trying to stream a Kafka topic (in Avro format) to the console. I have
loaded the Avro data from the Kafka topic into a DataFrame, but when I try
to stream it to the console I get the following error.

*scala>* val records = spark.
           readStream.
           format("kafka").
           option("kafka.bootstrap.servers", "broker1:9093").
           option("subscribe", "PERSON").      
           option("startingOffsets", "earliest").
           load()

records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

*scala>* val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/home/spark/person.avsc")))

jsonFormatSchema: String =
"{
  "type": "record",
  "name": "Person",
  "namespace": "io.confluent.connect.avro",
  "fields": [
...
...

*scala>* val output = records.select(from_avro(col("value"), jsonFormatSchema).as("person"))

output: org.apache.spark.sql.DataFrame = [person: struct<SESSION_ID: bigint, VERSION_STARTSCN: bigint ... 46 more fields>]

*scala>*  .select("icxsession.*")

res15: org.apache.spark.sql.DataFrame = [SESSION_ID: bigint,
VERSION_STARTSCN: bigint ... 46 more fields]
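Note that this projection is not assigned to a value, so the writeStream
call below still writes the nested person struct; to stream the flat
columns one would assign it first, e.g.:

val flatOutput = output.select("person.*")  // flatOutput is a hypothetical name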

*Error occurs here:*

*scala>* output.writeStream
          .format("console")
          .outputMode("append")
          .start()
          .awaitTermination()

20/08/10 01:14:24 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 20, workstation.com, executor 2): *java.lang.ArrayIndexOutOfBoundsException: 1405994075*
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

[Stage 5:>                                                          (0 + 1) / 1]
20/08/10 01:14:25 ERROR scheduler.TaskSetManager: Task 0 in stage 5.0 failed 4 times; aborting job
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521 is aborting.
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521 aborted.
*20/08/10 01:14:25 ERROR streaming.MicroBatchExecution: Query [id = 5e8ffd55-fb54-45d1-8255-56ba810c1f51, runId = 1b7245de-de96-43e7-98ef-8bc62a6f697e] terminated with error
org.apache.spark.SparkException: Writing job aborted.*
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 23, interns-hdp-lab.infodetics.com, executor 2): java.lang.ArrayIndexOutOfBoundsException: 1405994075
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
        ... 35 more
*Caused by: java.lang.ArrayIndexOutOfBoundsException: 1405994075*
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Can anyone please guide me on how to resolve this
java.lang.ArrayIndexOutOfBoundsException when writing the stream to the
console?
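For reference, one known cause of this exact ArrayIndexOutOfBoundsException
from Avro's ResolvingDecoder is Confluent wire-format framing: producers
that publish through the Confluent Schema Registry prepend a 5-byte header
(one magic byte plus a 4-byte schema ID) to every message, which Spark's
plain from_avro cannot parse. The io.confluent.connect.avro namespace in
the schema above hints that the topic may have been written this way. A
minimal sketch of stripping that header before decoding, assuming this
framing (the column and variable names are taken from the session above):

import org.apache.spark.sql.avro._          // from_avro (built-in spark-avro module, Spark 2.4+)
import org.apache.spark.sql.functions.expr

val output = records.select(
  from_avro(
    // Confluent framing: byte 0 is the magic byte, bytes 1-4 are the
    // schema ID, and the Avro payload starts at byte 5. SQL substring()
    // is 1-based, so position 6 skips the 5-byte header.
    expr("substring(value, 6, length(value) - 5)"),
    jsonFormatSchema
  ).as("person"))

If the messages really carry this framing, the first byte of every value
will be 0x00. If they are plain Avro instead, the same error can come from
a writer/reader schema mismatch, so it is worth double-checking that
/home/spark/person.avsc matches the schema the producer actually used.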

Regards