spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Instability issues with Spark 2.0.1 and Kafka 0.10
Date Fri, 04 Nov 2016 20:48:14 GMT
So basically what I am saying is

- increase spark.streaming.kafka.consumer.poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can
see if there are still issues.
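Roughly, those changes expressed as settings. This is a minimal sketch written as plain Scala maps so it does not need the Spark or Kafka jars. The helper name `paramsFor`, the `<base>-<topic>` group-id scheme, and the 10000 ms poll timeout are illustrative assumptions, not values anyone in this thread is using. Stopping the manual commits is a code change rather than a setting, so it only shows up as a comment.

```scala
object RecommendedSettings {
  // Spark-side poll timeout for the executors' cached consumers.
  // 10000 ms is an illustrative value, well above the 2000 ms used in this thread.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.poll.ms" -> "10000"
  )

  // Hypothetical helper: derive one group id per RDD or stream from a base name
  // plus the topic, so jobs reading different topics never share a consumer group.
  def paramsFor(baseGroup: String, topic: String): Map[String, Object] = Map(
    "bootstrap.servers" -> "somemachine:31000",
    "group.id" -> s"$baseGroup-$topic",
    // No auto-commit; the job's own code should also stop calling commitSync
    // on a side consumer, so nothing commits offsets behind Spark's back.
    "enable.auto.commit" -> java.lang.Boolean.FALSE,
    "auto.offset.reset" -> "earliest"
  )

  def main(args: Array[String]): Unit = {
    println(paramsFor("storage-group", "storage-channel1")("group.id")) // storage-group-storage-channel1
    println(paramsFor("storage-group", "storage-channel2")("group.id")) // storage-group-storage-channel2
  }
}
```

Deriving the id from the topic is just one way to guarantee uniqueness; any scheme works as long as no two RDDs or streams end up with the same base group id.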

As for 0.8 vs 0.10: Spark doesn't require you to assign or
subscribe to a topic in order to update offsets; Kafka does. If you
don't like the new Kafka consumer API, the existing 0.8 simple
consumer API should be usable with later brokers. As long as you
don't need SSL or dynamic subscriptions, and it meets your needs, keep
using it.

On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <ivan@vadio.com> wrote:
> Yes, the parallel KafkaRDDs use the same consumer group, but each RDD uses a
> single distinct topic. For example, the group would be something like
> "storage-group", and the topics would be "storage-channel1" and
> "storage-channel2". In each thread a KafkaConsumer is created and assigned the
> relevant partitions, and offsets are committed after the RDD is
> processed. This should not interfere with the consumer group used by the
> executors, which would be "spark-executor-storage-group".
>
> In the streaming example there is a single topic ("client-events") and group
> ("processing-group"). A single stream is created and offsets are manually
> updated from the executor after each partition is handled. This was a
> challenge, since Spark now requires one to assign or subscribe to a topic in
> order to even update the offsets; in 0.8.2.x you did not have to worry about
> that. This approach limits our exposure to duplicate data, since idempotent
> records are not entirely possible in our scenario, at least not without a lot
> of re-running of logic to de-dup.
>
> Thanks,
>
> Ivan
>
> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> So just to be clear, the answers to my questions are
>>
>> - you are not using different group ids, you're using the same group
>> id everywhere
>>
>> - you are committing offsets manually
>>
>> Right?
>>
>> If you want to eliminate network or Kafka misbehavior as a source,
>> tune poll.ms upwards even higher.
>>
>> You must use different group ids for different RDDs or streams.
>> Kafka consumers won't behave the way you expect if they are all in the
>> same group id, and the consumer cache is keyed by group id. Yes, the
>> executor will tack "spark-executor-" onto the beginning, but if you
>> give it the same base group id, it will be the same. And the driver
>> will use the group id you gave it, unmodified.
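A simplified model of that naming rule, in plain Scala. The `CacheKey` shape is an assumption for illustration (the real cache lives in Spark's private Kafka integration code), and the two "jobs" are hypothetical; the point is only that the same base group id always produces the same executor group id, so consumers keyed on it collide.

```scala
object GroupIdModel {
  // Driver side: the group id from kafkaParams is used as-is.
  def driverGroupId(base: String): String = base

  // Executor side: the same base id with a fixed prefix added.
  def executorGroupId(base: String): String = "spark-executor-" + base

  // Simplified stand-in for the executor consumer cache key (an assumption,
  // not Spark's actual private class).
  final case class CacheKey(groupId: String, topic: String, partition: Int)

  def main(args: Array[String]): Unit = {
    // Two hypothetical jobs reading the same topic-partition with the same base group id.
    val jobA = CacheKey(executorGroupId("storage-group"), "storage-channel1", 0)
    val jobB = CacheKey(executorGroupId("storage-group"), "storage-channel1", 0)
    println(jobA == jobB) // true: both jobs would share one cached consumer

    // A distinct base group id per job gives each its own cache entry.
    val jobC = CacheKey(executorGroupId("storage-group-b"), "storage-channel1", 0)
    println(jobA == jobC) // false
  }
}
```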
>>
>> Finally, I really can't help you if you're manually writing your own
>> code to commit offsets directly to Kafka. Trying to minimize
>> duplicates that way doesn't really make sense: your system must be
>> able to handle duplicates if you're using Kafka as an offsets store,
>> because it can't do transactional exactly-once.
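Handling duplicates usually means making the write idempotent. A minimal in-memory sketch, assuming each record can be keyed by its Kafka coordinates (topic, partition, offset); `Record` and `IdempotentSink` are hypothetical names, and a real job would need an output store with an atomic upsert or unique key rather than a local map.

```scala
import scala.collection.mutable

object IdempotentSinkDemo {
  // Hypothetical record shape: Kafka coordinates plus the payload.
  final case class Record(topic: String, partition: Int, offset: Long, value: String)

  // Writes are keyed by (topic, partition, offset): replaying a batch after a
  // failure overwrites the same keys instead of appending duplicates.
  final class IdempotentSink {
    private val rows = mutable.LinkedHashMap.empty[(String, Int, Long), String]
    def write(batch: Seq[Record]): Unit =
      batch.foreach(r => rows((r.topic, r.partition, r.offset)) = r.value)
    def size: Int = rows.size
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      Record("client-events", 0, 100L, "a"),
      Record("client-events", 0, 101L, "b")
    )
    val sink = new IdempotentSink
    sink.write(batch)
    sink.write(batch) // redelivery, e.g. offsets were not stored before a crash
    println(sink.size) // 2
  }
}
```

With writes shaped like this, committing offsets late (or twice) only costs some repeated work, not duplicate output.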
>>
>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <ivan@vadio.com> wrote:
>> > Here are some examples and details of the scenarios. The KafkaRDD is the
>> > most prone to polling timeouts and concurrent modification errors.
>> >
>> > *Using KafkaRDD* - This takes a list of channels and processes them in
>> > parallel using the KafkaRDD directly. They all use the same consumer group
>> > ('storage-group'), but each has its own topic, and each topic has 4
>> > partitions. We routinely get timeout errors when polling for data. This
>> > occurs whether we process in parallel or sequentially.
>> >
>> > *Spark Kafka setting:*
>> > spark.streaming.kafka.consumer.poll.ms=2000
>> >
>> > *Kafka Consumer Params:*
>> > metric.reporters = []
>> > metadata.max.age.ms = 300000
>> > partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> > reconnect.backoff.ms = 50
>> > sasl.kerberos.ticket.renew.window.factor = 0.8
>> > max.partition.fetch.bytes = 1048576
>> > bootstrap.servers = [somemachine:31000]
>> > ssl.keystore.type = JKS
>> > enable.auto.commit = false
>> > sasl.mechanism = GSSAPI
>> > interceptor.classes = null
>> > exclude.internal.topics = true
>> > ssl.truststore.password = null
>> > client.id =
>> > ssl.endpoint.identification.algorithm = null
>> > max.poll.records = 1000
>> > check.crcs = true
>> > request.timeout.ms = 40000
>> > heartbeat.interval.ms = 3000
>> > auto.commit.interval.ms = 5000
>> > receive.buffer.bytes = 65536
>> > ssl.truststore.type = JKS
>> > ssl.truststore.location = null
>> > ssl.keystore.password = null
>> > fetch.min.bytes = 1
>> > send.buffer.bytes = 131072
>> > value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> > group.id = storage-group
>> > retry.backoff.ms = 100
>> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> > sasl.kerberos.service.name = null
>> > sasl.kerberos.ticket.renew.jitter = 0.05
>> > ssl.trustmanager.algorithm = PKIX
>> > ssl.key.password = null
>> > fetch.max.wait.ms = 500
>> > sasl.kerberos.min.time.before.relogin = 60000
>> > connections.max.idle.ms = 540000
>> > session.timeout.ms = 30000
>> > metrics.num.samples = 2
>> > key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> > ssl.protocol = TLS
>> > ssl.provider = null
>> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> > ssl.keystore.location = null
>> > ssl.cipher.suites = null
>> > security.protocol = PLAINTEXT
>> > ssl.keymanager.algorithm = SunX509
>> > metrics.sample.window.ms = 30000
>> > auto.offset.reset = earliest
>> >
>> > *Example usage with KafkaRDD:*
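>> > import scala.collection.JavaConverters._
>> > import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
>> > import org.apache.kafka.common.TopicPartition
>> > import org.apache.spark.sql.SaveMode
>> > import org.apache.spark.streaming.kafka010.KafkaUtils
>> > import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>> > import spark.implicits._ // needed for toDS()
>> >
>> > val channels = Seq("channel1", "channel2")
>> >
>> > channels.toParArray.foreach { channel =>
>> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >
>> >   // Get offsets for the given topic and the consumer group 'storage-group'
>> >   // (getOffsets is a homegrown helper returning Array[OffsetRange])
>> >   val offsetRanges = getOffsets("storage-group", channel)
>> >
>> >   val ds = KafkaUtils.createRDD[K, V](context,
>> >         kafkaParams.asJava,
>> >         offsetRanges,
>> >         PreferConsistent).map(_.value).toDS()
>> >
>> >   // Do some aggregations
>> >   ds.agg(...)
>> >   // Save the data
>> >   ds.write.mode(SaveMode.Append).parquet(somePath)
>> >   // Save offsets using a KafkaConsumer; untilOffset is the next offset to read
>> >   val newOffsets = offsetRanges.map { r =>
>> >     new TopicPartition(r.topic, r.partition) -> new OffsetAndMetadata(r.untilOffset)
>> >   }.toMap
>> >   consumer.assign(newOffsets.keys.toSeq.asJava)
>> >   consumer.commitSync(newOffsets.asJava)
>> >   consumer.close()
>> > }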
>> >
>> >
>> > *Example usage with Kafka Stream:*
>> > This creates a stream and processes events in each partition. At the end of
>> > processing for each partition, we update the offsets for that partition. This
>> > is challenging to do, but is better than calling commitAsync on the stream,
>> > because that only happens after the /entire/ RDD has been processed. This
>> > method minimizes duplicates in an environment where we want exactly-once
>> > processing. Since the executors use their own custom group
>> > "spark-executor-processor-group" and the commit is buried in private
>> > functions, we are unable to use the executors' cached consumer to update the
>> > offsets. This requires us to go through multiple steps to update the Kafka
>> > offsets accordingly.
>> >
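>> > import kafka.common.TopicAndPartition // from the Kafka core jar
>> > import org.apache.spark.TaskContext
>> > import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
>> > import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>> > import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>> >
>> > // Homegrown helper; here assumed to return Map[TopicPartition, Long]
>> > val startingOffsets = getOffsets("processor-group", "my-topic")
>> >
>> > val stream = KafkaUtils.createDirectStream[K, V](context,
>> >       PreferConsistent,
>> >       Subscribe[K, V](Seq("my-topic"),
>> >         kafkaParams,
>> >         startingOffsets))
>> >
>> > stream.foreachRDD { rdd =>
>> >   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >
>> >   // Transform our data
>> >   rdd.foreachPartition { events =>
>> >     // Establish a consumer in the executor so we can update offsets
>> >     // after each partition. ConsumerUtils is homegrown and uses a
>> >     // KafkaConsumer to help get/set offsets.
>> >     val consumer = new ConsumerUtils(kafkaParams)
>> >     // Do something with our data
>> >
>> >     // RDD partition i of a direct stream corresponds to offsetRanges(i)
>> >     val range = offsetRanges(TaskContext.get.partitionId)
>> >     // Write the offsets that were updated in this partition
>> >     consumer.setConsumerOffsets("processor-group",
>> >       Map(TopicAndPartition(range.topic, range.partition) -> range.untilOffset))
>> >   }
>> > }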
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>> >
>
>


