storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 17:09:29 GMT
I don't see a port number in your bootstrapServers, it should probably be
there. You also need to set a groupId. If you want the spout to use your
key or value deserializers, you need to set them in the KafkaSpoutConfig as
well, but you'll need to rewrite them so they implement
https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html
.
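For example, a value deserializer doing the same work as the Scheme you posted might look roughly like this. This is only a sketch: IpPacketRecord and SerializationUtils are taken from your code below, and I'm assuming the record class is java.io.Serializable and that commons-lang3 is on the worker classpath.

```scala
import java.util

import org.apache.commons.lang3.SerializationUtils
import org.apache.kafka.common.serialization.Deserializer

// Sketch only: IpPacketRecord is the poster's own class, serialized with
// plain Java serialization via commons-lang3 SerializationUtils.
class IpPacketRecordDeserializer extends Deserializer[IpPacketRecord] {
  // No configuration needed for plain Java serialization
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Kafka hands this method the raw record bytes directly,
  // so there is no ByteBuffer handling as in the old Scheme
  override def deserialize(topic: String, data: Array[Byte]): IpPacketRecord =
    SerializationUtils.deserialize(data).asInstanceOf[IpPacketRecord]

  override def close(): Unit = {}
}
```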

You can see how to set all these here
https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
and the effect of setting them here
https://kafka.apache.org/documentation/#consumerconfigs. The generic props
from KafkaSpoutConfig (plus bootstrapServers and key/value deserializer,
since these are mandatory) are passed directly to a KafkaConsumer, so the
valid settings are the ones listed in the Kafka documentation.
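Putting that together, a minimal spout setup along the lines of the linked example might look like this. Again a sketch, not something I've run against your cluster: I'm assuming the master-branch builder API from the example above, and guessing at port 6667 (the HDP default broker port) and at a group id name; substitute your own values.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

// Note the port on the bootstrap server (6667 is HDP's default; check yours)
val spoutConf: KafkaSpoutConfig[String, String] =
  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
    // groupId is mandatory for the consumer to track and commit offsets;
    // "pcapKafkaSpoutGroup" is just a placeholder name
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "pcapKafkaSpoutGroup")
    // A custom value Deserializer would be set here too, e.g. via
    // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .build()

// then, as in your topology:
// topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf), PcapFileSpoutWorkers)
```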

2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:

> Sorry for perhaps a dumb question, but I have rewritten my topology to
> accommodate the new storm-kafka-client API (I think!).  I see no errors of
> any kind, but there are no tuples emitted.  This is the code in the
> topology as well as the scheme I was using in the old topology (old
> storm-kafka library). My producer is creating a record that is translated
> to a byte array, nothing fancy...the underlying record is a custom object.
>
> // Set up the Kafka Spout
> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>     .setOffsetCommitPeriodMs(10000)
>     .setMaxUncommittedOffsets(1000)
>     .build()
>
> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>     .setNumTasks(PcapFileSpoutTasks)
>     .setMemoryLoad(16 * 1024)
>
> // Load balancer that groups packets by key
> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>     .setNumTasks(IpPairBoltTasks)
>     .shuffleGrouping("pcapKafkaSpout")
>     .setMemoryLoad(16 * 1024)
>
>
>
> class PcapRecordKafkaScheme extends Scheme {
>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>         buffer.get(byteArray,0,byteArray.length)
>         val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>         new Values(record.ip_port_key,byteArray)
>     }
>
>     override def getOutputFields: Fields = {
>         new Fields("key","value")
>     }
>
> }
>
>
> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <mabossert@gmail.com>
> wrote:
>
>> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
>> transition.  Thanks!
>>
>> Get Outlook for iOS <https://aka.ms/o0ukef>
>> ------------------------------
>> *From:* Stig Rohde Døssing <srdo@apache.org>
>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>> *To:* user@storm.apache.org
>> *Subject:* Re: Storm Kafka version mismatch issues
>>
>> Yes, there is no code overlap between storm-kafka and storm-kafka-client.
>> The setup code is completely different. The code you posted is still using
>> the storm-kafka classes (e.g. old SpoutConfig instead of new
>> KafkaSpoutConfig), so you're still on the old spout. I'm a little surprised
>> that code even compiled for you before, since the sbt you posted doesn't
>> list storm-kafka anymore.
>>
>> There's documentation for the new spout at
>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html, and some
>> example code at
>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>>
>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>
>>> Stig,
>>>
>>> This project is still in a POC status, so I can wipe and restart
>>> anything...no big deal.  Thanks!  So, I am now seeing some build errors
>>> when I try to run my assembly.  Is there a different method for setting up
>>> the KafkaSpout?
>>>
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>> [error]        ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> [error]                      ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> [error]                                        ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> [error]                            ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> [error]                                              ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>> [error]                                  ^
>>> [error] 6 errors found
>>>
>>> and here is the relevant part of my topology:
>>>
>>> // Set up the Kafka Spout
>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> val pcapKafkaTopic: String = "packets"
>>> val pcapZkRoot: String = "/packets"
>>> val pcapClientId: String = "PcapKafkaSpout"
>>>
>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> pcapKafkaConf.ignoreZkOffsets = true
>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>> pcapKafkaConf.outputStreamId = "packets"
>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>
>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>
>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>>     .setNumTasks(PcapFileSpoutTasks)
>>>     .setMemoryLoad(16 * 1024)
>>>
>>>
>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org>
>>> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> Please note that the position of your storm-kafka consumers won't be
>>>> migrated, so the new storm-kafka-client spout will start over on the topics
>>>> you subscribe to. Do you need offsets migrated, or is it fine if the
>>>> consumers start from scratch?
>>>>
>>>> I don't know whether HDP has special requirements but here's the
>>>> general how-to for setting up storm-kafka-client:
>>>>
>>>> The storm-kafka-client library should work with any Kafka version
>>>> 0.10.0.0 and above. You should use the latest version of
>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>>>> we are hoping to release this version before too long. Once you've
>>>> declared a dependency on storm-kafka-client, you should
>>>> also add a dependency on org.apache.kafka:kafka-clients, in the
>>>> version that matches your Kafka broker version.
>>>>
>>>> Looking at your sbt you should not need org.apache.kafka:kafka on the
>>>> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
>>>> storm-kafka-client are unnecessary as well. I don't believe those
>>>> dependencies are part of the storm-kafka-client dependency tree.
>>>>
>>>> In case making those changes doesn't solve it, could you post a bit
>>>> more of the stack trace you get?
>>>>
>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>
>>>>> All,
>>>>>
>>>>> I have resurrected some old code that includes a Kafka Producer class
>>>>> as well as a storm topology that includes a Kafka Spout to ingest the
>>>>> messages coming from the aforementioned producer.
>>>>>
>>>>> I was using the storm-kafka library with Scala 2.11, but when I changed
>>>>> to the newer code base, which is using Scala 2.12, I found that the older
>>>>> library wouldn't work...thus I wanted to switch to the new
>>>>> storm-kafka-client, but am not sure what version I should be using. Here
>>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to
>>>>> a version mismatch between Storm and Kafka). I am using HDP 2.6.2, for
>>>>> whatever that is worth...
>>>>>
>>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>
>>>>> name := "apachestormstreaming"
>>>>>
>>>>> version := "0.1"
>>>>>
>>>>> scalaVersion := "2.12.4"
>>>>>
>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>
>>>>> resolvers ++= Seq(
>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>> )
>>>>>
>>>>> libraryDependencies ++= Seq(
>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>> )
>>>>>
>>>>> assemblyMergeStrategy in assembly := {
>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>         MergeStrategy.rename
>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>         MergeStrategy.rename
>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>         MergeStrategy.first
>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>         MergeStrategy.first
>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>         MergeStrategy.last
>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>>         MergeStrategy.first
>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>>         MergeStrategy.first
>>>>>     case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>         oldStrategy(x)
>>>>> }
>>>>>
>>>>>
>>>>
>>>
>>
>
