spark-issues mailing list archives

From "Sean R. Owen (Jira)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-29500) Support partition column when writing to Kafka
Date Fri, 25 Oct 2019 13:08:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-29500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29500.
----------------------------------
    Fix Version/s: 3.0.0
       Resolution: Fixed

Issue resolved by pull request 26153
[https://github.com/apache/spark/pull/26153]

> Support partition column when writing to Kafka
> ----------------------------------------------
>
>                 Key: SPARK-29500
>                 URL: https://issues.apache.org/jira/browse/SPARK-29500
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Nicola Bova
>            Assignee: Nicola Bova
>            Priority: Major
>              Labels: starter
>             Fix For: 3.0.0
>
>
> When writing to a Kafka topic, `KafkaWriter` does not support selecting the output Kafka
> partition through a DataFrame column.
> While it is possible to configure a custom Kafka partitioner with
> `.option("kafka.partitioner.class", "my.custom.Partitioner")`,
> this is not enough for certain use cases.
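>
> A minimal sketch of that workaround (the bootstrap servers, topic, and column names here are placeholders, as is `my.custom.Partitioner`):
>
>     // Batch write to Kafka; partition selection is delegated to a custom
>     // Kafka partitioner configured as a producer property.
>     df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
>       .write
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "host1:9092")
>       .option("topic", "events")
>       .option("kafka.partitioner.class", "my.custom.Partitioner")
>       .save()
>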
> After the introduction of GDPR, it is a common pattern to emit records with unique Kafka
> keys, so that individual records can be tombstoned.
> This strategy implies that the full key cannot be used to calculate the topic partition,
> so users need to resort to custom partitioners.
> However, as stated at [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations],
> "Keys/Values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame
> operations to explicitly serialize keys/values into either strings or byte arrays."
> Therefore, a custom partitioner would need to
>  - deserialize the key (or value)
>  - calculate the output partition using a subset of the key (or value) fields
> This is inefficient because it requires an unnecessary deserialization step. It also
> makes it impossible to use the Spark batch writer to send Kafka tombstones when the partition
> is calculated from a subset of the Kafka value.
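>
> A sketch of such a custom partitioner (the class name and the key layout are hypothetical), illustrating the extra deserialization step:
>
>     import java.util.{Map => JMap}
>     import org.apache.kafka.clients.producer.Partitioner
>     import org.apache.kafka.common.Cluster
>     import org.apache.kafka.common.utils.Utils
>
>     class MyCustomPartitioner extends Partitioner {
>       override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
>                              value: AnyRef, valueBytes: Array[Byte], cluster: Cluster): Int = {
>         // Hypothetical key layout "<entityId>:<uniqueSuffix>": only entityId should
>         // drive the partition, so the key bytes must be turned back into a string
>         // and the suffix stripped before hashing.
>         val entityId = new String(keyBytes, "UTF-8").takeWhile(_ != ':')
>         val numPartitions = cluster.partitionsForTopic(topic).size
>         Utils.toPositive(Utils.murmur2(entityId.getBytes("UTF-8"))) % numPartitions
>       }
>       override def close(): Unit = ()
>       override def configure(configs: JMap[String, _]): Unit = ()
>     }
>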
> It would be a nice addition to let the user choose a partition by setting a value in
> the "partition" column of the DataFrame, as already done for `topic`, `key`, `value`, and
> `headers` in `KafkaWriter`, also mirroring the `ProducerRecord` API.
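>
> With a `partition` column supported by the writer, usage would look roughly like this (column names other than `key`, `value`, and `partition`, and the modulo-10 partition count, are illustrative):
>
>     // Each row carries its own target partition, mirroring
>     // ProducerRecord(topic, partition, key, value).
>     df.selectExpr(
>         "CAST(id AS STRING) AS key",
>         "to_json(struct(*)) AS value",
>         "CAST(id % 10 AS INT) AS partition")
>       .write
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "host1:9092")
>       .option("topic", "events")
>       .save()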



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

