storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: Storm Kafka version mismatch issues
Date Sat, 13 Jan 2018 20:05:25 GMT
Hi Aaron,

Please note that the position of your storm-kafka consumers won't be
migrated, so the new storm-kafka-client spout will start over on the topics
you subscribe to. Do you need offsets migrated, or is it fine if the
consumers start from scratch?

I don't know whether HDP has special requirements, but here's the general
how-to for setting up storm-kafka-client:

The storm-kafka-client library should work with any Kafka version 0.10.0.0
and above. You should use the latest version of storm-kafka-client, which
is currently 1.1.1. If you want a decent number of extra fixes, you might
consider building version 1.2.0 yourself from
https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client.
We are hoping to release this version before too long. Once you've declared
a dependency on storm-kafka-client, you should also add a dependency on
org.apache.kafka:kafka-clients, in the version that matches your Kafka
broker version.

Looking at your build.sbt, you should not need org.apache.kafka:kafka on the
classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
storm-kafka-client are unnecessary as well. I don't believe those
dependencies are part of the storm-kafka-client dependency tree.
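
As a rough sketch, the Kafka-related part of build.sbt could then look
something like the following. The kafka-clients version 0.10.1.1 is simply
copied from your current build and assumed to match your broker. I've used
the Apache 1.1.1 artifact here; HDP may need its own build of
storm-kafka-client instead, which I haven't verified. The kafka-clients
exclusion is kept from your build so that the explicitly declared version is
the only one on the classpath.

```scala
libraryDependencies ++= Seq(
    // Assumption: the plain Apache release; swap in the HDP-specific
    // version string if your cluster requires it.
    "org.apache.storm" % "storm-kafka-client" % "1.1.1"
        exclude("org.apache.kafka", "kafka-clients"),
    // Assumption: 0.10.1.1 matches the Kafka broker version.
    "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
    // No "org.apache.kafka" %% "kafka" entry, and no zookeeper, log4j or
    // slf4j-log4j12 exclusions on storm-kafka-client.
)
```

The remaining entries in your libraryDependencies (storm-core, flux-core,
hadoop, hbase and so on) would stay as they are.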

In case making those changes doesn't solve it, could you post a bit more of
the stack trace you get?

2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:

> All,
>
> I have resurrected some old code that includes a Kafka Producer class as
> well as a storm topology that includes a Kafka Spout to ingest the messages
> coming from the aforementioned producer.
>
> I was using the storm-kafka library with Scala 2.11, but when I changed to
> the newer code base, which is using Scala 2.12, I found that the older
> library wouldn't work...thus I wanted to switch to the new
> storm-kafka-client, but am not sure what version I should be using.  here
> is my build.sbt file, as well as the error messages I am seeing when the
> Spout actually runs (my favorite assistant, Google, tells me it is due to a
> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
> whatever that is worth...
>
> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>
> name := "apachestormstreaming"
>
> version := "0.1"
>
> scalaVersion := "2.12.4"
>
> scalacOptions := Seq("-unchecked", "-deprecation")
>
> resolvers ++= Seq(
>     "clojars" at "http://clojars.org/repo/",
>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
> )
>
> libraryDependencies ++= Seq(
>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>         exclude("org.slf4j", "slf4j-log4j12")
>         exclude("log4j","log4j"),
>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>         exclude("org.slf4j", "slf4j-log4j12")
>         exclude("log4j","log4j")
>         exclude("commons-beanutils", "commons-beanutils-core")
>         exclude("commons-beanutils", "commons-beanutils")
>         exclude("commons-collections", "commons-collections"),
>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>         exclude("org.apache.kafka","kafka-clients")
>         exclude("org.slf4j", "slf4j-log4j12")
>         exclude("log4j","log4j")
>         exclude("org.apache.zookeeper","zookeeper"),
>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>         exclude("org.slf4j", "slf4j-log4j12")
>         exclude("log4j","log4j")
>         exclude("org.apache.zookeeper","zookeeper"),
>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>         exclude("org.slf4j","slf4j-log4j12")
>         exclude("log4j","log4j"),
>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>         exclude("org.slf4j","slf4j-log4j12")
>         exclude("log4j","log4j"),
>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>     "org.apache.commons" % "commons-lang3" % "3.6",
>     "org.influxdb" % "influxdb-java" % "2.7"
> )
>
> assemblyMergeStrategy in assembly := {
>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>         MergeStrategy.rename
>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>         MergeStrategy.rename
>     case PathList("org","apache","http", _ @ _*) =>
>         MergeStrategy.first
>     case PathList("org","apache","commons","codec", _ @ _*) =>
>         MergeStrategy.first
>     case PathList("io","netty", _ @ _*) =>
>         MergeStrategy.last
>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>         MergeStrategy.first
>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>         MergeStrategy.first
>     case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
>         oldStrategy(x)
> }
>
>
