Structured Streaming doesn't write checkpoint log when I use coalesce

WangXiaolong
Hi,

   Lately I ran into a problem while writing a Structured Streaming job that writes into OpenTSDB.
  The write-stream part looks something like this:

      outputDs
          .coalesce(14)
          .writeStream
          .outputMode("append")
          .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
          .option("checkpointLocation", s"$checkpointDir/$appName/tsdb")
          .foreach {
            TsdbWriter(
              tsdbUrl,
              MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword, mongoDatabase, mongoCollection, mongoAuthenticationDatabase)
            )(createMetricBuilder(tsdbMetricPrefix))
          }
          .start()

    When I check the checkpoint dir, I find that the "checkpoint/state" dir is empty. I looked into the executor's log and found that HDFSBackedStateStoreProvider didn't write anything to the checkpoint dir.

   The strange thing is, when I replace the "coalesce" call with "repartition", the problem goes away. Is there a difference between these two functions when using Structured Streaming?
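One relevant difference between the two (a minimal sketch, not specific to this app): `coalesce(n)` is a narrow transformation that avoids a shuffle, so the reduced partition count can propagate back into the preceding stage, whereas `repartition(n)` inserts an `Exchange` (shuffle) that isolates upstream operators from the new partition count. This is visible in the physical plans of a small batch query:

```scala
import org.apache.spark.sql.SparkSession

// Sketch comparing the physical plans of coalesce vs repartition.
// Names and paths here are illustrative, not taken from the original job.
object CoalesceVsRepartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("coalesce-vs-repartition")
      .getOrCreate()

    val df = spark.range(0, 1000)

    // coalesce is narrow: the plan shows Coalesce but no Exchange,
    // so the reduced parallelism reaches back into the preceding stage.
    df.coalesce(4).explain()

    // repartition adds an Exchange (shuffle), keeping the upstream
    // operators at their original partitioning.
    df.repartition(4).explain()

    spark.stop()
  }
}
```

Whether this explains the missing state files likely depends on the Spark version and where the stateful operator ends up relative to the `Coalesce` in the plan.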

  Looking forward to your help, thanks.



 


Re: Structured Streaming doesn't write checkpoint log when I use coalesce

Jungtaek Lim
Which version do you use? The above app works with Spark 2.3.1; 200 partitions are stored for state.

    val queryStatusFile = conf.queryStatusFile()
    val rateRowPerSecond = conf.rateRowPerSecond()
    val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

    val ss = SparkSession
      .builder()
      .master("local[3]")
      .appName("state coalesce test")
      .getOrCreate()

    ss.streams.addListener(new QueryListenerWriteProgressToFile(queryStatusFile))

    import ss.implicits._

    val df = ss.readStream
      .format("rate")
      .option("rowsPerSecond", rateRowPerSecond)
      .option("rampUpTime", s"${rateRampUpTimeSecond}s")
      .load()

    df.printSchema()

    val outDf = df.withWatermark("timestamp", "10 seconds")
      .selectExpr(
        "timestamp", "mod(value, 100) as mod", "value",
        BenchmarkQueryHelper.createCaseExprStr(
          "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
      .groupBy(
        window($"timestamp", "1 minute", "10 seconds"),
        $"mod", $"word")
      .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
      .coalesce(8)

    val query = outDf.writeStream
      .format("memory")
      .option("queryName", "stateCoalesceTest")
      .option("checkpointLocation", "/tmp/state-coalesce-test")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .outputMode(OutputMode.Update())
      .start()

    query.awaitTermination()
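If the state checkpoint is being written, HDFSBackedStateStoreProvider lays it out as `<checkpointLocation>/state/<operatorId>/<partitionId>/<version>.delta`. A quick way to check from plain Scala (the default path below matches the local checkpoint in the query above; adjust it for your own job, e.g. an HDFS path):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

object InspectStateDir {
  // List the per-partition state directories under
  // <checkpointLocation>/state/<operatorId>; an empty result means
  // no state was written for that operator.
  def statePartitions(checkpointLocation: String, operatorId: Int = 0): Seq[Path] = {
    val root = Paths.get(checkpointLocation, "state", operatorId.toString)
    if (Files.exists(root)) Files.list(root).iterator().asScala.toSeq
    else Seq.empty
  }

  def main(args: Array[String]): Unit = {
    statePartitions("/tmp/state-coalesce-test").foreach(println)
  }
}
```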

-Jungtaek Lim (HeartSaVioR)


On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong <[hidden email]> wrote: