spark-user mailing list archives

From Andrii Biletskyi <andrii.bilets...@yahoo.com.INVALID>
Subject Re: MapWithState partitioning
Date Mon, 31 Oct 2016 15:46:30 GMT
Thanks,

As I understand it, for the Kafka case the way to do it is to define my own
kafka.Partitioner that is used when data is produced to Kafka, and then
reuse that same partitioning logic as the spark.Partitioner in the
mapWithState spec.

I think I'll stick with that.

Thanks,
Andrii
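A minimal sketch of that approach, for reference. The hashing rule below is illustrative only, standing in for whatever the producer-side kafka.Partitioner actually does; both sides must agree on it exactly. `updateFn` and `numTopicPartitions` are hypothetical names, not part of any Spark API:

```scala
import org.apache.spark.Partitioner
import org.apache.spark.streaming.{State, StateSpec}

object KafkaAlignedState {
  // Illustrative placement rule: a stand-in for the producer-side
  // kafka.Partitioner's logic. The non-negative modulo keeps negative
  // hashCodes in range.
  def kafkaPartitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode % numPartitions + numPartitions) % numPartitions

  // Spark partitioner that mirrors the producer's placement rule.
  class KafkaAlignedPartitioner(override val numPartitions: Int)
      extends Partitioner {
    override def getPartition(key: Any): Int =
      kafkaPartitionFor(key.asInstanceOf[String], numPartitions)
  }

  // Example state function: running count per key.
  def updateFn(key: String, value: Option[Long],
               state: State[Long]): (String, Long) = {
    val total = state.getOption.getOrElse(0L) + value.getOrElse(0L)
    state.update(total)
    key -> total
  }

  // numTopicPartitions is hypothetical: the Kafka topic's partition count.
  def spec(numTopicPartitions: Int) =
    StateSpec.function(updateFn _)
      .partitioner(new KafkaAlignedPartitioner(numTopicPartitions))
}
```

Passing the partitioner via `StateSpec.partitioner(...)` is what keeps mapWithState's state RDD on the same layout as the input, so the co-partitioned batches avoid a shuffle.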

2016-10-31 16:55 GMT+02:00 Cody Koeninger <cody@koeninger.org>:

> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrbele@gmail.com>
> wrote:
> > Thanks for response,
> >
> > So, as I understand it, there is no way to "tell" mapWithState to keep the
> > partitioning schema, as any other transformation normally would.
> > Then I would like to clarify whether there is a simple way to do a
> > transformation on a key-value stream and somehow specify the Partitioner,
> > so that it effectively results in the same partitioning schema as the
> > original stream.
> > I.e.:
> >
> > stream.mapPartitions({ crs =>
> >       crs.map { cr =>
> >         cr.key() -> cr.value()
> >       }
> >     }) <--- specify somehow Partitioner here for the resulting rdd.
> >
> >
> > The reason I ask is that it simply looks strange to me that Spark has to
> > shuffle both my input stream and the "state" stream on every batch during
> > the mapWithState operation, when I know for sure that those two streams
> > will always share the same keys and will never need access to other
> > partitions.
> >
> > Thanks,
> > Andrii
> >
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
> >>
> >> If you call a transformation on an RDD using the same partitioner as that
> >> RDD, no shuffle will occur.  KafkaRDD doesn't have a partitioner; there's
> >> no consistent partitioning scheme that works for all Kafka uses. You can
> >> wrap each KafkaRDD with an RDD that has a custom partitioner that you
> >> write to match your Kafka partitioning scheme, and avoid a shuffle.
> >>
> >> The danger there is that if you have any misbehaving producers, or you
> >> translate the partitioning wrongly, you'll get bad results. It's safer
> >> just to shuffle.
> >>
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi"
> >> <andrii.biletskyi@yahoo.com.invalid> wrote:
> >>
> >> Hi all,
> >>
> >> I'm using the Spark Streaming mapWithState operation to do a stateful
> >> computation on my Kafka stream (though I think similar arguments would
> >> apply to any source).
> >>
> >> I'm trying to understand how to control mapWithState's partitioning
> >> schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to the
> >> Kafka message key
> >> 3) apply the mapWithState operation on the key-value stream; the state
> >> stream shares keys with the original stream, and the resulting stream
> >> doesn't change keys either
> >>
> >> The problem is that, as I understand it, the mapWithState stream has a
> >> different partitioning schema, and thus I see shuffles in the Spark Web
> >> UI.
> >>
> >> From the mapWithState implementation I see that mapWithState uses the
> >> given Partitioner if one is specified, and otherwise partitions data with
> >> HashPartitioner(<default-parallelism-conf>). The thing is that the
> >> original KafkaDStream has a specific partitioning schema: Kafka
> >> partitions correspond to Spark RDD partitions.
> >>
> >> Question: is there a way for the mapWithState stream to inherit the
> >> partitioning schema from the original stream (i.e. to correspond to
> >> Kafka partitions)?
> >>
> >> Thanks,
> >> Andrii
> >>
> >>
> >
>
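To make Cody's wrapping suggestion concrete, here is a hedged sketch of an RDD wrapper that declares a partitioner over the parent's existing partitions without moving any data. The class and its name are illustrative, not part of Spark; it simply delegates computation to the parent while exposing a `partitioner`, so downstream operations keyed the same way can skip the shuffle:

```scala
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: wraps a parent RDD and declares a partitioner for it.
// No data moves; we are asserting that the parent's partitions already
// follow this layout. If that assertion is wrong, results will be wrong.
class DeclaredPartitionerRDD[K, V](parent: RDD[(K, V)], part: Partitioner)
    extends RDD[(K, V)](parent) {

  require(part.numPartitions == parent.getNumPartitions,
    "declared partitioner must match the parent's partition count")

  // The whole point: expose a partitioner so Spark treats the data
  // as already partitioned.
  override val partitioner: Option[Partitioner] = Some(part)

  // Reuse the parent's partitions one-to-one.
  override protected def getPartitions: Array[Partition] = parent.partitions

  // Delegate computation straight to the parent; no shuffle, no copy.
  override def compute(split: Partition,
                       context: TaskContext): Iterator[(K, V)] =
    parent.iterator(split, context)
}
```

It would be applied per batch, e.g. `stream.transform(rdd => new DeclaredPartitionerRDD(rdd, myPartitioner))` before mapWithState, with the same partitioner passed to the StateSpec. As Cody warns above, if the declared partitioner does not actually match the producer's placement, the error is silent; shuffling is the safe default.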
