storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 15:42:01 GMT
Sorry for, perhaps a dumb question, but I have rewritten my topology to
accommodate the new star-kafka-client API (I think!).  I see no errors of
any kind, but there are no tuples emitted.  This is the code in the
topology as well as the scheme I was using in the old topology (old
storm-kafka library). My producer is creating a record that is translated
to a byte array, nothing fancy...the underlying record is a custom object.

// Set up the Kafka Spout
val spoutConf: KafkaSpoutConfig[String,String] =
KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .build()

// Generates list of PCAP files to process, periodically scanning a
magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", new
KafkaSpout(spoutConf),PcapFileSpoutWorkers)
    .setNumTasks(PcapFileSpoutTasks)
    .setMemoryLoad(16 * 1024)

// Load balancer that groups packets by key
topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
    .setNumTasks(IpPairBoltTasks)
    .shuffleGrouping("pcapKafkaSpout")
    .setMemoryLoad(16 * 1024)



class PcapRecordKafkaScheme extends Scheme {
    override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
        val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
        buffer.get(byteArray,0,byteArray.length)
        val record: IpPacketRecord =
SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
        new Values(record.ip_port_key,byteArray)
    }

    override def getOutputFields: Fields = {
        new Fields("key","value")
    }

}

.

On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <mabossert@gmail.com>
wrote:

> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
> transition.  Thanks!
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> ------------------------------
> *From:* Stig Rohde Døssing <srdo@apache.org>
> *Sent:* Saturday, January 13, 2018 3:24:31 PM
> *To:* user@storm.apache.org
> *Subject:* Re: Storm Kafka version mismatch issues
>
> Yes, there is no code overlap between storm-kafka and storm-kafka-client.
> The setup code is completely different. The code you posted is still using
> the storm-kafka classes (e.g. old SpoutConfig instead of new
> KafkaSpoutConfig), so you're still on the old spout. I'm a little surprised
> that code even compiled for you before, since the sbt you posted doesn't
> list storm-kafka anymore.
>
> There's documentation for the new spout at https://storm.apache.org/
> releases/1.1.1/storm-kafka-client.html, and some example code at
> https://github.com/apache/storm/blob/master/examples/
> storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/
> KafkaSpoutTopologyMainNamedTopics.java.
>
> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>
>> Stig,
>>
>> This project is still in a POC status, so I can wipe and restart
>> anything...no big deal.  Thanks!  So, I am now seeing some build errors
>> when I try to run my assembly.  Is there a different method for setting up
>> the KafkaSpout?
>>
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a
>> member of package org.apache.storm.kafka
>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout,
>> SpoutConfig, ZkHosts}
>> [error]        ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>> [error]         val zkHosts: BrokerHosts = new
>> ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> [error]                      ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>> [error]         val zkHosts: BrokerHosts = new
>> ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> [error]                                        ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>> [error]         val pcapKafkaConf: SpoutConfig = new
>> SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>> [error]                            ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>> [error]         val pcapKafkaConf: SpoutConfig = new
>> SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>> [error]                                              ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/s
>> cala/mil/darpa/netdefense/apachestormstreaming/storm/topolog
>> ies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>> [error]                                  ^
>> [error] 6 errors found
>>
>> and here is the relevant part of my topology:
>>
>> // Set up the Kafka Spout
>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> val pcapKafkaTopic: String = "packets"
>> val pcapZkRoot: String = "/packets"
>> val pcapClientId: String = "PcapKafkaSpout"
>>
>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>> pcapKafkaConf.ignoreZkOffsets = true
>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>> pcapKafkaConf.outputStreamId = "packets"
>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>
>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>
>> // Generates list of PCAP files to process, periodically scanning a magic directory
for new files
>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>     .setNumTasks(PcapFileSpoutTasks)
>>     .setMemoryLoad(16 * 1024)
>>
>>
>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org>
>> wrote:
>>
>>> Hi Aaron,
>>>
>>> Please note that the position of your storm-kafka consumers won't be
>>> migrated, so the new storm-kafka-client spout will start over on the topics
>>> you subscribe to. Do you need offsets migrated, or is it fine if the
>>> consumers start from scratch?
>>>
>>> I don't know whether HDP has special requirements but here's the general
>>> how-to for setting up storm-kafka-client:
>>>
>>> The storm-kafka-client library should work with any Kafka version
>>> 0.10.0.0 and above. You should use the latest version of
>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>> https://github.com/apache/storm/tree/1.x-branch/external/sto
>>> rm-kafka-client, we are hoping to release this version before too long.
>>> Once you've declared a dependency on storm-kafka-client, you should also
>>> add a dependency on org.apache.kafka:kafka-clients, in the version that
>>> matches your Kafka broker version.
>>>
>>> Looking at your sbt you should not need org.apache.kafka:kafka on the
>>> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
>>> storm-kafka-client are unnecessary as well. I don't believe those
>>> dependencies are part of the storm-kafka-client dependency tree.
>>>
>>> In case making those changes doesn't solve it, could you post a bit more
>>> of the stack trace you get?
>>>
>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>
>>>> All,
>>>>
>>>> I have resurrected some old code that includes a Kafka Producer class
>>>> as well as a storm topology that includes a Kafka Spout to ingest the
>>>> messages coming from the aforementioned producer.
>>>>
>>>> I was using the storm-kafka library with Scala 2.11, but when I changed
>>>> to the newer code base, which is using Scala 2.12, I found that the older
>>>> library wouldn't work...thus I wanted to switch to the new
>>>> storm-kafka-client, but am not sure what version I should be using.  here
>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to
a
>>>> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
>>>> whatever that is worth...
>>>>
>>>> java.lang.*NoSuchMethodError*: kafka.javaapi.consumer.SimpleC
>>>> onsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/
>>>> lang/String;)
>>>>
>>>> name := "apachestormstreaming"
>>>>
>>>> version := "0.1"
>>>>
>>>> scalaVersion := "2.12.4"
>>>>
>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>
>>>> resolvers ++= Seq(
>>>>     "clojars" at "http://clojars.org/repo/",
>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>> )
>>>>
>>>> libraryDependencies ++= Seq(
>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>         exclude("log4j","log4j"),
>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>         exclude("log4j","log4j")
>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>         exclude("commons-collections", "commons-collections"),
>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>         exclude("log4j","log4j")
>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>         exclude("log4j","log4j")
>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>         exclude("log4j","log4j"),
>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>         exclude("log4j","log4j"),
>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>> )
>>>>
>>>> assemblyMergeStrategy in assembly := {
>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>         MergeStrategy.rename
>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class"
=>
>>>>         MergeStrategy.rename
>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>         MergeStrategy.first
>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>         MergeStrategy.first
>>>>     case PathList("io","netty", _ @ _*) =>
>>>>         MergeStrategy.last
>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase  "io.netty.versions.properties"
=>
>>>>         MergeStrategy.first
>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase  "libnetty-transport-native-epoll.so"
=>
>>>>         MergeStrategy.first
>>>>     case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>         oldStrategy(x)
>>>> }
>>>>
>>>>
>>>
>>
>

Mime
View raw message