How can Spark Structured Streaming write to Kudu?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

How can Spark Structured Streaming write to Kudu?

lk_spark
hi,all:
       I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to Kudu 1.7.0. My code is like below:
 
    val kuduContext = new KuduContext("master:7051", spark.sparkContext)
 
    val console = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreach(new ForeachWriter[CstoreNew] {
        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }
        override def process(value: CstoreNew): Unit = {
          val spark = SparkSessionSingleton.getInstance(sparkConf)
          val valueDF = Seq(value).toDF()   // GET WRONG
          kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
        }
        override def close(errorOrNull: Throwable): Unit = {
        }
      })
    val query = console.start()
    query.awaitTermination()
 
When execution reaches `val valueDF = Seq(value).toDF()` I get this error message:
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...
 
and  SQLImplicits.scala:228 is :
 
227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:        DatasetHolder(_sqlContext.createDataset(s))
229:   }
 
Can anyone give me some help?
2019-11-25

lk_spark
Reply | Threaded
Open this post in threaded view
|

Re: How can Spark Structured Streaming write to Kudu?

lk_spark
I found that `_sqlContext` is null. How can I resolve it?
 
2019-11-25
lk_spark

发件人:"lk_spark"<[hidden email]>
发送时间:2019-11-25 16:00
主题:How can Spark Structured Streaming write to Kudu?
收件人:"user.spark"<[hidden email]>
抄送:
 
hi,all:
       I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to Kudu 1.7.0. My code is like below:
 
    val kuduContext = new KuduContext("master:7051", spark.sparkContext)
 
    val console = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreach(new ForeachWriter[CstoreNew] {
        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }
        override def process(value: CstoreNew): Unit = {
          val spark = SparkSessionSingleton.getInstance(sparkConf)
          val valueDF = Seq(value).toDF()   // GET WRONG
          kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
        }
        override def close(errorOrNull: Throwable): Unit = {
        }
      })
    val query = console.start()
    query.awaitTermination()
 
When execution reaches `val valueDF = Seq(value).toDF()` I get this error message:
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...
 
and  SQLImplicits.scala:228 is :
 
227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:        DatasetHolder(_sqlContext.createDataset(s))
229:   }
 
Can anyone give me some help?
2019-11-25

lk_spark