How to preserve event order per key in Structured Streaming Repartitioning By Key?


How to preserve event order per key in Structured Streaming Repartitioning By Key?

pmatpadi
I want to write a Structured Streaming Kafka consumer that reads data from a
single-partition Kafka topic, repartitions the incoming data by "key" into
3 Spark partitions while keeping the messages ordered per key, and writes
them to another Kafka topic with 3 partitions.

I used Dataframe.repartition(3, $"key"), which I believe uses hash
partitioning (HashPartitioner).

When I executed the query with a fixed-interval micro-batch trigger, I
visually verified that the output messages were in the expected order. Even
so, my assumption is that ordering is not guaranteed on the resulting
partitions. I am looking for affirmation or a veto of this assumption,
ideally with pointers to the Spark code repo or documentation.

I also tried Dataframe.sortWithinPartitions; however, this does not seem to
be supported on a streaming DataFrame without aggregation.

One option I tried was to convert the DataFrame to an RDD and apply
repartitionAndSortWithinPartitions, which repartitions the RDD according to
the given partitioner and, within each resulting partition, sorts records by
their keys. In that case, however, I cannot use the resulting RDD in a
writeStream query to write the result to the output Kafka topic.
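
For illustration, here is a rough sketch of what that RDD-level approach
looks like in a batch setting (batchDf and KeyOnlyPartitioner are
hypothetical names, not my actual code); the resulting RDD cannot be handed
to writeStream, which is where this falls down:

import org.apache.spark.Partitioner

// Partition by the message key only, while the composite (key, ts) pair key
// gives repartitionAndSortWithinPartitions its within-partition sort order.
class KeyOnlyPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(compositeKey: Any): Int = {
    val (k, _) = compositeKey.asInstanceOf[(String, Long)]
    val h = k.hashCode % partitions
    if (h < 0) h + partitions else h
  }
}

// batchDf: a non-streaming DataFrame with columns key, ts, value
val sortedRdd = batchDf.rdd
  .map(r => ((r.getAs[String]("key"), r.getAs[Long]("ts")), r.getAs[String]("value")))
  .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(3))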

1. Is there a DataFrame repartitioning API that helps sort the repartitioned
data in the streaming context?
2. Are there any other alternatives?
3. Does the default trigger type or fixed-interval trigger type for
micro-batch execution provide any sort of message ordering guarantees?
4. Is there any ordering possible in the Continuous trigger type?

Incoming data:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/TEVcw.png>
Code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._

// schema is the StructType describing the JSON payload (defined elsewhere)

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
  .option("maxOffsetsPerTrigger", 30)
  .load()

val inputDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
  .select(from_json($"value", schema).as("kv"))
  .selectExpr("kv.key", "kv.ts", "kv.value")
  .withColumn("spark_partition", spark_partition_id())
  .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
  // this sort is rejected at analysis time: sorting a streaming Dataset is
  // only allowed after an aggregation in Complete output mode
  .sortWithinPartitions($"ts", $"value")
  .select($"key".cast(StringType).as("key"),
    to_json(struct($"*")).cast(StringType).as("value"))

val query = resDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("topic", kafkaOutputTopic.get)
  .option("checkpointLocation", checkpointLocation.get)
  .start()

Error:

When I submit this application, it fails with
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/gXWvM.png>





Re: How to preserve event order per key in Structured Streaming Repartitioning By Key?

Holden Karau
So it's been a while since I poked at the streaming code base, but I don't think we make any promises about a stable sort during repartition, and there are notes in there about how some of these components should be rewritten into core. So even if we did have a stable sort, I wouldn't depend on it unless it were promised in the docs (implementation details can and will change). It's possible I've just missed something in the docs, though.


One possible solution I thought of initially, although it requires Complete output mode, would be to use a range partitioner instead of a hash partitioner, ranging on the primary key you care about together with the secondary attribute you want to keep in order (although a single primary key could get split across partitions that way). Then you can apply a "global" sort which, if the ordering matches, should not have to do a second shuffle.
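
A very rough sketch of that shape with the Dataset API (repartitionByRange has been available since Spark 2.3; parsedDf is a hypothetical streaming DataFrame with key/ts/value columns). Note the local sort is still only accepted by the analyzer after an aggregation in Complete output mode, so this is a sketch of the partitioning idea rather than a drop-in fix:

// Range-partition on (key, ts): rows with the same key end up in contiguous
// ranges, so the follow-up local sort amounts to a global sort on (key, ts)
// without a second shuffle. A key sitting on a range boundary can still be
// split across two partitions, as noted above.
val ranged = parsedDf
  .repartitionByRange(3, $"key", $"ts")
  .sortWithinPartitions($"key", $"ts")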

A kind of ugly approach that I think would work would be to first add partition indexes to the elements, then repartition, then do a groupBy + custom UDAF which ensures the order within the partition. This is a little ugly but doesn't depend too much on implementation details. You'd do the aggregate on a window the same size as your input window, with no waiting for late records.
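
A hedged sketch of that idea, using built-in aggregate functions in place of a hand-written UDAF (the column names, the epoch-seconds cast of ts, and the 1-minute window are all assumptions here):

import org.apache.spark.sql.functions._

// Group each key's events per time window and emit them as one list already
// sorted by (ts, value); sort_array orders an array<struct> by the struct's
// fields in declaration order, so ts comes first.
val ordered = parsedDf
  .withColumn("event_time", $"ts".cast("timestamp"))
  .groupBy(window($"event_time", "1 minute"), $"key")
  .agg(sort_array(collect_list(struct($"ts", $"value"))).as("ordered_events"))

Each output row then carries one key's events for that window, already in ts order; turning those back into individual Kafka messages is the ugly part.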

That being said, while we don't support global sort operations in append or update output modes for fairly clear reasons, it seems like it might be reasonable to relax this and support sorting within partitions (i.e. non-global). That would require a code change, though, and we can take that discussion to the dev@ list.




--
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9