storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 20:08:52 GMT
That is it... The only thing I see that is suspicious is that the port keeps
changing, suggesting that the spout keeps dying. But I don't see any red
flags elsewhere to explain the recurring restarts.

On Mon, Jan 15, 2018 at 2:57 PM, Stig Rohde Døssing <srdo@apache.org> wrote:

> Hm, that doesn't really tell me anything. Is it the full log? I'd expect
> to see some INFO-level logs related to starting up the spout.
>
> 2018-01-15 20:54 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>
>> 2018-01-15 19:21:26.132 STDERR Thread-1 [INFO] Jan 15, 2018 7:21:25 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:21:26.132 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:21:26.138 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:21:26.143 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:23:29.223 STDERR Thread-1 [INFO] Jan 15, 2018 7:23:29 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:23:29.223 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:23:29.231 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:23:29.231 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:27:46.230 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:27:46.230 STDERR Thread-1 [INFO] Jan 15, 2018 7:27:46 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:27:46.241 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:27:46.268 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:29:49.325 STDERR Thread-1 [INFO] Jan 15, 2018 7:29:49 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:29:49.325 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:29:49.336 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:29:49.336 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:31:59.270 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:31:59.270 STDERR Thread-1 [INFO] Jan 15, 2018 7:31:59 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:31:59.281 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:31:59.304 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:34:02.315 STDERR Thread-1 [INFO] Jan 15, 2018 7:34:02 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:34:02.315 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:34:02.326 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:34:02.326 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:36:13.337 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:36:13.337 STDERR Thread-1 [INFO] Jan 15, 2018 7:36:13 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:36:13.347 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:36:13.349 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:38:16.341 STDERR Thread-1 [INFO] Jan 15, 2018 7:38:16 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:38:16.341 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:38:16.348 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:38:16.349 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:40:27.423 STDERR Thread-1 [INFO] Jan 15, 2018 7:40:27 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:40:27.423 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:40:27.430 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:40:27.430 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:42:30.411 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:42:30.411 STDERR Thread-1 [INFO] Jan 15, 2018 7:42:30 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:42:30.418 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:42:30.427 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:44:42.462 STDERR Thread-1 [INFO] Jan 15, 2018 7:44:42 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:44:42.462 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:44:42.469 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:44:42.469 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:46:45.469 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:46:45.469 STDERR Thread-1 [INFO] Jan 15, 2018 7:46:45 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:46:45.476 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:46:45.476 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:48:55.496 STDERR Thread-1 [INFO] Jan 15, 2018 7:48:55 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:48:55.496 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:48:55.502 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:48:55.520 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:50:58.565 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:50:58.565 STDERR Thread-1 [INFO] Jan 15, 2018 7:50:58 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:50:58.572 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:50:58.572 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> 2018-01-15 19:53:08.572 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>
>> 2018-01-15 19:53:08.572 STDERR Thread-1 [INFO] Jan 15, 2018 7:53:08 PM info.ganglia.gmetric4j.GMonitor start
>>
>> 2018-01-15 19:53:08.579 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>
>> 2018-01-15 19:53:08.580 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>
>> On Mon, Jan 15, 2018 at 2:46 PM, Stig Rohde Døssing <srdo@apache.org>
>> wrote:
>>
>>> Your generic types don't make sense. K and V in KafkaSpout<K, V> should
>>> be your key and value types, which in your case are byte arrays.
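>>>
>>> As a rough sketch (not compiled; the available Builder constructors vary
>>> a little between storm-kafka-client versions), the declarations below
>>> would change along these lines:
>>>
>>> // Key/value type parameters now match the ByteArrayDeserializer
>>> // settings, so the translator receives the raw record bytes.
>>> val spoutConf: KafkaSpoutConfig[Array[Byte], Array[Byte]] =
>>>     new KafkaSpoutConfig.Builder[Array[Byte], Array[Byte]]("r7u09.thanos.gotgdt.net:6667", "packets")
>>>         // ...same setFirstPollOffsetStrategy/setProp calls as in your code...
>>>         .setRecordTranslator((r: ConsumerRecord[Array[Byte], Array[Byte]]) =>
>>>             new Values(r.topic(), r.partition(): java.lang.Integer, r.offset(): java.lang.Long, r.key(), r.value()),
>>>             new Fields("topic", "partition", "offset", "key", "value"),
>>>             "packets")
>>>         .build()
>>>
>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout[Array[Byte], Array[Byte]](spoutConf), PcapFileSpoutWorkers)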
>>>
>>> Could you post the worker log for the spout executor? It'll probably be
>>> in storm/logs/worker-artifacts/$topo_name/$worker_port/worker.log.
>>>
>>> 2018-01-15 20:11 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>
>>>> Sorry to keep bugging you about this.  I am banging my head against the
>>>> wall.  I have put in all the fixes, I think...but am seeing no errors of
>>>> any kind, nor am I seeing any tuples coming from the Kafka Spout...even
>>>> though in the Storm UI, I do see the Kafka topic offsets:
>>>> [image: Storm UI screenshot showing the Kafka topic offsets]
>>>>
>>>> // Set up the Kafka Spout
>>>> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>     .setOffsetCommitPeriodMs(10000)
>>>>     .setMaxUncommittedOffsets(1000)
>>>>     .setRecordTranslator((r: ConsumerRecord[String,String]) =>
>>>>         new Values(r.topic(), r.partition():java.lang.Integer, r.offset():java.lang.Long, r.key(), r.value()),
>>>>         new Fields("topic", "partition", "offset", "key", "value"),
>>>>         "packets")
>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
>>>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>     .build()
>>>>
>>>> // Kafka spout that reads packet records from the "packets" topic
>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>     .setMemoryLoad(16 * 1024)
>>>>
>>>> // Load balancer that groups packets by key
>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>     .setNumTasks(IpPairBoltTasks)
>>>>     .fieldsGrouping("pcapKafkaSpout","packets",new Fields("key"))
>>>>     /*.shuffleGrouping("PcapReaderBolt","packets")
>>>>     .shuffleGrouping("NdStreamSpout","packets")*/
>>>>     .setMemoryLoad(16 * 1024)
>>>>
>>>>
>>>> On Mon, Jan 15, 2018 at 12:09 PM, Stig Rohde Døssing <srdo@apache.org>
>>>> wrote:
>>>>
>>>>> I don't see a port number in your bootstrapServers, it should probably
>>>>> be there. You also need to set a groupId. If you want the spout to use
>>>>> your key or value deserializers, you need to set them in the
>>>>> KafkaSpoutConfig as well, but you'll need to rewrite them so they
>>>>> implement
>>>>> https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.
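>>>>>
>>>>> For instance (a sketch with a hypothetical class name, mirroring the
>>>>> logic of the PcapRecordKafkaScheme quoted further down), a value
>>>>> deserializer could look like:
>>>>>
>>>>> import org.apache.commons.lang3.SerializationUtils
>>>>> import org.apache.kafka.common.serialization.Deserializer
>>>>>
>>>>> class IpPacketRecordDeserializer extends Deserializer[IpPacketRecord] {
>>>>>     // Nothing to configure for plain Java serialization.
>>>>>     override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
>>>>>
>>>>>     // Kafka hands over the raw record bytes; rebuild the object from them.
>>>>>     override def deserialize(topic: String, data: Array[Byte]): IpPacketRecord =
>>>>>         SerializationUtils.deserialize(data).asInstanceOf[IpPacketRecord]
>>>>>
>>>>>     override def close(): Unit = {}
>>>>> }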
>>>>>
>>>>> You can see how to set all these here
>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
>>>>> and the effect of setting them here
>>>>> https://kafka.apache.org/documentation/#consumerconfigs. The generic
>>>>> props from KafkaSpoutConfig (plus bootstrapServers and key/value
>>>>> deserializer, since these are mandatory) are passed directly to a
>>>>> KafkaConsumer, so the valid settings are the ones listed in the Kafka
>>>>> documentation.
>>>>>
>>>>> 2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>
>>>>>> Sorry for, perhaps, a dumb question, but I have rewritten my topology
>>>>>> to accommodate the new storm-kafka-client API (I think!). I see no
>>>>>> errors of any kind, but there are no tuples emitted. This is the code
>>>>>> in the topology as well as the scheme I was using in the old topology
>>>>>> (old storm-kafka library). My producer is creating a record that is
>>>>>> translated to a byte array, nothing fancy... the underlying record is
>>>>>> a custom object.
>>>>>>
>>>>>> // Set up the Kafka Spout
>>>>>> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>     .setOffsetCommitPeriodMs(10000)
>>>>>>     .setMaxUncommittedOffsets(1000)
>>>>>>     .build()
>>>>>>
>>>>>> // Kafka spout that reads packet records from the "packets" topic
>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>
>>>>>> // Load balancer that groups packets by key
>>>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>>>     .setNumTasks(IpPairBoltTasks)
>>>>>>     .shuffleGrouping("pcapKafkaSpout")
>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>
>>>>>>
>>>>>>
>>>>>> class PcapRecordKafkaScheme extends Scheme {
>>>>>>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>>>>>>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>>>>>>         buffer.get(byteArray,0,byteArray.length)
>>>>>>         val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>>>>>>         new Values(record.ip_port_key,byteArray)
>>>>>>     }
>>>>>>
>>>>>>     override def getOutputFields: Fields = {
>>>>>>         new Fields("key","value")
>>>>>>     }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <
>>>>>> mabossert@gmail.com> wrote:
>>>>>>
>>>>>>> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
>>>>>>> transition.  Thanks!
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* Stig Rohde Døssing <srdo@apache.org>
>>>>>>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>>>>>>> *To:* user@storm.apache.org
>>>>>>> *Subject:* Re: Storm Kafka version mismatch issues
>>>>>>>
>>>>>>> Yes, there is no code overlap between storm-kafka and
>>>>>>> storm-kafka-client. The setup code is completely different. The code
>>>>>>> you posted is still using the storm-kafka classes (e.g. old
>>>>>>> SpoutConfig instead of new KafkaSpoutConfig), so you're still on the
>>>>>>> old spout. I'm a little surprised that code even compiled for you
>>>>>>> before, since the sbt you posted doesn't list storm-kafka anymore.
>>>>>>>
>>>>>>> There's documentation for the new spout at
>>>>>>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
>>>>>>> and some example code at
>>>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
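>>>>>>>
>>>>>>> The shape of the change is roughly this (a sketch only, with a
>>>>>>> placeholder broker address; not compiled):
>>>>>>>
>>>>>>> // Old storm-kafka: configured indirectly through ZooKeeper.
>>>>>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>> val oldConf: SpoutConfig = new SpoutConfig(zkHosts, "packets", "/packets", "PcapKafkaSpout")
>>>>>>> val oldSpout = new KafkaSpout(oldConf) // org.apache.storm.kafka.KafkaSpout
>>>>>>>
>>>>>>> // New storm-kafka-client: talks to the Kafka brokers directly.
>>>>>>> val newConf: KafkaSpoutConfig[String, String] =
>>>>>>>     KafkaSpoutConfig.builder("broker1:9092", "packets").build()
>>>>>>> val newSpout = new KafkaSpout[String, String](newConf) // org.apache.storm.kafka.spout.KafkaSpout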
>>>>>>>
>>>>>>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>
>>>>>>>> Stig,
>>>>>>>>
>>>>>>>> This project is still in a POC status, so I can wipe and restart
>>>>>>>> anything... no big deal. Thanks! So, I am now seeing some build
>>>>>>>> errors when I try to run my assembly. Is there a different method
>>>>>>>> for setting up the KafkaSpout?
>>>>>>>>
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>>>>>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>>>>>>> [error]        ^
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>> [error]                      ^
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>> [error]                                        ^
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>> [error]                            ^
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>> [error]                                              ^
>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>>>>>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>> [error]                                  ^
>>>>>>>> [error] 6 errors found
>>>>>>>>
>>>>>>>> and here is the relevant part of my topology:
>>>>>>>>
>>>>>>>> // Set up the Kafka Spout
>>>>>>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>> val pcapKafkaTopic: String = "packets"
>>>>>>>> val pcapZkRoot: String = "/packets"
>>>>>>>> val pcapClientId: String = "PcapKafkaSpout"
>>>>>>>>
>>>>>>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>> pcapKafkaConf.ignoreZkOffsets = true
>>>>>>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>>>>>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>>>>>>> pcapKafkaConf.outputStreamId = "packets"
>>>>>>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>>>>>>
>>>>>>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>>
>>>>>>>> // Kafka spout that reads packet records from the "packets" topic
>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <
>>>>>>>> srdo@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Aaron,
>>>>>>>>>
>>>>>>>>> Please note that the position of your storm-kafka consumers won't
>>>>>>>>> be migrated, so the new storm-kafka-client spout will start over
>>>>>>>>> on the topics you subscribe to. Do you need offsets migrated, or
>>>>>>>>> is it fine if the consumers start from scratch?
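>>>>>>>>>
>>>>>>>>> If starting over is acceptable, the new spout's
>>>>>>>>> FirstPollOffsetStrategy controls where it begins (a sketch with
>>>>>>>>> placeholder broker/topic names):
>>>>>>>>>
>>>>>>>>> import org.apache.storm.kafka.spout.KafkaSpoutConfig
>>>>>>>>> import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy
>>>>>>>>>
>>>>>>>>> // EARLIEST/LATEST ignore previously committed offsets, while the
>>>>>>>>> // UNCOMMITTED_* variants resume from the group's committed offset
>>>>>>>>> // when one exists and fall back to earliest/latest otherwise.
>>>>>>>>> val conf = KafkaSpoutConfig.builder("broker1:9092", "packets")
>>>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>>>>     .build()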
>>>>>>>>>
>>>>>>>>> I don't know whether HDP has special requirements but here's the
>>>>>>>>> general how-to for setting up storm-kafka-client:
>>>>>>>>>
>>>>>>>>> The storm-kafka-client library should work with any Kafka version
>>>>>>>>> 0.10.0.0 and above. You should use the latest version of
>>>>>>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent
>>>>>>>>> number of extra fixes, you might consider building version 1.2.0
>>>>>>>>> yourself from
>>>>>>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client,
>>>>>>>>> we are hoping to release this version before too long. Once you've
>>>>>>>>> declared a dependency on storm-kafka-client, you should also add a
>>>>>>>>> dependency on org.apache.kafka:kafka-clients, in the version that
>>>>>>>>> matches your Kafka broker version.
>>>>>>>>>
>>>>>>>>> Looking at your sbt you should not need org.apache.kafka:kafka on
>>>>>>>>> the classpath. I think your zookeeper, log4j and slf4j-log4j12
>>>>>>>>> exclusions on storm-kafka-client are unnecessary as well. I don't
>>>>>>>>> believe those dependencies are part of the storm-kafka-client
>>>>>>>>> dependency tree.
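>>>>>>>>>
>>>>>>>>> Concretely, the Kafka-related entries in the sbt below could
>>>>>>>>> shrink to something like this (a sketch, keeping the versions from
>>>>>>>>> your build):
>>>>>>>>>
>>>>>>>>>     // storm-kafka-client plus a kafka-clients version matching the
>>>>>>>>>     // brokers; no org.apache.kafka:kafka artifact and no
>>>>>>>>>     // zookeeper/log4j exclusions.
>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205",
>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",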
>>>>>>>>>
>>>>>>>>> In case making those changes doesn't solve it, could you post a
>>>>>>>>> bit more of the stack trace you get?
>>>>>>>>>
>>>>>>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>>>
>>>>>>>>>> All,
>>>>>>>>>>
>>>>>>>>>> I have resurrected some old code that includes a Kafka Producer
>>>>>>>>>> class as well as a storm topology that includes a Kafka Spout to
>>>>>>>>>> ingest the messages coming from the aforementioned producer.
>>>>>>>>>>
>>>>>>>>>> I was using the storm-kafka library with Scala 2.11, but when I
>>>>>>>>>> changed to the newer code base, which is using Scala 2.12, I found
>>>>>>>>>> that the older library wouldn't work... thus I wanted to switch to
>>>>>>>>>> the new storm-kafka-client, but am not sure what version I should
>>>>>>>>>> be using. Here is my build.sbt file, as well as the error messages
>>>>>>>>>> I am seeing when the Spout actually runs (my favorite assistant,
>>>>>>>>>> Google, tells me it is due to a version mismatch between Storm and
>>>>>>>>>> Kafka). I am using HDP 2.6.2, for whatever that is worth...
>>>>>>>>>>
>>>>>>>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>>>>>>
>>>>>>>>>> name := "apachestormstreaming"
>>>>>>>>>>
>>>>>>>>>> version := "0.1"
>>>>>>>>>>
>>>>>>>>>> scalaVersion := "2.12.4"
>>>>>>>>>>
>>>>>>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>>>>>>
>>>>>>>>>> resolvers ++= Seq(
>>>>>>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205"
% Provided
>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static"
% "2.0.0-alpha",
>>>>>>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client"
% "0.5.2",
>>>>>>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> assemblyMergeStrategy in assembly := {
>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>>>>>>         MergeStrategy.last
>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>     case x =>
>>>>>>>>>>         val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>>>>>>         oldStrategy(x)
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
