storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Sat, 13 Jan 2018 20:34:31 GMT
Ah!  Perfect.  I'll poke through the docs and hope for a smooth transition.  Thanks!

________________________________
From: Stig Rohde Døssing <srdo@apache.org>
Sent: Saturday, January 13, 2018 3:24:31 PM
To: user@storm.apache.org
Subject: Re: Storm Kafka version mismatch issues

Yes, there is no code overlap between storm-kafka and storm-kafka-client; the setup code is
completely different. The code you posted is still using the storm-kafka classes (e.g. the old
SpoutConfig instead of the new KafkaSpoutConfig), so you're still on the old spout. I'm a little
surprised that code even compiled for you before, since the build.sbt you posted no longer lists
storm-kafka.

There's documentation for the new spout at https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
and some example code at https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
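For reference, a minimal Scala sketch of the new-style spout setup, roughly mirroring your old SpoutConfig code. This assumes the 1.x builder API from the docs above; the broker host:port list (6667 is the HDP default Kafka port, and note the new spout talks to the Kafka brokers directly, not ZooKeeper) and String keys/values are assumptions you'd adjust for your cluster:

```scala
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

// Builder takes a Kafka broker list (not ZkHosts) and one or more topic names
val pcapSpoutConf: KafkaSpoutConfig[String, String] =
  KafkaSpoutConfig.builder("r7u07:6667,r7u08:6667,r7u09:6667", "packets")
    // Roughly what ignoreZkOffsets + EarliestTime did in the old spout
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build()

val pcapKafkaSpout = new KafkaSpout[String, String](pcapSpoutConf)
```

Record translation (the old Scheme mechanism) is handled differently in the new spout (see setRecordTranslator in the docs), so your PcapRecordKafkaScheme will need porting too.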

2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
Stig,

This project is still in POC status, so I can wipe and restart anything; no big deal.
Thanks! So, I am now seeing some build errors when I try to run my assembly. Is there a
different method for setting up the KafkaSpout?

[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8:
object BrokerHosts is not a member of package org.apache.storm.kafka
[error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
[error]        ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22:
not found: type BrokerHosts
[error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]                      ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40:
not found: type ZkHosts
[error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]                                        ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28:
not found: type SpoutConfig
[error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                            ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46:
not found: type SpoutConfig
[error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                                              ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34:
not found: type KafkaSpout
[error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
[error]                                  ^
[error] 6 errors found

and here is the relevant part of my topology:


// Set up the Kafka Spout
val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
val pcapKafkaTopic: String = "packets"
val pcapZkRoot: String = "/packets"
val pcapClientId: String = "PcapKafkaSpout"

val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
pcapKafkaConf.ignoreZkOffsets = true
pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
pcapKafkaConf.outputStreamId = "packets"
pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)

val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)

// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
    .setNumTasks(PcapFileSpoutTasks)
    .setMemoryLoad(16 * 1024)

On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Hi Aaron,

Please note that the position of your storm-kafka consumers won't be migrated, so the new
storm-kafka-client spout will start over on the topics you subscribe to. Do you need offsets
migrated, or is it fine if the consumers start from scratch?

I don't know whether HDP has special requirements, but here's the general how-to for setting
up storm-kafka-client:

The storm-kafka-client library should work with any Kafka version 0.10.0.0 and above. You
should use the latest version of storm-kafka-client, which is currently 1.1.1. If you want
a decent number of extra fixes, you might consider building version 1.2.0 yourself from https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
we are hoping to release that version before too long. Once you've declared a dependency on
storm-kafka-client, you should also add a dependency on org.apache.kafka:kafka-clients, at
the version that matches your Kafka brokers.
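Concretely, the dependency section might look like the fragment below. The versions are assumptions based on this thread: the 1.1.1 Apache release mentioned above, and the kafka-clients 0.10.1.1 pin from your posted build.sbt; match the latter to your actual broker version.

```scala
// Hypothetical build.sbt fragment: storm-kafka-client plus kafka-clients
// pinned to the broker version, with no extra exclusions needed
libraryDependencies ++= Seq(
  "org.apache.storm" % "storm-kafka-client" % "1.1.1",
  "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
)
```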

Looking at your sbt, you should not need org.apache.kafka:kafka on the classpath. I think your
zookeeper, log4j, and slf4j-log4j12 exclusions on storm-kafka-client are unnecessary as well;
I don't believe those dependencies are part of the storm-kafka-client dependency tree.

In case making those changes doesn't solve it, could you post a bit more of the stack trace
you get?

2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
All,

I have resurrected some old code that includes a Kafka producer class as well as a Storm topology
that includes a Kafka spout to ingest the messages coming from the aforementioned producer.

I was using the storm-kafka library with Scala 2.11, but when I changed to the newer code
base, which is using Scala 2.12, I found that the older library wouldn't work, so I wanted
to switch to the new storm-kafka-client, but am not sure what version I should be using.
Here is my build.sbt file, as well as the error messages I am seeing when the spout actually
runs (my favorite assistant, Google, tells me it is due to a version mismatch between Storm
and Kafka). I am using HDP 2.6.2, for whatever that is worth...


java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)


name := "apachestormstreaming"

version := "0.1"

scalaVersion := "2.12.4"

scalacOptions := Seq("-unchecked", "-deprecation")

resolvers ++= Seq(
    "clojars" at "http://clojars.org/repo/",
    "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
    "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
)

libraryDependencies ++= Seq(
    "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
    "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j"),
    "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
    "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
    "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("commons-beanutils", "commons-beanutils-core")
        exclude("commons-beanutils", "commons-beanutils")
        exclude("commons-collections", "commons-collections"),
    "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
        exclude("org.apache.kafka","kafka-clients")
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("org.apache.zookeeper","zookeeper"),
    "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
    "org.apache.kafka" %% "kafka" % "0.10.1.1"
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("org.apache.zookeeper","zookeeper"),
    "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
        exclude("org.slf4j","slf4j-log4j12")
        exclude("log4j","log4j"),
    "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
        exclude("org.slf4j","slf4j-log4j12")
        exclude("log4j","log4j"),
    "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
    "org.apache.commons" % "commons-lang3" % "3.6",
    "org.influxdb" % "influxdb-java" % "2.7"
)

assemblyMergeStrategy in assembly := {
    case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
        MergeStrategy.rename
    case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
        MergeStrategy.rename
    case PathList("org","apache","http", _ @ _*) =>
        MergeStrategy.first
    case PathList("org","apache","commons","codec", _ @ _*) =>
        MergeStrategy.first
    case PathList("io","netty", _ @ _*) =>
        MergeStrategy.last
    case PathList(ps @ _*) if ps.last equalsIgnoreCase  "io.netty.versions.properties" =>
        MergeStrategy.first
    case PathList(ps @ _*) if ps.last equalsIgnoreCase  "libnetty-transport-native-epoll.so"
=>
        MergeStrategy.first
    case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}



