spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bowden, Chris" <chris.bow...@hpe.com>
Subject Re: Structured Streaming - Kafka
Date Tue, 07 Mar 2017 22:18:59 GMT
https://issues.apache.org/jira/browse/SPARK-19853, pr by eow

________________________________
From: Shixiong(Ryan) Zhu <shixiong@databricks.com>
Sent: Tuesday, March 7, 2017 2:04:45 PM
To: Bowden, Chris
Cc: user; Gudenkauf, Jack
Subject: Re: Structured Streaming - Kafka

Good catch. Could you create a ticket? You can also submit a PR to fix it if you have time
:)

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris <chris.bowden@hpe.com<mailto:chris.bowden@hpe.com>>
wrote:

Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase
characters?

KafkaSourceProvider#L80/86:

val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }

Topics in JSON get lowered so underlying assignments in the consumer are incorrect, and the
assertion in KafkaSource#L326 triggers:

private def fetchSpecificStartingOffsets(
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val result = withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    assert(partitions.asScala == partitionOffsets.keySet,
      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n"
+
        "Use -1 for latest, -2 for earliest, if you don't care.\n" +
        s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")


Mime
View raw message