Structured Streaming + Kafka Integration unable to read new messages after sometimes


NikhilP
I am working on a Structured Streaming job that reads data from Kafka and writes it back to Kafka. I am facing a weird issue: sometimes the job stops reading data from the Kafka topics, and only after I delete the checkpoint directory does it start reading them again.

Can anybody help me resolve this issue?
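For reference, here is a minimal sketch of how I could log the Kafka offsets each micro-batch reads (assuming Spark 2.1+; it would be registered on the same `spark` session built in the job below, and the `println` logging is only illustrative). If the problem is on the read side, the end offsets should stop moving even though new messages arrive in the topic:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Log the offsets consumed by every micro-batch so it is visible
    // at which point the query stops advancing.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query started: ${event.id}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val progress = event.progress
        progress.sources.foreach { source =>
          println(s"batch=${progress.batchId} rows=${source.numInputRows} " +
            s"startOffset=${source.startOffset} endOffset=${source.endOffset}")
        }
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"query terminated: ${event.id} exception=${event.exception}")
    })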



import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import argonaut._



/**
  * Created by Nikhil.R.Patil on 20-07-2017.
  */

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StructuredCDNLogParser")
      .config("es.index.auto.create", "true")
      .config("spark.streaming.backpressure.enabled", "true")
      .config("es.nodes", CommonUtils.HOSTNAME)
      .getOrCreate()


    import spark.implicits._


    val beat = new StructType()
      .add("hostname", StringType)
      .add("name", StringType)
      .add("version", StringType)
    val userSchema = new StructType()
      .add("@timestamp", StringType)
      .add("beat", beat)
      .add("input_type", StringType)
      .add("message", StringType)
      .add("offset", IntegerType)
      .add("source", StringType)
      .add("type", StringType)
    //val df = spark.readStream.option("sep","\\n").schema(userSchema).json("file:///c://cdnlogssample//")
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", CommonUtils.KAFKACLUSTER)
      .option("subscribe", CommonUtils.KAFKATOPICS)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value AS STRING) AS Tuple", "CAST(timestamp AS LONG) AS Ts")
    if (!df.isStreaming) {
      throw new Exception("Unable to Start Streaming")
    }

    // Parse the JSON payload from Kafka and keep the message, its type and the timestamp.
    val kafkaDf = df.map(rows => {
      val tuple = rows.getAs[String]("Tuple")
      val message: String = Parse.parseWith(tuple, _.field("message").flatMap(_.string).getOrElse(""), msg => msg)
      val messagetype: String = Parse.parseWith(tuple, _.field("type").flatMap(_.string).getOrElse(""), msg => msg)
      val timestamp = rows.getAs[Long]("Ts")
      (message, messagetype, timestamp)
    }).toDF("message", "type", "timestamp")

    // Drop rows with a null or empty message/type, then expand each log line into parsed records.
    val flinesMap = kafkaDf.select("message", "type", "timestamp")
      .filter(rows => {
        val message = rows.getAs[String]("message")
        val mtype = rows.getAs[String]("type")
        message != null && mtype != null && message != "" && mtype != ""
      })
      .flatMap(row => {
        val message = row.getAs[String]("message")
        val mtype = row.getAs[String]("type")
        val ts = row.getAs[Long]("timestamp")
        CommonUtils.logParser(message, mtype, ts)
      }).toDF("newtimestamp", "keystring", "datastring")

    val flinesMapDS = flinesMap
      .select($"newtimestamp".cast("timestamp") as 'timestamp, $"keystring", $"datastring")
      .toDF()
    //val computedStreamDS = flinesMapDS.withWatermark("timestamp",CommonUtils.WATERMARKTIMESTAMP).groupBy(window($"timestamp",CommonUtils.WINDOWTIMESTAMP),$"keystring").agg($"keystring",sum("datastring") as 'count,avg("datastring") as 'avg)
    val computedStreamDS = flinesMap.groupBy($"keystring")
      .agg($"keystring", sum("datastring") as 'count, avg("datastring") as 'avg)

    /* val splitds = computedStreamDS.map(rows => {
      val keystring = rows.getAs[String]("keystring")
      val count = rows.getAs[Double]("count")
      val avg = rows.getAs[Double]("avg")
      CommonUtils.SplitKeyValues(keystring,count,avg)
    }).toDF("esTypeName","timestamp","apptype","hostname","serverlocation","servertype","subtype","value","type")*/

    val stream = computedStreamDS
      .selectExpr("CAST(keystring AS STRING) AS key", "to_json(struct(keystring,count,avg)) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", CommonUtils.KAFKACLUSTER)
      .option("topic", CommonUtils.KAFKAWRITETOPICS)
      .option("auto.offset.reset", "latest")
      .option("checkpointLocation", CommonUtils.CHECKPOINTDIR)
      .outputMode("complete")
      .start()
    /*val stream = computedStreamDS.writeStream.format("console").option("checkpointLocation",CommonUtils.CHECKPOINTDIR).outputMode("append").start()*/

    stream.awaitTermination()

  }
}
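Since deleting the checkpoint directory makes the job read again, I have also been looking at what the checkpoint stores. Below is a minimal sketch (assuming the checkpoint is on a local filesystem; for HDFS the Hadoop FileSystem API would be needed, and the object name is just illustrative) that prints the newest file under the checkpoint's offsets/ directory, which records the Kafka offsets the last micro-batch planned to read up to. Comparing that file across runs shows whether the committed offsets are still advancing while the topic receives new data:

    import java.nio.file.{Files, Paths}
    import scala.collection.JavaConverters._

    // Print the newest batch file under <checkpointDir>/offsets.
    // Each file holds the Kafka topic/partition offsets for one micro-batch.
    object InspectCheckpointOffsets {
      def main(args: Array[String]): Unit = {
        val offsetsDir = Paths.get(args(0), "offsets")
        val latest = Files.list(offsetsDir).iterator().asScala
          .filter(_.getFileName.toString.forall(_.isDigit))
          .maxBy(_.getFileName.toString.toLong)
        println(s"latest batch file: $latest")
        Files.readAllLines(latest).asScala.foreach(println)
      }
    }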