spark-user mailing list archives

From Cody Koeninger <>
Subject Re: MapWithState partitioning
Date Mon, 31 Oct 2016 14:55:37 GMT
You may know that those streams share the same keys, but Spark doesn't
unless you tell it.

mapWithState takes a StateSpec, which should allow you to specify a partitioner.
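A minimal sketch of what that looks like (the key/value types, the update function, and `numKafkaPartitions` are illustrative placeholders, not from this thread):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Illustrative update function: keep a running sum per key.
def updateFn(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
  val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
  state.update(sum)
  (key, sum)
}

// Assumption: this matches the Kafka topic's partition count.
val numKafkaPartitions = 8

val spec = StateSpec
  .function(updateFn _)
  .partitioner(new HashPartitioner(numKafkaPartitions))

// keyedStream: a DStream[(String, Long)] built from the Kafka stream
// val stateful = keyedStream.mapWithState(spec)
```

If the upstream keyed stream is already partitioned with the same partitioner, the state RDDs stay co-partitioned with it.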

On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <> wrote:
> Thanks for the response,
> So as I understand there is no way to "tell" mapWithState to leave the
> partitioning schema alone, as any other transformation would normally do.
> Then I would like to clarify whether there is a simple way to apply a
> transformation to a key-value stream and somehow specify a Partitioner that
> effectively results in the same partitioning schema as the original stream.
> I.e.:
> stream.mapPartitions({ crs =>
>   crs.map { cr =>
>     cr.key() -> cr.value()
>   }
> }) <--- specify somehow a Partitioner here for the resulting RDD.
> The reason I ask is that it simply looks strange to me that Spark has to
> shuffle my input stream and the "state" stream on every batch during the
> mapWithState operation, when I know for sure that those two streams will
> always share the same keys and will never need access to each other's
> partitions.
> Thanks,
> Andrii
> 2016-10-31 15:45 GMT+02:00 Cody Koeninger <>:
>> If you call a transformation on an RDD using the same partitioner as that
>> RDD, no shuffle will occur.  KafkaRDD doesn't have a partitioner; there's no
>> consistent partitioning scheme that works for all Kafka uses. You can wrap
>> each KafkaRDD with an RDD that has a custom partitioner you write to
>> match your Kafka partitioning scheme, and avoid a shuffle.
>> The danger there is if you have any misbehaving producers, or translate
>> the partitioning wrongly, you'll get bad results. It's safer just to
>> shuffle.
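The custom partitioner Cody suggests above can be sketched like this. The class name and the hash-mod assignment rule are assumptions for illustration; in a real job the class would extend org.apache.spark.Partitioner so Spark can recognize it, but it is written as a plain class here so the logic stands on its own. Note that Kafka's default producer partitioner hashes the serialized key bytes (murmur2), not Java's hashCode, so a scheme like this only safely avoids a shuffle if it genuinely matches how the producers partitioned — exactly the danger described above.

```scala
// Hypothetical partitioner sketch matching a hash-mod partitioning scheme.
// In real code this would extend org.apache.spark.Partitioner; it is a
// plain class here so the assignment logic stands alone.
class KafkaLikePartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "numPartitions must be positive")

  // Same non-negative-modulo rule Spark's HashPartitioner uses.
  def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

With the real Partitioner subclass in place, each batch's KafkaRDD could be wrapped in a keyed RDD that declares this partitioner, telling Spark the data is already laid out that way.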
>> On Oct 31, 2016 04:31, "Andrii Biletskyi"
>> <> wrote:
>> Hi all,
>> I'm using Spark Streaming mapWithState operation to do a stateful
>> operation on my Kafka stream (though I think similar arguments would apply
>> for any source).
>> Trying to understand a way to control mapWithState's partitioning schema.
>> My transformations are simple:
>> 1) create KafkaDStream
>> 2) mapPartitions to get a key-value stream where `key` corresponds to
>> Kafka message key
>> 3) apply the mapWithState operation on the key-value stream; the state
>> stream shares keys with the original stream, and the resulting stream
>> doesn't change the keys either
>> The problem is that, as I understand, mapWithState stream has a different
>> partitioning schema and thus I see shuffles in Spark Web UI.
>> From the mapWithState implementation I see that:
>> mapWithState uses the Partitioner if one is specified, otherwise it
>> partitions data with HashPartitioner(<default-parallelism-conf>). The thing
>> is that the original KafkaDStream has a specific partitioning schema: Kafka
>> partitions correspond to Spark RDD partitions.
>> Question: is there a way for the mapWithState stream to inherit the
>> partitioning schema from the original stream (i.e. correspond to Kafka
>> partitions)?
>> Thanks,
>> Andrii
