storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 20:50:37 GMT
Also, I am getting a bit confused as to what type "setRecordTranslator" should
accept.  You mentioned the generic types didn't make sense, but when I changed
them, I keep getting errors telling me that setRecordTranslator expects a
RecordTranslator[String,String], while the actual argument is a
SimpleRecordTranslator[Array[Byte],Array[Byte]]:

val trans: SimpleRecordTranslator[Array[Byte], Array[Byte]] =
  new SimpleRecordTranslator[Array[Byte], Array[Byte]](
    (r: ConsumerRecord[Array[Byte], Array[Byte]]) =>
      new Values(r.topic(), r.partition(): java.lang.Integer,
        r.offset(): java.lang.Long, r.key(), r.value()),
    new Fields("topic", "partition", "offset", "key", "value"),
    "packets"
  )

val spoutConf = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .setRecordTranslator(trans)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .build()
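
(For reference, the type error appears to come from the builder overload: KafkaSpoutConfig.builder(bootstrapServers, topics) returns a Builder[String, String], so its setRecordTranslator only accepts a RecordTranslator[String, String]. A minimal sketch of a byte-array-typed config, assuming the 1.x Builder constructor that takes deserializer classes directly; worth verifying against the HDP build of storm-kafka-client:)

import org.apache.kafka.common.serialization.ByteArrayDeserializer // plus the imports already in scope above

val byteSpoutConf: KafkaSpoutConfig[Array[Byte], Array[Byte]] =
  new KafkaSpoutConfig.Builder[Array[Byte], Array[Byte]](
      "r7u09.thanos.gotgdt.net:6667",
      classOf[ByteArrayDeserializer], // key deserializer, replacing the setProp call
      classOf[ByteArrayDeserializer], // value deserializer
      "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .setRecordTranslator(trans) // the SimpleRecordTranslator[Array[Byte],Array[Byte]] above now typechecks
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "pcapKafkaSpoutConsumer")
    .build()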


On Mon, Jan 15, 2018 at 3:45 PM, M. Aaron Bossert <mabossert@gmail.com> wrote:

> No, I have not messed with the logging setup...Also, my other topologies
> (not using Storm-Kafka-Client) run just fine...Have never had issues until
> now.
>
> On Mon, Jan 15, 2018 at 3:38 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
>
>> Does running e.g. one of the storm-starter topologies work for you? You
>> might also try deploying the storm-kafka-client-examples topology I linked
>> earlier, to verify whether it's a problem with the spout setup, or an issue
>> with the Storm install.
>>
>> Have you modified the storm/log4j2/worker.xml? It might be set to filter
>> out some logs?
>>
>> (Really just guessing here)
>>
>> 2018-01-15 21:08 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>
>>> That is it...The only thing I see that is suspicious is that the port
>>> keeps changing, suggesting that the spout keeps dying.  But I don't see any
>>> red flags elsewhere to explain the recurring restarts.
>>>
>>> On Mon, Jan 15, 2018 at 2:57 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>
>>>> Hm, that doesn't really tell me anything. Is it the full log? I'd
>>>> expect to see some INFO-level logs related to starting up the spout.
>>>>
>>>> 2018-01-15 20:54 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>
>>>>> 2018-01-15 19:21:26.132 STDERR Thread-1 [INFO] Jan 15, 2018 7:21:25 PM info.ganglia.gmetric4j.GMonitor start
>>>>>
>>>>> 2018-01-15 19:21:26.132 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>>>>
>>>>> 2018-01-15 19:21:26.138 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>>>>
>>>>> 2018-01-15 19:21:26.143 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>>>>
>>>>> [the same four lines repeat roughly every two minutes, at 19:23:29, 19:27:46, 19:29:49, 19:31:59, 19:34:02, 19:36:13, 19:38:16, 19:40:27, 19:42:30, 19:44:42, 19:46:45, 19:48:55, and 19:50:58]
>>>>>
>>>>> 2018-01-15 19:53:08.572 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
>>>>>
>>>>> 2018-01-15 19:53:08.572 STDERR Thread-1 [INFO] Jan 15, 2018 7:53:08 PM info.ganglia.gmetric4j.GMonitor start
>>>>>
>>>>> 2018-01-15 19:53:08.579 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
>>>>>
>>>>> 2018-01-15 19:53:08.580 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706
>>>>>
>>>>> On Mon, Jan 15, 2018 at 2:46 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>
>>>>>> Your generic types don't make sense. K and V in KafkaSpout<K, V>
>>>>>> should be your key and value types, which in your case are byte arrays.
>>>>>>
>>>>>> Could you post the worker log for the spout executor? It'll probably
>>>>>> be in storm/logs/worker-artifacts/$topo_name/$worker_port/worker.log.
>>>>>>
>>>>>> 2018-01-15 20:11 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>
>>>>>>> Sorry to keep bugging you about this.  I am banging my head against
>>>>>>> the wall.  I have put in all the fixes, I think...but am seeing no errors
>>>>>>> of any kind, nor am I seeing any tuples coming from the Kafka Spout...even
>>>>>>> though in the Storm UI, I do see the Kafka topic offsets:
>>>>>>> [image: Inline image 1]
>>>>>>>
>>>>>>> // Set up the Kafka Spout
>>>>>>> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>>     .setOffsetCommitPeriodMs(10000)
>>>>>>>     .setMaxUncommittedOffsets(1000)
>>>>>>>     .setRecordTranslator((r: ConsumerRecord[String,String]) =>
>>>>>>>         new Values(r.topic(), r.partition(): java.lang.Integer, r.offset(): java.lang.Long, r.key(), r.value()),
>>>>>>>         new Fields("topic", "partition", "offset", "key", "value"),
>>>>>>>         "packets")
>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
>>>>>>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>     .build()
>>>>>>>
>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>
>>>>>>> // Load balancer that groups packets by key
>>>>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>>>>     .setNumTasks(IpPairBoltTasks)
>>>>>>>     .fieldsGrouping("pcapKafkaSpout","packets",new Fields("key"))
>>>>>>>     /*.shuffleGrouping("PcapReaderBolt","packets")
>>>>>>>     .shuffleGrouping("NdStreamSpout","packets")*/
>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 15, 2018 at 12:09 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>>>
>>>>>>>> I don't see a port number in your bootstrapServers, it should
>>>>>>>> probably be there. You also need to set a groupId. If you want the spout to
>>>>>>>> use your key or value deserializers, you need to set them in the
>>>>>>>> KafkaSpoutConfig as well, but you'll need to rewrite them so they implement
>>>>>>>> https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.
>>>>>>>>
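>>>>>>>> (A minimal sketch of what such a rewrite could look like, assuming IpPacketRecord is java.io.Serializable, as the SerializationUtils call in the PcapRecordKafkaScheme further down suggests; the class name here is purely illustrative:)
>>>>>>>>
>>>>>>>> import java.util
>>>>>>>> import org.apache.commons.lang3.SerializationUtils
>>>>>>>> import org.apache.kafka.common.serialization.Deserializer
>>>>>>>>
>>>>>>>> class IpPacketRecordDeserializer extends Deserializer[IpPacketRecord] {
>>>>>>>>     // Kafka invokes configure/deserialize/close; no extra state is needed here
>>>>>>>>     override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
>>>>>>>>     override def deserialize(topic: String, data: Array[Byte]): IpPacketRecord =
>>>>>>>>         SerializationUtils.deserialize(data).asInstanceOf[IpPacketRecord]
>>>>>>>>     override def close(): Unit = {}
>>>>>>>> }
>>>>>>>>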
>>>>>>>> You can see how to set all these here
>>>>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
>>>>>>>> and the effect of setting them here
>>>>>>>> https://kafka.apache.org/documentation/#consumerconfigs. The generic props from KafkaSpoutConfig
>>>>>>>> (plus bootstrapServers and key/value deserializer, since these are
>>>>>>>> mandatory) are passed directly to a KafkaConsumer, so the valid settings
>>>>>>>> are the ones listed in the Kafka documentation.
>>>>>>>>
>>>>>>>> 2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>>
>>>>>>>>> Sorry for, perhaps, a dumb question, but I have rewritten my
>>>>>>>>> topology to accommodate the new storm-kafka-client API (I think!).  I see no
>>>>>>>>> errors of any kind, but there are no tuples emitted.  This is the code in
>>>>>>>>> the topology, as well as the scheme I was using in the old topology (old
>>>>>>>>> storm-kafka library). My producer is creating a record that is translated
>>>>>>>>> to a byte array, nothing fancy...the underlying record is a custom object.
>>>>>>>>>
>>>>>>>>> // Set up the Kafka Spout
>>>>>>>>> val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>>>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>>>>>>>     .setOffsetCommitPeriodMs(10000)
>>>>>>>>>     .setMaxUncommittedOffsets(1000)
>>>>>>>>>     .build()
>>>>>>>>>
>>>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
>>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>
>>>>>>>>> // Load balancer that groups packets by key
>>>>>>>>> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>>>>>>>>>     .setNumTasks(IpPairBoltTasks)
>>>>>>>>>     .shuffleGrouping("pcapKafkaSpout")
>>>>>>>>>     .setMemoryLoad(16 * 1024)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> class PcapRecordKafkaScheme extends Scheme {
>>>>>>>>>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>>>>>>>>>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>>>>>>>>>         buffer.get(byteArray, 0, byteArray.length)
>>>>>>>>>         val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>>>>>>>>>         new Values(record.ip_port_key, byteArray)
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     override def getOutputFields: Fields = {
>>>>>>>>>         new Fields("key", "value")
>>>>>>>>>     }
>>>>>>>>> }
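>>>>>>>>>
>>>>>>>>> (In the new API, the Scheme's job moves into a RecordTranslator. A minimal sketch of an equivalent translator, assuming the spout is configured with byte-array key/value deserializers, would be:)
>>>>>>>>>
>>>>>>>>> val translator = new SimpleRecordTranslator[Array[Byte], Array[Byte]](
>>>>>>>>>     (r: ConsumerRecord[Array[Byte], Array[Byte]]) => {
>>>>>>>>>         // rebuild the custom object from the value bytes, as the old Scheme did
>>>>>>>>>         val record = SerializationUtils.deserialize(r.value()).asInstanceOf[IpPacketRecord]
>>>>>>>>>         new Values(record.ip_port_key, r.value())
>>>>>>>>>     },
>>>>>>>>>     new Fields("key", "value"),
>>>>>>>>>     "packets") // matches the old outputStreamId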
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <mabossert@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ah!  Perfect.  I'll poke through the docs and hope for a smooth
>>>>>>>>>> transition.  Thanks!
>>>>>>>>>>
>>>>>>>>>> Get Outlook for iOS <https://aka.ms/o0ukef>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Stig Rohde Døssing <srdo@apache.org>
>>>>>>>>>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>>>>>>>>>> *To:* user@storm.apache.org
>>>>>>>>>> *Subject:* Re: Storm Kafka version mismatch issues
>>>>>>>>>>
>>>>>>>>>> Yes, there is no code overlap between storm-kafka and
>>>>>>>>>> storm-kafka-client. The setup code is completely different. The code you
>>>>>>>>>> posted is still using the storm-kafka classes (e.g. old SpoutConfig instead
>>>>>>>>>> of new KafkaSpoutConfig), so you're still on the old spout. I'm a little
>>>>>>>>>> surprised that code even compiled for you before, since the sbt you posted
>>>>>>>>>> doesn't list storm-kafka anymore.
>>>>>>>>>>
>>>>>>>>>> There's documentation for the new spout at
>>>>>>>>>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
>>>>>>>>>> and some example code at
>>>>>>>>>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>>>>>>>>>>
>>>>>>>>>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Stig,
>>>>>>>>>>>
>>>>>>>>>>> This project is still in a POC status, so I can wipe and restart
>>>>>>>>>>> anything...no big deal.  Thanks!  So, I am now seeing some build errors
>>>>>>>>>>> when I try to run my assembly.  Is there a different method for setting up
>>>>>>>>>>> the KafkaSpout?
>>>>>>>>>>>
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>>>>>>>>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>>>>>>>>>> [error]        ^
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>>>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>> [error]                      ^
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>>>>>>>>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>> [error]                                        ^
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>>>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>> [error]                            ^
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>>>>>>>>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>> [error]                                              ^
>>>>>>>>>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>>>>>>>>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>>>>> [error]                                  ^
>>>>>>>>>>> [error] 6 errors found
>>>>>>>>>>>
>>>>>>>>>>> and here is the relevant part of my topology:
>>>>>>>>>>>
>>>>>>>>>>> // Set up the Kafka Spout
>>>>>>>>>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>>>>>>>>>> val pcapKafkaTopic: String = "packets"
>>>>>>>>>>> val pcapZkRoot: String = "/packets"
>>>>>>>>>>> val pcapClientId: String = "PcapKafkaSpout"
>>>>>>>>>>>
>>>>>>>>>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>>>>>>>>>> pcapKafkaConf.ignoreZkOffsets = true
>>>>>>>>>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>>>>>>>>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>>>>>>>>>> pcapKafkaConf.outputStreamId = "packets"
>>>>>>>>>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>>>>>>>>>
>>>>>>>>>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>>>>>>>>>
>>>>>>>>>>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>>>>>>>>>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
>>>>>>>>>>>     .setNumTasks(PcapFileSpoutTasks)
>>>>>>>>>>>     .setMemoryLoad(16 * 1024)
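>>>>>>>>>>>
>>>>>>>>>>> (For reference when migrating: the old settings above map roughly onto the new builder as in this sketch, assuming storm-kafka-client 1.1.x; ignoreZkOffsets plus EarliestTime corresponds to FirstPollOffsetStrategy.EARLIEST, and the Scheme's work moves into a RecordTranslator:)
>>>>>>>>>>>
>>>>>>>>>>> val newSpoutConf = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
>>>>>>>>>>>     // old: ignoreZkOffsets = true with startOffsetTime = EarliestTime
>>>>>>>>>>>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
>>>>>>>>>>>     // old: pcapClientId, now a Kafka consumer group id
>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "PcapKafkaSpout")
>>>>>>>>>>>     .build()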
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Aaron,
>>>>>>>>>>>>
>>>>>>>>>>>> Please note that the position of your storm-kafka consumers
>>>>>>>>>>>> won't be migrated, so the new storm-kafka-client spout will start over on
>>>>>>>>>>>> the topics you subscribe to. Do you need offsets migrated, or is it fine if
>>>>>>>>>>>> the consumers start from scratch?
>>>>>>>>>>>>
>>>>>>>>>>>> I don't know whether HDP has special requirements, but here's
>>>>>>>>>>>> the general how-to for setting up storm-kafka-client:
>>>>>>>>>>>>
>>>>>>>>>>>> The storm-kafka-client library should work with any Kafka
>>>>>>>>>>>> version 0.10.0.0 and above. You should use the latest version of
>>>>>>>>>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>>>>>>>>>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>>>>>>>>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>>>>>>>>>>>> we are hoping to release this version before too long. Once you've declared
>>>>>>>>>>>> a dependency on storm-kafka-client, you should also add a dependency on
>>>>>>>>>>>> org.apache.kafka:kafka-clients, in the version that matches your Kafka
>>>>>>>>>>>> broker version.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking at your sbt you should not need org.apache.kafka:kafka
>>>>>>>>>>>> on the classpath. I think your zookeeper, log4j and slf4j-log4j12
>>>>>>>>>>>> exclusions on storm-kafka-client are unnecessary as well. I don't believe
>>>>>>>>>>>> those dependencies are part of the storm-kafka-client dependency tree.
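>>>>>>>>>>>>
>>>>>>>>>>>> (Put together, the Kafka-related part of the sbt could shrink to something like this sketch; the 0.10.1.1 pin below just mirrors the version already in your build, on the assumption that it matches the HDP 2.6.2 brokers:)
>>>>>>>>>>>>
>>>>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>>>>     // the spout; no zookeeper/log4j/slf4j exclusions needed
>>>>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205",
>>>>>>>>>>>>     // match the broker version; drop the org.apache.kafka %% "kafka" dependency entirely
>>>>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
>>>>>>>>>>>> )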
>>>>>>>>>>>>
>>>>>>>>>>>> In case making those changes doesn't solve it, could you post a
>>>>>>>>>>>> bit more of the stack trace you get?
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have resurrected some old code that includes a Kafka
>>>>>>>>>>>>> Producer class as well as a storm topology that includes a Kafka Spout to
>>>>>>>>>>>>> ingest the messages coming from the aforementioned producer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was using the storm-kafka library with Scala 2.11, but when
>>>>>>>>>>>>> I changed to the newer code base, which is using Scala 2.12, I found that
>>>>>>>>>>>>> the older library wouldn't work...thus I wanted to switch to the new
>>>>>>>>>>>>> storm-kafka-client, but am not sure what version I should be using.  Here
>>>>>>>>>>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>>>>>>>>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to a
>>>>>>>>>>>>> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
>>>>>>>>>>>>> whatever that is worth...
>>>>>>>>>>>>>
>>>>>>>>>>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>>>>>>>>>
>>>>>>>>>>>>> name := "apachestormstreaming"
>>>>>>>>>>>>>
>>>>>>>>>>>>> version := "0.1"
>>>>>>>>>>>>>
>>>>>>>>>>>>> scalaVersion := "2.12.4"
>>>>>>>>>>>>>
>>>>>>>>>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>>>>>>>>>
>>>>>>>>>>>>> resolvers ++= Seq(
>>>>>>>>>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>>>>>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>>>>>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>>>>>>>>>> )
>>>>>>>>>>>>>
>>>>>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>>>>>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>>>>>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>>>>>>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>>>>>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>>>>>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>>>>>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>>>>>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>>>>>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>>>>>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j")
>>>>>>>>>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>>>>>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>>>>>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>>>>>>>>>         exclude("log4j","log4j"),
>>>>>>>>>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>>>>>>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>>>>>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>>>>>>>>>> )
>>>>>>>>>>>>>
>>>>>>>>>>>>> assemblyMergeStrategy in assembly := {
>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>>>>>>>>>         MergeStrategy.rename
>>>>>>>>>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>>>>>>>>>         MergeStrategy.last
>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>>>>>>>>>>         MergeStrategy.first
>>>>>>>>>>>>>     case x =>
>>>>>>>>>>>>>         val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>>>>>>>>>         oldStrategy(x)
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
