spark-user mailing list archives

From pmatpadi <pmatp...@gmail.com>
Subject How to preserve event order per key in Structured Streaming Repartitioning By Key?
Date Mon, 03 Dec 2018 22:22:30 GMT
I want to write a Spark Structured Streaming Kafka consumer that reads data
from a single-partition Kafka topic, repartitions the incoming data by "key"
into 3 Spark partitions while keeping the messages ordered per key, and writes
them to another Kafka topic with 3 partitions.

I used DataFrame.repartition(3, $"key"), which I believe uses
HashPartitioner.
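
One way to inspect the key-to-partition assignment is to run the same repartition call on a static DataFrame. The following is a minimal sketch, assuming a local SparkSession and made-up sample rows; the object name RepartitionCheck and the helper partitionsPerKey are hypothetical. It only shows which partition each key lands in and says nothing about ordering inside a partition.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, spark_partition_id}

object RepartitionCheck {
  // For each key, collect the set of Spark partition ids its rows land in
  // after repartitioning by the "key" column.
  def partitionsPerKey(df: DataFrame, numPartitions: Int): Map[String, Set[Int]] =
    df.repartition(numPartitions, col("key"))
      .withColumn("spark_partition", spark_partition_id())
      .select("key", "spark_partition")
      .collect()
      .groupBy(_.getString(0))
      .map { case (k, rows) => k -> rows.map(_.getInt(1)).toSet }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName("repartition-check")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the Kafka messages (assumption).
    val sample = Seq(("a", 1L), ("b", 1L), ("a", 2L), ("c", 1L), ("b", 2L))
      .toDF("key", "ts")

    // Each key should map to exactly one partition id.
    partitionsPerKey(sample, 3).foreach(println)
    spark.stop()
  }
}
```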

When I executed the query with the fixed-interval trigger type, I visually
verified that the output messages were in the expected order. My assumption,
however, is that order is not guaranteed within the resulting partitions. I am
looking for confirmation or refutation of this assumption, ideally with
pointers to the Spark code repo or documentation.

I also tried DataFrame.sortWithinPartitions; however, this does not seem to
be supported on a streaming DataFrame without aggregation.

One option I tried was to convert the DataFrame to an RDD and apply
repartitionAndSortWithinPartitions, which repartitions the RDD according to
the given partitioner and, within each resulting partition, sorts records by
their keys. In this case, however, I cannot use the resulting RDD in the
writeStream operation to write the result to the output Kafka topic.
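
The RDD approach described above can be sketched as follows on a static (non-streaming) DataFrame. This is an illustration under assumptions, not the exact code that was run: the class KeyOnlyPartitioner, the object RddSortSketch, and the sample rows are hypothetical. The composite (key, ts) pair serves as the RDD key so that records are routed by key only but sorted by key and then ts within each partition.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

// Hypothetical partitioner: routes on the message key only, ignoring ts,
// so all records of one key end up in the same partition.
class KeyOnlyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(k: Any): Int = k match {
    case (key: String, _) =>
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
  }
}

object RddSortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName("rdd-sort-sketch")
      .getOrCreate()
    import spark.implicits._

    // Static stand-in for the Kafka input (assumption); not a streaming source.
    val batchDf = Seq(("a", 2L, "a2"), ("b", 1L, "b1"), ("a", 1L, "a1"))
      .toDF("key", "ts", "value")

    // Key the RDD by (key, ts); the implicit tuple ordering sorts by key,
    // then ts, inside each partition chosen by KeyOnlyPartitioner.
    val sorted = batchDf.rdd
      .map(r => ((r.getString(0), r.getLong(1)), r.getString(2)))
      .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(3))

    // Print (partition index, key, ts, value) to inspect the result.
    sorted
      .mapPartitionsWithIndex((i, it) => it.map { case ((k, ts), v) => (i, k, ts, v) })
      .collect()
      .foreach(println)

    spark.stop()
  }
}
```

The result here is a plain RDD, so it illustrates the sorting step only and leaves the question of writing it back through writeStream open.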

1. Is there a DataFrame repartitioning API that also sorts the
repartitioned data in a streaming context?
2. Are there any other alternatives?
3. Does the default trigger type or the fixed-interval trigger type for
micro-batch execution provide any message ordering guarantees?
4. Is any ordering possible with the Continuous trigger type?

Incoming data:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/TEVcw.png>

Code:

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
  .option("maxOffsetsPerTrigger",30)
  .load()

val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
  .select(from_json($"value", schema).as("kv"))
  .selectExpr("kv.key", "kv.ts", "kv.value")
  .withColumn("spark_partition", spark_partition_id())
  .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
  .sortWithinPartitions($"ts", $"value")
  .select($"key".cast(StringType).as("key"),
    to_json(struct($"*")).cast(StringType).as("value"))

val query = resDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("topic", kafkaOutputTopic.get)
  .option("checkpointLocation", checkpointLocation.get)
  .start()

Error:

When I submit this application, it fails with the following error:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t9734/gXWvM.png>


