ConcurrentModificationExceptions with CachedKafkaConsumers
Thu, 30 Aug 2018 17:27:15 GMT
Hello, Spark Users.

We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We're
have a Spark streaming job, and we're reading a reasonable amount of data
from Kafka (40 GB / minute or so).  We would like to move to using the
Kafka 0.10 client to avoid requiring our ( Kafka brokers from
having to modify formats.

We've run into,
'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried
to work around it as follows:

1. Disabled consumer caching.  This increased the total job time from ~1
minute per batch to ~1.8 minutes per batch.  This performance penalty is
unacceptable for our use-case. We also saw some partitions stop receiving
for an extended period of time - I was unable to get a simple repro for
this effect though.
2. Disabled speculation and multiple-job concurrency and added caching for
the stream directly after reading from Kafka & caching offsets.  This
approach seems to work well for simple examples (read from a Kafka topic,
write to another topic). However, when we move through more complex logic
we continue to see this type of error - despite only creating the stream
for a given topic a single time.  We validated that we're creating the
stream from a given topic / partition a single time by logging on stream
creation, caching the stream and (eventually) calling 'runJob' to actually
go and fetch the data. Nonetheless with multiple outputs we see the

I've included some code down below.  I would be happy if anyone had
debugging tips for the workaround.  However, my main concern is to ensure
that the 2.4 version will have a bug fix that will work for Spark Streaming
in which multiple input topics map data to multiple outputs. I would also
like to understand if the fix (
will be backported to Spark 2.3.x

In our code, read looks like the following:

case class StreamLookupKey(topic: Set[String], brokers: String)

private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()

// Given inputs return a direct stream.
def createDirectStream(ssc: StreamingContext,
                       additionalKafkaParameters: Map[String, String],
                       brokersToUse: Array[String], //
                       topicsToUse: Array[String],
                       applicationName: String,
                       persist: Option[PersistenceManager],
                       useOldestOffsets: Boolean,
                       maxRatePerPartition: Long,
                       batchSeconds: Int
                      ): DStream[DecodedData] = {
  val streams: Array[DStream[DecodedData]] = => {
      val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
      val kafkaParameters: Map[String, String] =
getKafkaParameters(brokers, useOldestOffsets, groupId) ++
additionalKafkaParameters"Kafka Params: ${kafkaParameters}")
      val topics = topicsToUse.toSet"Creating Kafka direct connection -
${kafkaParameters.mkString(GeneralConstants.comma)} " +
        s"topics: ${topics.mkString(GeneralConstants.comma)} w/
applicationGroup: ${groupId}")

      streamMap.getOrElse(StreamLookupKey(topics, brokers),
createKafkaStream(ssc, applicationName, topics, brokers,
maxRatePerPartition, batchSeconds, kafkaParameters))


private def createKafkaStream(ssc: StreamingContext, applicationName:
String, topics: Set[String], brokers: String,
                              maxRatePerPartition: Long, batchSeconds:
Int, kafkaParameters: Map[String,String]): DStream[DecodedData] = {"Creating a stream from Kafka for application
${applicationName} w/ topic ${topics} and " +
    s"brokers: ${brokers.split(',').head} with parameters:
  try {
    val consumerStrategy = ConsumerStrategies.Subscribe[String,
DecodedData](topics.toSeq, kafkaParameters)
    val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
      KafkaUtils.createDirectStream(ssc, locationStrategy =
LocationStrategies.PreferBrokers, consumerStrategy = consumerStrategy)

    KafkaStreamFactory.writeStreamOffsets(applicationName, brokers,
stream, maxRatePerPartition, batchSeconds)
    val result =
    streamMap += StreamLookupKey(topics, brokers) -> result
    result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator:
Iterator[_]) => {}))
  } catch ErrorHandling.safelyCatch {
    case e: Exception =>
      logger.error("Unable to create direct stream:")
      throw KafkaDirectStreamException(topics.toArray, brokers, e)

def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
applicationName: String): Map[String, String] =
  Map[String, String](
    "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
    "" -> false.toString, // we'll commit these manually
    "key.deserializer" ->
    "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
    "partition.assignment.strategy" ->
    "bootstrap.servers" -> brokers,
    "" -> applicationName,
    "" -> 240000.toString,
    ""-> 300000.toString

Write code looks like the following:

def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
conv: (T) => Array[Byte], numPartitions: Int): Unit = {
  val rddToWrite =
    if (numPartitions > 0) {
    } else {

  // Get session from current threads session
  val session = SparkSession.builder().getOrCreate()
  val df = session.createDataFrame( => Row(conv(x))),
StructType(Array(StructField("value", BinaryType))))
  df.selectExpr("CAST('' AS STRING)", "value")
    .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
    .option("compression.type", "gzip")
    .option("retries", "3")
    .option("topic", topic)


Bryan Jeffrey

