[Structured Streaming] Join stream of readings with collection of previous related readings

[Structured Streaming] Join stream of readings with collection of previous related readings

nathan.brinks
Working on a POC to determine if Spark is a good fit for our use case.

I have a stream of readings from IoT devices being published to a Kafka
topic. I need to consume those readings, along with recently consumed
readings for the same device, to determine the quality of the current
reading. For example, is the current reading changing too fast (i.e., did it
change by more than three times the standard deviation of the last 20
readings)? Another example: over the last 2 hours, has there been at least a
minimal amount of variation in the reading, or is it "stuck"?

These quality checks are fairly specific and, as far as I can tell, don't fit
well into the typical SQL paradigm, so I expect to need some UDFs and/or
UDAFs to accomplish this. The part I am getting stuck on is getting hold of
the previous related readings so that I have all the information I need to
perform the quality evaluation. I am essentially trying to enrich a stream of
readings with a quality analysis, and that analysis depends on previous
readings.
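
To make the checks concrete, here is roughly what I have in mind as plain Python, assuming the recent readings for a device have already been collected into a list (the function names and thresholds are just placeholders):

    import statistics

    def changing_too_fast(current_value, previous_values):
        """Placeholder: did the current reading move more than three standard
        deviations away from the mean of the last 20 readings?"""
        recent = previous_values[-20:]
        if len(recent) < 2:
            return False
        return abs(current_value - statistics.mean(recent)) > 3 * statistics.stdev(recent)

    def is_stuck(previous_values, min_variation=0.5):
        """Placeholder: over the readings from the last 2 hours, has there been
        less than a minimal amount of variation?"""
        if not previous_values:
            return False
        values = sorted(previous_values)
        return (values[-1] - values[0]) < min_variation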

*Does this seem like a good fit for Spark or should I be looking at
alternatives?*
*Does it seem possible to do this with pyspark or should I be looking at
Java/Scala to get access to mapGroupsWithState/flatMapGroupsWithState?*

Thank you so much for your help!

---------
/If you want to read more....../

*Reference:*

    from pyspark.sql.functions import col, from_json, expr, window, max, collect_list
    from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

    # Schema of the JSON payload carried in the Kafka message value
    schema = StructType() \
        .add("id", StringType()) \
        .add("readingValue", IntegerType()) \
        .add("readingTime", TimestampType()) \
        .add("uid", StringType())

    dfRaw = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option(subscribeType, topics) \
        .load()

    # Parse the JSON value and use the reading time as the event time
    df = dfRaw \
        .select(from_json(col("value").cast("string"), schema).alias("parsedValue")) \
        .withColumn("eventTime", col("parsedValue.readingTime"))

    dfWithWatermark = df.withWatermark("eventTime", "10 seconds").alias("current")

*A few things I have tried:*

Join the dataframe to itself using a fixed interval


    dfSelfJoined = dfWithWatermark.alias("leftDf").join(
        dfWithWatermark.alias("rightDf"),
        expr("""
        leftDf.parsedValue.id = rightDf.parsedValue.id AND
        leftDf.parsedValue.uid != rightDf.parsedValue.uid AND
        leftDf.eventTime >= rightDf.eventTime AND
        rightDf.eventTime >= leftDf.eventTime - interval 10 minutes
        """)
    )

This does what you might expect: it outputs a row for each match between the
current reading and any readings within the preceding 10 minutes. The only
problem is that I don't want a row for each match; I want them aggregated...

    dfSelfJoined \
        .groupBy("leftDf.parsedValue.uid", "leftDf.eventTime") \
        .agg(collect_list(col("rightDf.parsedValue"))) \
        .writeStream \
        .outputMode('append') \
        .format('console') \
        .option('truncate', 'false') \
        .start()

This aggregates them, but there is no output until, I assume, we have crossed
the watermark. I suppose this is expected. Is there any harm in setting the
watermark to 0 seconds so rows are appended immediately? Do I have any
guarantee that there will be no late data if all the data is published to
Kafka with the same key, in order?
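
Assuming the self-joined aggregation can be made to work, my rough plan for the quality check itself is a UDF over the collected readings, something like this (a sketch only; changing_too_fast is the placeholder function from earlier):

    from pyspark.sql.functions import udf, first, collect_list, col
    from pyspark.sql.types import BooleanType

    # Sketch: keep the current value alongside the collected previous readings
    # and run the placeholder check over them.
    @udf(BooleanType())
    def too_fast_udf(current_value, previous_readings):
        values = [r["readingValue"] for r in previous_readings]
        return changing_too_fast(current_value, values)

    dfQuality = dfSelfJoined \
        .groupBy("leftDf.parsedValue.uid", "leftDf.eventTime") \
        .agg(first("leftDf.parsedValue.readingValue").alias("currentValue"),
             collect_list(col("rightDf.parsedValue")).alias("previousReadings")) \
        .withColumn("tooFast",
                    too_fast_udf(col("currentValue"), col("previousReadings")))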

Join the dataframe to an aggregated stream of recent readings

   
    dfRecent = dfWithWatermark \
        .groupBy(window("eventTime", "10 minutes", "5 minutes"), "parsedValue.id") \
        .agg(max(col("parsedValue.readingTime")).alias("maxTime"),
             collect_list(col("parsedValue")).alias("values")) \
        .alias("recent") \
        .withColumn("secondsFromStart",
                    col("maxTime").cast("long") - col("window.start").cast("long")) \
        .withColumn("secondsToEnd",
                    col("window.end").cast("long") - col("maxTime").cast("long")) \
        .filter(col("secondsFromStart") >= col("secondsToEnd"))

    dfWithWatermark \
        .join(dfRecent, expr("current.parsedValue.id == recent.id")) \
        .writeStream \
        .outputMode('append') \
        .format('console') \
        .option('truncate', 'false') \
        .start()

This makes sense in my head :-) but nothing is ever output. Reading the docs
some more, I concluded that joining a streaming aggregation to another stream
is not supported.


I have been investigating this for a few days now and would greatly
appreciate another opinion.

Thanks for reading!

-Nathan



Re: [Structured Streaming] Join stream of readings with collection of previous related readings

Lalwani, Jayesh
Append mode will wait until the watermark expires: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
This is because append mode doesn't output rows until a row is finalized. Theoretically, data for any row can appear at any time as long as it is within the watermark.

I don't know if the following has changed; it's been 2 years since I worked with Streaming. For Structured Streaming, "current time" is defined by the event time of the latest records, not by the wall clock. So you need one more event to close a window. What I mean is: say you have a window of 10 minutes and a watermark of 10 seconds, and you receive events with timestamps of 12:00, 12:02 and 12:08. These 3 events land in the window 12:00-12:10. The window won't close until you get an event past the end of the window plus the watermark (for example, at 12:11). It doesn't matter what the clock says. If the last event is at 12:08, Spark thinks it's 12:08 and the window stays open.
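
To make that concrete with the dataframes from the original post (an untested sketch):

    from pyspark.sql.functions import window, col

    # Sketch: a 10-minute tumbling window over the watermarked stream.
    windowed = dfWithWatermark \
        .groupBy(window("eventTime", "10 minutes"), col("parsedValue.id")) \
        .count()

    # In append mode, the 12:00-12:10 window is only emitted once the watermark
    # passes the window end, i.e. after some event arrives with an eventTime
    # later than 12:10:10 (window end + the 10 second watermark). Wall-clock
    # time on its own never advances the watermark.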

This is not a problem for applications that have a steady stream of events. However, it tripped us up during testing because we were adding events to the queue manually. It can also be a problem for applications with low TPS or lulls in events.

