storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 21:18:23 GMT
Aha!  Working that change right now...but I backtracked and went to the
Nimbus log.  The error coming out there is a bit more plausible and
potentially useful:

2018-01-15 21:17:19.609 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-15-1515707846-stormcode.ser)

2018-01-15 21:17:19.612 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-2-1515694705

2018-01-15 21:17:19.648 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-2-1515694705-stormcode.ser)

2018-01-15 21:17:19.650 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-4-1515697192

2018-01-15 21:17:19.705 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-4-1515697192-stormcode.ser)

2018-01-15 21:17:19.707 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-22-1515814486

2018-01-15 21:17:19.743 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-22-1515814486-stormcode.ser)

2018-01-15 21:17:19.745 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-6-1515698178

2018-01-15 21:17:19.792 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-6-1515698178-stormcode.ser)

2018-01-15 21:17:19.794 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-1-1515693972

2018-01-15 21:17:19.830 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-1-1515693972-stormcode.ser)

On Mon, Jan 15, 2018 at 4:08 PM, Stig Rohde Døssing <srdo@apache.org> wrote:

> The idea is that the deserializer converts from Kafka's internal
> representation of a message (likely a byte array), while the
> RecordTranslator uses the deserialized value to construct a Values the
> spout can emit, and is also responsible for declaring the spout's output
> fields to Storm.
>
> e.g. if you were reading Strings from Kafka, you might have a
> StringDeserializer that goes byte[] -> String, and a RecordTranslator that
> declares that the spout will emit the fields "message" and "emitTimestamp",
> and constructs a tuple using the string plus System.currentTimeMillis() when
> it is emitted.
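>
> Roughly, in Scala, that example might look like this (a sketch against the
> 1.x storm-kafka-client API, using the two-argument SimpleRecordTranslator
> constructor that takes the translation function and the output fields):
>
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.storm.kafka.spout.SimpleRecordTranslator
> import org.apache.storm.tuple.{Fields, Values}
>
> // By the time the translator runs, Kafka's StringDeserializer has already
> // turned the raw byte[] into a String.
> val translator = new SimpleRecordTranslator[String, String](
>     (r: ConsumerRecord[String, String]) =>
>         new Values(r.value(), System.currentTimeMillis(): java.lang.Long),
>     new Fields("message", "emitTimestamp"))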
>
> Regarding the type error, it is because you're using the
> KafkaSpoutConfig.builder convenience method. It produces a
> KafkaSpoutConfig<String, String>, which then expects a RecordTranslator of
> the same type. Since you need non-String keys and values, you need to use
> the Builder constructors in KafkaSpoutConfig instead (e.g. new
> KafkaSpoutConfig.Builder(...)), as in the sketch below.
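>
> Something like this for byte-array keys and values (a sketch; the Builder
> overload shown, which takes the deserializer classes directly, is the 1.1.x
> API):
>
> import org.apache.kafka.common.serialization.ByteArrayDeserializer
> import org.apache.storm.kafka.spout.KafkaSpoutConfig
>
> val spoutConf = new KafkaSpoutConfig.Builder[Array[Byte], Array[Byte]](
>         "r7u09.thanos.gotgdt.net:6667",   // bootstrap servers
>         classOf[ByteArrayDeserializer],   // key deserializer
>         classOf[ByteArrayDeserializer],   // value deserializer
>         "packets")                        // topic
>     .setRecordTranslator(trans)           // trans typed [Array[Byte], Array[Byte]]
>     .build()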
>
> 2018-01-15 21:50 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>
>> Also, I am getting a bit confused as to what "setRecordTranslator" should
>> be.  You mentioned the generic type didn't make sense, but when I changed
>> it, I keep getting errors telling me that setRecordTranslator expects a
>> RecordTranslator[String,String], actual is
>> SimpleRecordTranslator[Array[Byte],Array[Byte]]:
>>
>> val trans: SimpleRecordTranslator[Array[Byte], Array[Byte]] =
>>     new SimpleRecordTranslator[Array[Byte], Array[Byte]](
>>         (r: ConsumerRecord[Array[Byte], Array[Byte]]) =>
>>             new Values(r.topic(), r.partition(): java.lang.Integer, r.offset(): java.lang.Long, r.key(), r.value()),
>>         new Fields("topic", "partition", "offset", "key", "value"),
>>         "packets"
>>     )
>>
>> val spoutConf =  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>     .setOffsetCommitPeriodMs(10000)
>>     .setMaxUncommittedOffsets(1000)
>>     .setRecordTranslator(trans)
>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>     .build()
>>
>>
>> On Mon, Jan 15, 2018 at 3:45 PM, M. Aaron Bossert <mabossert@gmail.com>
>> wrote:
>>
>>> No, I have not messed with the logging setup...Also, my other topologies
>>> (not using Storm-Kafka-Client) run just fine...Have never had issues until
>>> now.
>>>
>>> On Mon, Jan 15, 2018 at 3:38 PM, Stig Rohde Døssing <srdo@apache.org>
>>> wrote:
>>>
>>>> Does running e.g. one of the storm-starter topologies work for you? You
>>>> might also try deploying the storm-kafka-client-examples topology I linked
>>>> earlier, to verify whether it's a problem with the spout setup, or an issue
>>>> with the Storm install.
>>>>
>>>> Have you modified storm/log4j2/worker.xml? It might be set to
>>>> filter out some logs.
>>>>
>>>> (Really just guessing here)
>>>>
>>>> 2018-01-15 21:08 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>
>>>>> That is it...The only thing I see that is suspicious is that the port
>>>>> keeps changing, suggesting that the spout keeps dying.  But I don't see any
>>>>> red flags elsewhere to explain the recurring restarts.
>>>>>
>>>>> On Mon, Jan 15, 2018 at 2:57 PM, Stig Rohde Døssing <srdo@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hm, that doesn't really tell me anything. Is it the full log? I'd
>>>>>> expect to see some INFO-level logs related to starting up the spout.
>>>>>>
>>>>>> 2018-01-15 20:54 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>
>>>>>>> 2018-01-15 19:21:26.132 STDERR Thread-1 [INFO] Jan 15, 2018 7:21:25 PM info.ganglia.gmetric4j.GMonitor start
>>>>>>> 2018-01-15 19:21:26.132 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>>>>>> 2018-01-15 19:21:26.138 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>>>>>> 2018-01-15 19:21:26.143 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>>>>>>
>>>>>>> 2018-01-15 19:23:29.223 STDERR Thread-1 [INFO] Jan 15, 2018 7:23:29 PM info.ganglia.gmetric4j.GMonitor start
>>>>>>> 2018-01-15 19:23:29.223 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>>>>>> 2018-01-15 19:23:29.231 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>>>>>> 2018-01-15 19:23:29.231 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>>>>>>
>>>>>>> [... the same four-line startup/error cycle repeats roughly every two minutes, from 19:27:46 through 19:53:08 ...]
>>>>>>>
>>>>>>> 2018-01-15 19:53:08.572 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>>>>>> 2018-01-15 19:53:08.572 STDERR Thread-1 [INFO] Jan 15, 2018 7:53:08 PM info.ganglia.gmetric4j.GMonitor start
>>>>>>> 2018-01-15 19:53:08.579 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>>>>>> 2018-01-15 19:53:08.580 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>>>>>>
>>>>>>> On Mon, Jan 15, 2018 at 2:46 PM, Stig Rohde Døssing <srdo@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Your generic types don't make sense. K and V in KafkaSpout<K, V>
>>>>>>>> should be your key and value types, which in your case are byte arrays.
>>>>>>>>
>>>>>>>> Could you post the worker log for the spout executor? It'll
>>>>>>>> probably be in storm/logs/worker-artifacts/$topo_name/$worker_port/worker.log.
>>>>>>>>
>>>>>>>> 2018-01-15 20:11 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>>
>>>>>>>>> Sorry to keep bugging you about this.  I am banging my head
>>>>>>>>> against the wall.  I have put in all the fixes, I think...but am seeing no
>>>>>>>>> errors of any kind, nor am I seeing any tuples coming from the Kafka
>>>>>>>>> Spout...even though in the Storm UI, I do see the Kafka topic offsets:
>>>>>>>>> [image: Inline image 1]
>>>>>>>>>
>>>>>>>>> // Set up the Kafka Spout
>>>>>>>>> val spoutConf: KafkaSpoutConfig[String,String] =  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>>>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>>>>     .setOffsetCommitPeriodMs(10000)
>>>>>>>>>     .setMaxUncommittedOffsets(1000)
>>>>>>>>>     .setRecordTranslator((r: ConsumerRecord[String,String]) =>
>>>>>>>>>         new Values(r.topic(), r.partition():java.lang.Integer, r.offset():java.lang.Long, r.key(), r.value()),
>>>>>>>>>         new Fields("topic", "partition", "offset", "key", "value"),
>>>>>>>>>         "packets")
>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
>>>>>>>>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>     .build()
>>>>>>>>>
>>>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>
>>>>>>>>> // Load balancer that groups packets by key
>>>>>>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>>>>>>     .setNumTasks(IpPairBoltTasks)
>>>>>>>>>     .fieldsGrouping("pcapKafkaSpout","packets",new Fields("key"))
>>>>>>>>>     /*.shuffleGrouping("PcapReaderBolt","packets")
>>>>>>>>>     .shuffleGrouping("NdStreamSpout","packets")*/
>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jan 15, 2018 at 12:09 PM, Stig Rohde Døssing <
>>>>>>>>> srdo@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I don't see a port number in your bootstrapServers; it should
>>>>>>>>>> probably be there. You also need to set a groupId. If you want the spout to
>>>>>>>>>> use your key or value deserializers, you need to set them in the
>>>>>>>>>> KafkaSpoutConfig as well, but you'll need to rewrite them so they implement
>>>>>>>>>> https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.
>>>>>>>>>>
>>>>>>>>>> You can see how to set all these here
>>>>>>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
>>>>>>>>>> and the effect of setting them here
>>>>>>>>>> https://kafka.apache.org/documentation/#consumerconfigs. The generic props from
>>>>>>>>>> KafkaSpoutConfig (plus bootstrapServers and key/value deserializer, since
>>>>>>>>>> these are mandatory) are passed directly to a KafkaConsumer, so the valid
>>>>>>>>>> settings are the ones listed in the Kafka documentation.
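>>>>>>>>>>
>>>>>>>>>> Concretely, something like this sketch, where the broker host and group id
>>>>>>>>>> are placeholders and ConsumerConfig is org.apache.kafka.clients.consumer.ConsumerConfig:
>>>>>>>>>>
>>>>>>>>>> val conf = KafkaSpoutConfig.builder("mybroker:6667", "packets") // note the port
>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId")      // required group id
>>>>>>>>>>     .build()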
>>>>>>>>>>
>>>>>>>>>> 2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>
>>>>>>>>>> :
>>>>>>>>>>
>>>>>>>>>>> Sorry for what is perhaps a dumb question, but I have rewritten my
>>>>>>>>>>> topology to accommodate the new storm-kafka-client API (I think!).  I see no
>>>>>>>>>>> errors of any kind, but there are no tuples emitted.  This is the code in
>>>>>>>>>>> the topology as well as the scheme I was using in the old topology (old
>>>>>>>>>>> storm-kafka library). My producer is creating a record that is translated
>>>>>>>>>>> to a byte array, nothing fancy...the underlying record is a custom object.
>>>>>>>>>>>
>>>>>>>>>>> // Set up the Kafka Spout
>>>>>>>>>>> val spoutConf: KafkaSpoutConfig[String,String] =  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>>>>>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>>>>>>     .setOffsetCommitPeriodMs(10000)
>>>>>>>>>>>     .setMaxUncommittedOffsets(1000)
>>>>>>>>>>>     .build()
>>>>>>>>>>>
>>>>>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>>>
>>>>>>>>>>> // Load balancer that groups packets by key
>>>>>>>>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>>>>>>>>     .setNumTasks(IpPairBoltTasks)
>>>>>>>>>>>     .shuffleGrouping("pcapKafkaSpout")
>>>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> class PcapRecordKafkaScheme extends Scheme {
>>>>>>>>>>>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>>>>>>>>>>>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>>>>>>>>>>>         buffer.get(byteArray,0,byteArray.length)
>>>>>>>>>>>         val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>>>>>>>>>>>         new Values(record.ip_port_key,byteArray)
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     override def getOutputFields: Fields = {
>>>>>>>>>>>         new Fields("key","value")
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> }
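>>>>>>>>>>>
>>>>>>>>>>> For comparison, a sketch of that Scheme reworked as a Kafka Deserializer,
>>>>>>>>>>> per the advice in the replies above (it assumes IpPacketRecord is
>>>>>>>>>>> java.io.Serializable, as the SerializationUtils round-trip implies):
>>>>>>>>>>>
>>>>>>>>>>> import java.util
>>>>>>>>>>> import org.apache.commons.lang3.SerializationUtils
>>>>>>>>>>> import org.apache.kafka.common.serialization.Deserializer
>>>>>>>>>>>
>>>>>>>>>>> class IpPacketRecordDeserializer extends Deserializer[IpPacketRecord] {
>>>>>>>>>>>     // Nothing to configure for plain Java serialization
>>>>>>>>>>>     override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
>>>>>>>>>>>
>>>>>>>>>>>     // Kafka hands over the raw bytes for one record; rebuild the object
>>>>>>>>>>>     override def deserialize(topic: String, data: Array[Byte]): IpPacketRecord =
>>>>>>>>>>>         SerializationUtils.deserialize(data).asInstanceOf[IpPacketRecord]
>>>>>>>>>>>
>>>>>>>>>>>     override def close(): Unit = ()
>>>>>>>>>>> }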
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <
>>>>>>>>>>> mabossert@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
>>>>>>>>>>>> transition.  Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>> *From:* Stig Rohde Døssing <srdo@apache.org>
>>>>>>>>>>>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>>>>>>>>>>>> *To:* user@storm.apache.org
>>>>>>>>>>>> *Subject:* Re: Storm Kafka version mismatch issues
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, there is no code overlap between storm-kafka and
>>>>>>>>>>>> storm-kafka-client. The setup code is completely different. The code you
>>>>>>>>>>>> posted is still using the storm-kafka classes (e.g. old SpoutConfig instead
>>>>>>>>>>>> of new KafkaSpoutConfig), so you're still on the old spout. I'm a little
>>>>>>>>>>>> surprised that code even compiled for you before, since the sbt you posted
>>>>>>>>>>>> doesn't list storm-kafka anymore.
>>>>>>>>>>>>
>>>>>>>>>>>> There's documentation for the new spout at
>>>>>>>>>>>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
>>>>>>>>>>>> and some example code at
>>>>>>>>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <
>>>>>>>>>>>> mabossert@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Stig,
>>>>>>>>>>>>>
>>>>>>>>>>>>> This project is still in a POC status, so I can wipe and
>>>>>>>>>>>>> restart anything...no big deal.  Thanks!  So, I am now seeing some build
>>>>>>>>>>>>> errors when I try to run my assembly.  Is there a different method for
>>>>>>>>>>>>> setting up the KafkaSpout?
>>>>>>>>>>>>>
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>>>>>>>>>>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>>>>>>>>>>>> [error]        ^
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>>>>>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>>>> [error]                      ^
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>>>>>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>>>> [error]                                        ^
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>>>>>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>>>> [error]                            ^
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>>>>>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>>>> [error]                                              ^
>>>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>>>>>>>>>>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>>>>>>> [error]                                  ^
>>>>>>>>>>>>> [error] 6 errors found
>>>>>>>>>>>>>
>>>>>>>>>>>>> and here is the relevant part of my topology:
>>>>>>>>>>>>>
>>>>>>>>>>>>> // Set up the Kafka Spout
>>>>>>>>>>>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>>>> val pcapKafkaTopic: String = "packets"
>>>>>>>>>>>>> val pcapZkRoot: String = "/packets"
>>>>>>>>>>>>> val pcapClientId: String = "PcapKafkaSpout"
>>>>>>>>>>>>>
>>>>>>>>>>>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>>>> pcapKafkaConf.ignoreZkOffsets = true
>>>>>>>>>>>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>>>>>>>>>>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>>>>>>>>>>>> pcapKafkaConf.outputStreamId = "packets"
>>>>>>>>>>>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>>>>>>>>>>>
>>>>>>>>>>>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>>>>>>>
>>>>>>>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>>>>>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <
>>>>>>>>>>>>> srdo@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aaron,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please note that the position of your storm-kafka consumers
>>>>>>>>>>>>>> won't be migrated, so the new storm-kafka-client spout will start over on
>>>>>>>>>>>>>> the topics you subscribe to. Do you need offsets migrated, or is it fine if
>>>>>>>>>>>>>> the consumers start from scratch?
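>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If starting over is acceptable, where the new spout begins reading is
>>>>>>>>>>>>>> controlled on the config builder, e.g. (a sketch; "builder" stands for
>>>>>>>>>>>>>> your KafkaSpoutConfig.Builder):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // Start from the earliest offset unless the consumer group has already
>>>>>>>>>>>>>> // committed offsets, in which case resume from those.
>>>>>>>>>>>>>> builder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)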
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't know whether HDP has special requirements but here's
>>>>>>>>>>>>>> the general how-to for setting up storm-kafka-client:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The storm-kafka-client library should work with any Kafka
>>>>>>>>>>>>>> version 0.10.0.0 and above. You should use the latest version of
>>>>>>>>>>>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>>>>>>>>>>>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>>>>>>>>>>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>>>>>>>>>>>>>> we are hoping to release this version before too long. Once you've declared
>>>>>>>>>>>>>> a dependency on storm-kafka-client,
>>>>>>>>>>>>>> you should also add a dependency on org.apache.kafka:kafka-clients,
>>>>>>>>>>>>>> in the version that matches your Kafka broker version.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking at your sbt you should not need
>>>>>>>>>>>>>> org.apache.kafka:kafka on the classpath. I think your zookeeper, log4j and
>>>>>>>>>>>>>> slf4j-log4j12 exclusions on storm-kafka-client are unnecessary as well. I
>>>>>>>>>>>>>> don't believe those dependencies are part of the storm-kafka-client
>>>>>>>>>>>>>> dependency tree.
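>>>>>>>>>>>>>>
>>>>>>>>>>>>>> i.e. the Kafka-related dependencies could shrink to something like this
>>>>>>>>>>>>>> sketch (versions illustrative; match kafka-clients to your broker):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.1",
>>>>>>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1" // match broker version
>>>>>>>>>>>>>> )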
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In case making those changes doesn't solve it, could you post
>>>>>>>>>>>>>> a bit more of the stack trace you get?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <
>>>>>>>>>>>>>> mabossert@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have resurrected some old code that includes a Kafka
>>>>>>>>>>>>>>> Producer class as well as a storm topology that includes a Kafka Spout to
>>>>>>>>>>>>>>> ingest the messages coming from the aforementioned producer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was using the storm-kafka library with Scala 2.11, but
>>>>>>>>>>>>>>> when I changed to the newer code base, which is using Scala 2.12, I found
>>>>>>>>>>>>>>> that the older library wouldn't work...thus I wanted to switch to the new
>>>>>>>>>>>>>>> storm-kafka-client, but am not sure what version I should be using.  here
>>>>>>>>>>>>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>>>>>>>>>>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to a
>>>>>>>>>>>>>>> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
>>>>>>>>>>>>>>> whatever that is worth...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> java.lang.NoSuchMethodError:
>>>>>>>>>>>>>>> kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> name := "apachestormstreaming"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> version := "0.1"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> scalaVersion := "2.12.4"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> resolvers ++= Seq(
>>>>>>>>>>>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>>>>>>>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>>>>>>>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>>>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>>>>>>>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>>>>>>>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>>>>>>>>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>>>>>>>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>>>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>>>>>>>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>>>>>>>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>>>>>>>>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>>>>>>>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> assemblyMergeStrategy in assembly := {
>>>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>>>>>>>>>>>         MergeStrategy.last
>>>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase  "io.netty.versions.properties" =>
>>>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase  "libnetty-transport-native-epoll.so" =>
>>>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>>>     case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>>>>>>>>>>>         oldStrategy(x)
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
