storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 19:46:37 GMT
Your generic types don't make sense. K and V in KafkaSpout<K, V> should be
your key and value types, which in your case are byte arrays.
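
To illustrate why the declared types matter, here is a minimal sketch in plain
Scala. It assumes nothing about Storm or Kafka: Record is a hypothetical
stand-in for ConsumerRecord, and the byte arrays stand in for what a
ByteArrayDeserializer would hand back. Because JVM generics are erased,
declaring [String, String] over byte-array data compiles, and only fails once a
value is actually used as a String. Whether that is what is silencing your
spout is not something this sketch can confirm; the worker log should tell.

```scala
import scala.util.Try

// Hypothetical stand-in for Kafka's ConsumerRecord[K, V]; not the real class.
final case class Record[K, V](key: K, value: V)

object GenericTypeSketch {
  // What a byte-array deserializer would hand over at runtime.
  val raw: Record[Array[Byte], Array[Byte]] =
    Record(Array[Byte](1, 2), Array[Byte](3, 4, 5))

  // Declaring the record as [String, String] is accepted here, because
  // type parameters are erased and nothing is checked at this point.
  val mistyped: Record[String, String] =
    raw.asInstanceOf[Record[String, String]]

  // The mismatch only surfaces when the value is actually used as a String:
  // this Try holds a ClassCastException rather than a length.
  val usedAsString: Try[Int] = Try(mistyped.value.length)

  // With matching type parameters the payload is usable directly.
  val usedAsBytes: Int = raw.value.length

  def main(args: Array[String]): Unit = {
    println(s"used as String: $usedAsString")
    println(s"used as bytes:  $usedAsBytes")
  }
}
```

With K and V declared as Array[Byte], the same access is checked by the
compiler instead of failing at runtime.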

Could you post the worker log for the spout executor? It'll probably be in
storm/logs/worker-artifacts/$topo_name/$worker_port/worker.log.

2018-01-15 20:11 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:

> Sorry to keep bugging you about this.  I am banging my head against the
> wall.  I have put in all the fixes, I think...but am seeing no errors of
> any kind, nor am I seeing any tuples coming from the Kafka Spout...even
> though in the Storm UI, I do see the Kafka topic offsets:
> [image: Inline image 1]
>
> // Set up the Kafka Spout
> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>     .setOffsetCommitPeriodMs(10000)
>     .setMaxUncommittedOffsets(1000)
>     .setRecordTranslator((r: ConsumerRecord[String,String]) =>
>         new Values(r.topic(), r.partition():java.lang.Integer, r.offset():java.lang.Long,
>             r.key(), r.value()),
>         new Fields("topic", "partition", "offset", "key", "value"),
>         "packets")
>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     .build()
>
> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>     .setNumTasks(PcapFileSpoutTasks)
>     .setMemoryLoad(16 * 1024)
>
> // Load balancer that groups packets by key
> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>     .setNumTasks(IpPairBoltTasks)
>     .fieldsGrouping("pcapKafkaSpout","packets",new Fields("key"))
>     /*.shuffleGrouping("PcapReaderBolt","packets")
>     .shuffleGrouping("NdStreamSpout","packets")*/
>     .setMemoryLoad(16 * 1024)
>
>
> On Mon, Jan 15, 2018 at 12:09 PM, Stig Rohde Døssing <srdo@apache.org>
> wrote:
>
>> I don't see a port number in your bootstrapServers, it should probably be
>> there. You also need to set a groupId. If you want the spout to use your
>> key or value deserializers, you need to set them in the KafkaSpoutConfig as
>> well, but you'll need to rewrite them so they implement
>> https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.
>>
>> You can see how to set all these here
>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
>> and the effect of setting them here
>> https://kafka.apache.org/documentation/#consumerconfigs. The generic
>> props from KafkaSpoutConfig (plus bootstrapServers and key/value
>> deserializer, since these are mandatory) are passed directly to a
>> KafkaConsumer, so the valid settings are the ones listed in the Kafka
>> documentation.
>>
>> 2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>
>>> Sorry for what is perhaps a dumb question, but I have rewritten my topology to
>>> accommodate the new storm-kafka-client API (I think!).  I see no errors of
>>> any kind, but there are no tuples emitted.  This is the code in the
>>> topology as well as the scheme I was using in the old topology (old
>>> storm-kafka library). My producer is creating a record that is translated
>>> to a byte array, nothing fancy...the underlying record is a custom object.
>>>
>>> // Set up the Kafka Spout
>>> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>     .setOffsetCommitPeriodMs(10000)
>>>     .setMaxUncommittedOffsets(1000)
>>>     .build()
>>>
>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>     .setNumTasks(PcapFileSpoutTasks)
>>>     .setMemoryLoad(16 * 1024)
>>>
>>> // Load balancer that groups packets by key
>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>     .setNumTasks(IpPairBoltTasks)
>>>     .shuffleGrouping("pcapKafkaSpout")
>>>     .setMemoryLoad(16 * 1024)
>>>
>>>
>>>
>>> class PcapRecordKafkaScheme extends Scheme {
>>>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>>>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>>>         buffer.get(byteArray,0,byteArray.length)
>>>         val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>>>         new Values(record.ip_port_key,byteArray)
>>>     }
>>>
>>>     override def getOutputFields: Fields = {
>>>         new Fields("key","value")
>>>     }
>>>
>>> }
>>>
>>> .
>>>
>>> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <mabossert@gmail.com>
>>> wrote:
>>>
>>>> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
>>>> transition.  Thanks!
>>>>
>>>> ------------------------------
>>>> *From:* Stig Rohde Døssing <srdo@apache.org>
>>>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>>>> *To:* user@storm.apache.org
>>>> *Subject:* Re: Storm Kafka version mismatch issues
>>>>
>>>> Yes, there is no code overlap between storm-kafka and
>>>> storm-kafka-client. The setup code is completely different. The code you
>>>> posted is still using the storm-kafka classes (e.g. old SpoutConfig instead
>>>> of new KafkaSpoutConfig), so you're still on the old spout. I'm a little
>>>> surprised that code even compiled for you before, since the sbt you posted
>>>> doesn't list storm-kafka anymore.
>>>>
>>>> There's documentation for the new spout at
>>>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html, and
>>>> some example code at
>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>>>>
>>>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>
>>>>> Stig,
>>>>>
>>>>> This project is still in a POC status, so I can wipe and restart
>>>>> anything...no big deal.  Thanks!  So, I am now seeing some build errors
>>>>> when I try to run my assembly.  Is there a different method for setting up
>>>>> the KafkaSpout?
>>>>>
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>>>> [error]        ^
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>> [error]                      ^
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>> [error]                                        ^
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>> [error]                            ^
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>> [error]                                              ^
>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>> [error]                                  ^
>>>>> [error] 6 errors found
>>>>>
>>>>> and here is the relevant part of my topology:
>>>>>
>>>>> // Set up the Kafka Spout
>>>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>> val pcapKafkaTopic: String = "packets"
>>>>> val pcapZkRoot: String = "/packets"
>>>>> val pcapClientId: String = "PcapKafkaSpout"
>>>>>
>>>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>> pcapKafkaConf.ignoreZkOffsets = true
>>>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>>>> pcapKafkaConf.outputStreamId = "packets"
>>>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>>>
>>>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>
>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>     .setMemoryLoad(16 * 1024)
>>>>>
>>>>>
>>>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Aaron,
>>>>>>
>>>>>> Please note that the position of your storm-kafka consumers won't be
>>>>>> migrated, so the new storm-kafka-client spout will start over on the topics
>>>>>> you subscribe to. Do you need offsets migrated, or is it fine if the
>>>>>> consumers start from scratch?
>>>>>>
>>>>>> I don't know whether HDP has special requirements but here's the
>>>>>> general how-to for setting up storm-kafka-client:
>>>>>>
>>>>>> The storm-kafka-client library should work with any Kafka version
>>>>>> 0.10.0.0 and above. You should use the latest version of
>>>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>>>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client,
>>>>>> we are hoping to release this version before too long. Once you've
>>>>>> declared a dependency on storm-kafka-client, you should also add a
>>>>>> dependency on org.apache.kafka:kafka-clients, in the version that
>>>>>> matches your Kafka broker version.
>>>>>>
>>>>>> Looking at your sbt you should not need org.apache.kafka:kafka on the
>>>>>> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
>>>>>> storm-kafka-client are unnecessary as well. I don't believe those
>>>>>> dependencies are part of the storm-kafka-client dependency tree.
>>>>>>
>>>>>> In case making those changes doesn't solve it, could you post a bit
>>>>>> more of the stack trace you get?
>>>>>>
>>>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>
>>>>>>> All,
>>>>>>>
>>>>>>> I have resurrected some old code that includes a Kafka Producer
>>>>>>> class as well as a storm topology that includes a Kafka Spout to ingest the
>>>>>>> messages coming from the aforementioned producer.
>>>>>>>
>>>>>>> I was using the storm-kafka library with Scala 2.11, but when I
>>>>>>> changed to the newer code base, which is using Scala 2.12, I found that the
>>>>>>> older library wouldn't work...thus I wanted to switch to the new
>>>>>>> storm-kafka-client, but am not sure what version I should be using.  Here
>>>>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to a
>>>>>>> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
>>>>>>> whatever that is worth...
>>>>>>>
>>>>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>>>
>>>>>>> name := "apachestormstreaming"
>>>>>>>
>>>>>>> version := "0.1"
>>>>>>>
>>>>>>> scalaVersion := "2.12.4"
>>>>>>>
>>>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>>>
>>>>>>> resolvers ++= Seq(
>>>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>>>> )
>>>>>>>
>>>>>>> libraryDependencies ++= Seq(
>>>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j"),
>>>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j")
>>>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j")
>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j")
>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j"),
>>>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>         exclude("log4j","log4j"),
>>>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>>>> )
>>>>>>>
>>>>>>> assemblyMergeStrategy in assembly := {
>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>>>         MergeStrategy.rename
>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>>>         MergeStrategy.rename
>>>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>>>         MergeStrategy.first
>>>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>>>         MergeStrategy.first
>>>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>>>         MergeStrategy.last
>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>>>>         MergeStrategy.first
>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>>>>         MergeStrategy.first
>>>>>>>     case x =>
>>>>>>>         val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>>>         oldStrategy(x)
>>>>>>> }
>>>>>>>
>>>>>>>
