spark-user mailing list archives

From Ivan von Nagy <i...@vadio.com>
Subject Re: Instability issues with Spark 2.0.1 and Kafka 0.10
Date Sun, 13 Nov 2016 18:47:15 GMT
As the code iterates through the parallel list, it is processing up to 8
KafkaRDDs at a time. Each has its own unique topic and consumer group now.
Every topic has 4 partitions, so technically there should never be more
than 32 CachedKafkaConsumers. However, this seems not to be the case, as we
are using the default settings for cache size (16 initial -> 64 max) and
PreferConsistent for the location strategy. I do notice the concurrent
modification exception occurs when a cached consumer is being dropped out
of the cache when it reaches the max, 64. After looking at the code, the
KafkaRDDIterator will only close its consumer if we are not caching (makes
sense), but there is no other way to close/drop the consumer until it gets
dropped from the cache. Perhaps there is an issue with resources here since
RDDs don't inherently have any resource management support, like "I am done
so clean up now".
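
To make the eviction path concrete, here is a minimal sketch of what I
understand the cache to be doing. It assumes an access-ordered
java.util.LinkedHashMap whose removeEldestEntry closes the evicted entry,
which is what the stack trace in my Nov 10 post suggests. FakeConsumer,
EvictionSketch and evictedKeys are hypothetical names for illustration only;
they are not Spark or Kafka classes.

```scala
import java.{util => ju}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a cached consumer; it only records that close() ran.
final class FakeConsumer(key: String, closedLog: ArrayBuffer[String]) {
  def close(): Unit = { closedLog += key; () }
}

object EvictionSketch {
  // Inserts one entry per key into an LRU map capped at maxCapacity and
  // returns the keys whose consumers were closed by eviction, in order.
  def evictedKeys(maxCapacity: Int, keys: Seq[String]): Seq[String] = {
    val closed = ArrayBuffer.empty[String]
    val cache = new ju.LinkedHashMap[String, FakeConsumer](16, 0.75f, true) {
      override def removeEldestEntry(
          eldest: ju.Map.Entry[String, FakeConsumer]): Boolean = {
        if (size() > maxCapacity) {
          // close() runs on whichever thread did the insert, regardless of
          // whether another task is still reading from the evicted consumer.
          eldest.getValue.close()
          true
        } else {
          false
        }
      }
    }
    keys.foreach(k => cache.put(k, new FakeConsumer(k, closed)))
    closed.toList
  }

  def main(args: Array[String]): Unit = {
    // With a cap of 2, inserting a third key evicts and closes the first.
    println(evictedKeys(2, Seq("topicA-0", "topicB-0", "topicC-0")))
  }
}
```

The point of the sketch is only that the close happens as a side effect of
someone else's insert, so the thread closing the consumer need not be the
thread that was using it.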

Over the course of this job, it will probably process upwards of 100-150
different channels so about 400-600 partitions. Does this mean we should
bump the cache size that high even though only about 8 channels (32
partitions) are being handled by the executors at any given time?
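
For reference, the arithmetic behind that question as a small sketch. The
key spark.streaming.kafka.consumer.cache.maxCapacity is the documented
setting (default 64); CacheSizing and partitionsTouched are hypothetical
helpers. Whether the cache really needs to cover every partition the job
ever touches, rather than only the ~32 active at once, is exactly what I am
unsure about.

```scala
object CacheSizing {
  // Documented Spark setting for the executor-side consumer cache (default 64).
  val MaxCapacityKey = "spark.streaming.kafka.consumer.cache.maxCapacity"

  // Upper bound on distinct topic-partitions for a given number of channels.
  def partitionsTouched(channels: Int, partitionsPerTopic: Int): Int =
    channels * partitionsPerTopic

  def main(args: Array[String]): Unit = {
    val concurrent = partitionsTouched(8, 4)   // partitions in flight at once
    val worstCase = partitionsTouched(150, 4)  // partitions over the whole job
    println(s"concurrent=$concurrent worstCase=$worstCase")
    // Assumption: the value would be passed at submit time via --conf.
    println(s"--conf $MaxCapacityKey=$worstCase")
  }
}
```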

Thanks,

Ivan

On Sat, Nov 12, 2016 at 1:25 PM, Cody Koeninger <cody@koeninger.org> wrote:

> You should not be getting consumer churn on executors at all, that's
> the whole point of the cache.  How many partitions are you trying to
> process per executor?
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
>
> gives instructions on the default size of the cache and how to increase it.
>
> On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <ivan@vadio.com> wrote:
> > Hi Sean,
> >
> > Thanks for responding. We have run our jobs with internal parallel
> > processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did
> > not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
> > 0.10 clients. If we process serially, then we sometimes get the errors, but
> > far less often. Also, if done sequentially it sometimes takes more than 2x
> > as long, which is not an option for this particular job.
> >
> > I posted another example on Nov 10th, which is the example below. We
> > basically iterate through a list in parallel and sometimes the list could be
> > upwards of a hundred elements. The parallelism in Scala/Spark limits to
> > about 8 at a time on our nodes. For performance reasons we process in
> > parallel and we also separate each since each channel has its own topic.
> > We don't combine all into one KafkaRDD because that means we have to process
> > all or nothing if an error occurs. This way if a couple of channels fail, we
> > can re-run the job and it will only process those channels.
> >
> > This has just been perplexing since we had never encountered any errors for
> > well over a year using the prior versions. At this time, I am just seeking
> > any configuration options or code changes that we may be missing or even, at
> > a lower level, fundamentally what changed in Spark 2 and Kafka 0.10 that
> > surfaced these issues.
> >
> > We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
> > figured out; however, it is a deal breaker for us to upgrade to Spark 2.x
> > with Kafka 0.10 clients. On a side note, we have not encountered any issues
> > with the Kafka Producers; this is simply with the KafkaRDD and its use of
> > CachedKafkaConsumer. Any help is much appreciated.
> >
> > Thanks,
> >
> > Ivan
> >
> > Example usage with KafkaRDD:
> > val channels = Seq("channel1", "channel2")
> >
> > channels.toParArray.foreach { channel =>
> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >
> >   // Get offsets for the given topic and the consumer group "$prefix-$topic"
> >   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
> >
> >   val ds = KafkaUtils.createRDD[K, V](context,
> >         kafkaParams asJava,
> >         offsetRanges,
> >         PreferConsistent).toDS[V]
> >
> >   // Do some aggregations
> >   ds.agg(...)
> >   // Save the data
> >   ds.write.mode(SaveMode.Append).parquet(somePath)
> >   // Save offsets using a KafkaConsumer
> >   consumer.commitSync(newOffsets.asJava)
> >   consumer.close()
> > }
> >
> > On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <graphex@graphex.com> wrote:
> >>
> >> How are you iterating through your RDDs in parallel? In the past (Spark
> >> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
> >> using futures, I've encountered some pretty bad behavior in Spark,
> >> especially during job retries. Very difficult to explain things, like
> >> records from one RDD leaking into a totally different (though shared
> >> lineage) RDD during job retries. I'm not sure what documentation exists
> >> around parallelizing on top of Spark's existing parallelization approach,
> >> but I would venture a guess that that could be the source of your concurrent
> >> access problems, and potentially other hidden issues. Have you tried a
> >> version of your app which doesn't parallelize actions on RDDs, but instead
> >> serially processes each RDD? I'm sure it isn't ideal performance-wise, but
> >> it seems like a good choice for an A/B test.
> >>
> >> The poll.ms issue could very well be settings or capability of your kafka
> >> cluster. I think other (non-Spark) approaches may have less consumer churn
> >> and be less susceptible to things like GC pauses or coordination latency. It
> >> could also be that the number of consumers being simultaneously created on
> >> each executor causes a thundering herd problem during initial phases (which
> >> then causes job retries, which then causes more consumer churn, etc.).
> >>
> >> Sean
> >>
> >>
> >>
> >> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <ivan@vadio.com> wrote:
> >>
> >> The code was changed to use a unique group for each KafkaRDD that was
> >> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
> >> logic (see Nov 10 post for example) is: get a list of channels, iterate
> >> through them in parallel, load a KafkaRDD using a given topic and a consumer
> >> group that is made from the topic (each RDD uses a different topic and
> >> group), process the data and write to Parquet files.
> >>
> >> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
> >> set to something like 10 seconds. We also get concurrent modification
> >> exceptions. I believe the key here is that the processing of data in
> >> parallel is where we encounter issues, so we are looking for some possible
> >> answers surrounding this.
> >>
> >> Thanks,
> >>
> >> Ivan
> >>
> >>
> >> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <cody@koeninger.org>
> >> wrote:
> >>>
> >>> It is already documented that you must use a different group id, which as
> >>> far as I can tell you are still not doing.
> >>>
> >>>
> >>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixiong@databricks.com>
> >>> wrote:
> >>>>
> >>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
> >>>>
> >>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <ivan@vadio.com> wrote:
> >>>>>
> >>>>> Ok, I have split the KafkaRDD logic so each uses its own group and
> >>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
> >>>>> poll.ms ends up with a timeout and exception, so I am still perplexed on
> >>>>> that one. The new error I am getting now is a
> >>>>> `ConcurrentModificationException` when Spark is trying to remove the
> >>>>> CachedKafkaConsumer.
> >>>>>
> >>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> >>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> >>>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >>>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
> >>>>>
> >>>>> Here is the basic logic:
> >>>>>
> >>>>> Using KafkaRDD - This takes a list of channels and processes them in
> >>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
> >>>>> group (s"$prefix-$topic"), and each has its own topic and each topic has
> >>>>> 4 partitions. We routinely get timeout errors when polling for data when
> >>>>> the poll.ms is less than 2 seconds. This occurs whether or not we process
> >>>>> in parallel.
> >>>>>
> >>>>> Example usage with KafkaRDD:
> >>>>> val channels = Seq("channel1", "channel2")
> >>>>>
> >>>>> channels.toParArray.foreach { channel =>
> >>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >>>>>
> >>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
> >>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
> >>>>>
> >>>>>   val ds = KafkaUtils.createRDD[K, V](context,
> >>>>>         kafkaParams asJava,
> >>>>>         offsetRanges,
> >>>>>         PreferConsistent).toDS[V]
> >>>>>
> >>>>>   // Do some aggregations
> >>>>>   ds.agg(...)
> >>>>>   // Save the data
> >>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
> >>>>>   // Save offsets using a KafkaConsumer
> >>>>>   consumer.commitSync(newOffsets.asJava)
> >>>>>   consumer.close()
> >>>>> }
> >>>>>
> >>>>> I am not sure why the concurrent issue is there, as I have tried to
> >>>>> debug and also looked at the KafkaConsumer code, but everything
> >>>>> looks like it should not occur. The things to figure out are why this
> >>>>> occurs when running in parallel and why the timeouts still occur.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Ivan
> >>>>>
> >>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <cody@koeninger.org>
> >>>>> wrote:
> >>>>>>
> >>>>>> There definitely is Kafka documentation indicating that you should use
> >>>>>> a different consumer group for logically different subscribers, this
> >>>>>> is really basic to Kafka:
> >>>>>>
> >>>>>> http://kafka.apache.org/documentation#intro_consumers
> >>>>>>
> >>>>>> As for your comment that "commit async after each RDD, which is not
> >>>>>> really viable also", how is it not viable?  Again, committing offsets
> >>>>>> to Kafka doesn't give you reliable delivery semantics unless your
> >>>>>> downstream data store is idempotent.  If your downstream data store is
> >>>>>> idempotent, then it shouldn't matter to you when offset commits
> >>>>>> happen, as long as they happen within a reasonable time after the data
> >>>>>> is written.
> >>>>>>
> >>>>>> Do you want to keep arguing with me, or follow my advice and proceed
> >>>>>> with debugging any remaining issues after you make the changes I
> >>>>>> suggested?
> >>>>>>
> >>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <ivan@vadio.com> wrote:
> >>>>>> > With our stream version, we update the offsets for only the partition
> >>>>>> > we are operating on. We even break down the partition into smaller
> >>>>>> > batches and then update the offsets after each batch within the
> >>>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
> >>>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
> >>>>>> > longer allows you to simply update the offsets for a given consumer
> >>>>>> > group. You have to subscribe or assign partitions to even do so.
> >>>>>> >
> >>>>>> > As for storing the offsets in some other place like a DB, I don't
> >>>>>> > find this useful because you then can't use tools like Kafka Manager.
> >>>>>> > In order to do so you would have to store in a DB and then circle back
> >>>>>> > and update Kafka afterwards. This means you have to keep two sources in
> >>>>>> > sync, which is not really a good idea.
> >>>>>> >
> >>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
> >>>>>> > stays subscribed to the topic(s) and consumer group, while the
> >>>>>> > executors prepend "spark-executor-" to the consumer group. The stream
> >>>>>> > (driver) does allow you to commit async after each RDD, which is not
> >>>>>> > really viable also. I have thought of implementing an Akka actor system
> >>>>>> > on the driver and sending it messages from the executor code to update
> >>>>>> > the offsets, but then that is asynchronous as well so not really a good
> >>>>>> > solution.
> >>>>>> >
> >>>>>> > I have no idea why Kafka made this change and also why in the parallel
> >>>>>> > KafkaRDD application we would be advised to use different consumer
> >>>>>> > groups for each RDD. It seems strange to me that different consumer
> >>>>>> > groups would be required or advised. There is no Kafka documentation
> >>>>>> > that I know of that states this. The biggest issue I see with the
> >>>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
> >>>>>> > seconds and still get the issue. Something just does not seem right
> >>>>>> > here. As I mentioned with the streaming application, with Spark 1.6 and
> >>>>>> > Kafka 0.8.x we never saw this issue. We have been running the same
> >>>>>> > basic logic for over a year now without one hitch at all.
> >>>>>> >
> >>>>>> > Ivan
> >>>>>> >
> >>>>>> >
> >>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <cody@koeninger.org>
> >>>>>> > wrote:
> >>>>>> >>
> >>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
> >>>>>> >> general, not just kafka) have been progressing on to the next batch
> >>>>>> >> after a given batch aborts for quite some time now.  Yet another
> >>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
> >>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
> >>>>>> >>
> >>>>>> >>
> >>>>>> >>
> >>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <graphex@graphex.com>
> >>>>>> >> wrote:
> >>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
> >>>>>> >> > using the "Kafka Stream" approach that he is using, except I'm
> >>>>>> >> > storing my offsets manually from the driver to Zookeeper in the
> >>>>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and
> >>>>>> >> > therefore don't have the concurrency issues, but a very similar use
> >>>>>> >> > case is coming up for me soon, it's just been backburnered until I
> >>>>>> >> > can get streaming to be more reliable (I will definitely ensure
> >>>>>> >> > unique group IDs when I do). Offset commits are certainly more
> >>>>>> >> > painful in Kafka 0.10, and that doesn't have anything to do with
> >>>>>> >> > Spark.
> >>>>>> >> >
> >>>>>> >> > While I may be able to alleviate the timeout by just increasing it,
> >>>>>> >> > I've noticed something else that is more worrying: When one task
> >>>>>> >> > fails 4 times in a row (i.e. "Failed to get records for _ after
> >>>>>> >> > polling for _"), Spark aborts the Stage and Job with "Job aborted
> >>>>>> >> > due to stage failure: Task _ in stage _ failed 4 times". That's
> >>>>>> >> > fine, and it's the behavior I want, but instead of stopping the
> >>>>>> >> > Application there (as previous versions of Spark did) the next
> >>>>>> >> > microbatch marches on and offsets are committed ahead of the failed
> >>>>>> >> > microbatch. Suddenly my at-least-once app becomes more
> >>>>>> >> > sometimes-at-least-once which is no good. In order for spark to
> >>>>>> >> > display that failure, I must be propagating the errors up to Spark,
> >>>>>> >> > but the behavior of marching forward with the next microbatch seems
> >>>>>> >> > to be new, and a big potential for data loss in streaming
> >>>>>> >> > applications.
> >>>>>> >> >
> >>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
> >>>>>> >> > application once spark.task.maxFailures is reached? Has anyone else
> >>>>>> >> > seen this behavior of a streaming application skipping over failed
> >>>>>> >> > microbatches?
> >>>>>> >> >
> >>>>>> >> > Thanks,
> >>>>>> >> > Sean
> >>>>>> >> >
> >>>>>> >> >
> >>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <cody@koeninger.org>
> >>>>>> >> >> wrote:
> >>>>>> >> >>
> >>>>>> >> >> So basically what I am saying is
> >>>>>> >> >>
> >>>>>> >> >> - increase poll.ms
> >>>>>> >> >> - use a separate group id everywhere
> >>>>>> >> >> - stop committing offsets under the covers
> >>>>>> >> >>
> >>>>>> >> >> That should eliminate all of those as possible causes, and then we
> >>>>>> >> >> can see if there are still issues.
> >>>>>> >> >>
> >>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> >>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
> >>>>>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
> >>>>>> >> >> consumer api should be usable with later brokers.  As long as you
> >>>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
> >>>>>> >> >> keep using it.
> >>>>>> >> >>
> >>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <ivan@vadio.com>
> >>>>>> >> >> wrote:
> >>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
> >>>>>> >> >>> RDD uses a single distinct topic. For example, the group would be
> >>>>>> >> >>> something like "storage-group", and the topics would be
> >>>>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
> >>>>>> >> >>> KafkaConsumer is started and assigned the partitions, and then
> >>>>>> >> >>> commit offsets is called after the RDD is processed. This should
> >>>>>> >> >>> not interfere with the consumer group used by the executors, which
> >>>>>> >> >>> would be "spark-executor-storage-group".
> >>>>>> >> >>>
> >>>>>> >> >>> In the streaming example there is a single topic ("client-events")
> >>>>>> >> >>> and group ("processing-group"). A single stream is created and
> >>>>>> >> >>> offsets are manually updated from the executor after each partition
> >>>>>> >> >>> is handled. This was a challenge since Spark now requires one to
> >>>>>> >> >>> assign or subscribe to a topic in order to even update the offsets.
> >>>>>> >> >>> In 0.8.2.x you did not have to worry about that. This approach
> >>>>>> >> >>> limits your exposure to duplicate data since idempotent records are
> >>>>>> >> >>> not entirely possible in our scenario. At least without a lot of
> >>>>>> >> >>> re-running of logic to de-dup.
> >>>>>> >> >>>
> >>>>>> >> >>> Thanks,
> >>>>>> >> >>>
> >>>>>> >> >>> Ivan
> >>>>>> >> >>>
> >>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger
> >>>>>> >> >>> <cody@koeninger.org>
> >>>>>> >> >>> wrote:
> >>>>>> >> >>>>
> >>>>>> >> >>>> So just to be clear, the answers to my questions are
> >>>>>> >> >>>>
> >>>>>> >> >>>> - you are not using different group ids, you're using the same
> >>>>>> >> >>>> group id everywhere
> >>>>>> >> >>>>
> >>>>>> >> >>>> - you are committing offsets manually
> >>>>>> >> >>>>
> >>>>>> >> >>>> Right?
> >>>>>> >> >>>>
> >>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
> >>>>>> >> >>>> tune poll.ms upwards even higher.
> >>>>>> >> >>>>
> >>>>>> >> >>>> You must use different group ids for different rdds or streams.
> >>>>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
> >>>>>> >> >>>> the same group id, and the consumer cache is keyed by group id.
> >>>>>> >> >>>> Yes, the executor will tack "spark-executor-" on to the beginning,
> >>>>>> >> >>>> but if you give it the same base group id, it will be the same.
> >>>>>> >> >>>> And the driver will use the group id you gave it, unmodified.
> >>>>>> >> >>>>
> >>>>>> >> >>>> Finally, I really can't help you if you're manually writing your
> >>>>>> >> >>>> own code to commit offsets directly to Kafka.  Trying to minimize
> >>>>>> >> >>>> duplicates that way doesn't really make sense, your system must be
> >>>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets
> >>>>>> >> >>>> store, it can't do transactional exactly once.
> >>>>>> >> >>>>
> >>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <ivan@vadio.com>
> >>>>>> >> >>>> wrote:
> >>>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
> >>>>>> >> >>>>> is the most prone to polling timeouts and concurrent
> >>>>>> >> >>>>> modification errors.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
> >>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
> >>>>>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
> >>>>>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
> >>>>>> >> >>>>> when polling for data. This occurs whether we process in parallel
> >>>>>> >> >>>>> or sequentially.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Spark Kafka setting:*
> >>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Kafka Consumer Params:*
> >>>>>> >> >>>>> metric.reporters = []
> >>>>>> >> >>>>> metadata.max.age.ms = 300000
> >>>>>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> >>>>>> >> >>>>> reconnect.backoff.ms = 50
> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
> >>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
> >>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
> >>>>>> >> >>>>> ssl.keystore.type = JKS
> >>>>>> >> >>>>> enable.auto.commit = false
> >>>>>> >> >>>>> sasl.mechanism = GSSAPI
> >>>>>> >> >>>>> interceptor.classes = null
> >>>>>> >> >>>>> exclude.internal.topics = true
> >>>>>> >> >>>>> ssl.truststore.password = null
> >>>>>> >> >>>>> client.id =
> >>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
> >>>>>> >> >>>>> max.poll.records = 1000
> >>>>>> >> >>>>> check.crcs = true
> >>>>>> >> >>>>> request.timeout.ms = 40000
> >>>>>> >> >>>>> heartbeat.interval.ms = 3000
> >>>>>> >> >>>>> auto.commit.interval.ms = 5000
> >>>>>> >> >>>>> receive.buffer.bytes = 65536
> >>>>>> >> >>>>> ssl.truststore.type = JKS
> >>>>>> >> >>>>> ssl.truststore.location = null
> >>>>>> >> >>>>> ssl.keystore.password = null
> >>>>>> >> >>>>> fetch.min.bytes = 1
> >>>>>> >> >>>>> send.buffer.bytes = 131072
> >>>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> >>>>>> >> >>>>> group.id = storage-group
> >>>>>> >> >>>>> retry.backoff.ms = 100
> >>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>>>>> >> >>>>> sasl.kerberos.service.name = null
> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
> >>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
> >>>>>> >> >>>>> ssl.key.password = null
> >>>>>> >> >>>>> fetch.max.wait.ms = 500
> >>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
> >>>>>> >> >>>>> connections.max.idle.ms = 540000
> >>>>>> >> >>>>> session.timeout.ms = 30000
> >>>>>> >> >>>>> metrics.num.samples = 2
> >>>>>> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> >>>>>> >> >>>>> ssl.protocol = TLS
> >>>>>> >> >>>>> ssl.provider = null
> >>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>>>>> >> >>>>> ssl.keystore.location = null
> >>>>>> >> >>>>> ssl.cipher.suites = null
> >>>>>> >> >>>>> security.protocol = PLAINTEXT
> >>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
> >>>>>> >> >>>>> metrics.sample.window.ms = 30000
> >>>>>> >> >>>>> auto.offset.reset = earliest
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Example usage with KafkaRDD:*
> >>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> channels.toParArray.foreach { channel =>
> >>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
> >>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
> >>>>>> >> >>>>>        kafkaParams asJava,
> >>>>>> >> >>>>>        offsetRanges,
> >>>>>> >> >>>>>        PreferConsistent).toDS[V]
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  // Do some aggregations
> >>>>>> >> >>>>>  ds.agg(...)
> >>>>>> >> >>>>>  // Save the data
> >>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
> >>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
> >>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
> >>>>>> >> >>>>>  consumer.close()
> >>>>>> >> >>>>> }
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Example usage with Kafka Stream:*
> >>>>>> >> >>>>> This creates a stream and processes events in each partition. At
> >>>>>> >> >>>>> the end of processing for each partition, we update the offsets
> >>>>>> >> >>>>> for that partition. This is challenging to do, but is better
> >>>>>> >> >>>>> than calling commitAsync on the stream, because that occurs after
> >>>>>> >> >>>>> the /entire/ RDD has been processed. This method minimizes
> >>>>>> >> >>>>> duplicates in an exactly once environment. Since the executors
> >>>>>> >> >>>>> use their own custom group "spark-executor-processor-group" and
> >>>>>> >> >>>>> the commit is buried in private functions, we are unable to use
> >>>>>> >> >>>>> the executors' cached consumer to update the offsets. This
> >>>>>> >> >>>>> requires us to go through multiple steps to update the Kafka
> >>>>>> >> >>>>> offsets accordingly.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
> >>>>>> >> >>>>>      PreferConsistent,
> >>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
> >>>>>> >> >>>>>        kafkaParams,
> >>>>>> >> >>>>>        offsetRanges))
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> stream.foreachRDD { rdd =>
> >>>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>    // Transform our data
> >>>>>> >> >>>>>   rdd.foreachPartition { events =>
> >>>>>> >> >>>>>       // Establish a consumer in the executor so we can update
> >>>>>> >> >>>>>       // offsets after each partition.
> >>>>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
> >>>>>> >> >>>>>       // get/set offsets
> >>>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
> >>>>>> >> >>>>>       // do something with our data
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>       // Write the offsets that were updated in this partition
> >>>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
> >>>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
> >>>>>> >> >>>>>   }
> >>>>>> >> >>>>> }
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> --
> >>>>>> >> >>>>> View this message in context:
> >>>>>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
> >>>>>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
> >>>>>> >> >>>>> Nabble.com.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> ---------------------------------------------------------------------
> >>>>>> >> >>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>>>>> >> >>>>>
> >>>>>> >> >>>
> >>>>>> >> >>>
> >>>>>> >> >>
> >>>>>> >> >>
> >>>>>> >> >>
> >>>>>> >> >
> >>>>>> >
> >>>>>> >
> >>>>>
> >>>>>
> >>>>
> >>
> >>
> >
>
