storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject Re: Storm Kafka version mismatch issues
Date Mon, 15 Jan 2018 22:51:31 GMT
Actually, I seem to have an entirely different problem...I restarted Storm and now can't get
it to start up again.  The worker nodes start without error and then immediately shut down...I will
have to track that down before continuing with the Kafka issue.  Perhaps that was the issue
all along.

________________________________
From: M. Aaron Bossert <mabossert@gmail.com>
Sent: Monday, January 15, 2018 4:18:23 PM
To: user@storm.apache.org
Subject: Re: Storm Kafka version mismatch issues

Aha!  Working that change right now...but I backtracked and went to the Nimbus log.  The error
coming out there is a bit more plausible and potentially useful:


2018-01-15 21:17:19.609 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-15-1515707846-stormcode.ser)
2018-01-15 21:17:19.612 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-2-1515694705
2018-01-15 21:17:19.648 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-2-1515694705-stormcode.ser)
2018-01-15 21:17:19.650 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-4-1515697192
2018-01-15 21:17:19.705 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-4-1515697192-stormcode.ser)
2018-01-15 21:17:19.707 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-22-1515814486
2018-01-15 21:17:19.743 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-22-1515814486-stormcode.ser)
2018-01-15 21:17:19.745 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-6-1515698178
2018-01-15 21:17:19.792 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-6-1515698178-stormcode.ser)
2018-01-15 21:17:19.794 o.a.s.d.nimbus timer [INFO] Cleaning up microbatchTopology-1-1515693972
2018-01-15 21:17:19.830 o.a.s.d.nimbus timer [INFO] ExceptionKeyNotFoundException(msg:microbatchTopology-1-1515693972-stormcode.ser)

On Mon, Jan 15, 2018 at 4:08 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
The idea is that the deserializer converts Kafka's internal representation of a message
(a byte array) into your key and value types, while the RecordTranslator uses the deserialized
value to construct a Values the spout can emit, and is also responsible for declaring the
spout's output fields to Storm.

For example, if you were reading Strings from Kafka, you might have a StringDeserializer that goes
byte[] -> String, and a RecordTranslator that declares that the spout will emit the fields
"message" and "emitTimestamp", and constructs a tuple from the string plus System.currentTimeMillis()
when it is emitted.

Regarding the type error: it occurs because you're using the KafkaSpoutConfig.builder convenience
method, which produces a KafkaSpoutConfig<String, String> and therefore expects a RecordTranslator
of the same type. When you need non-String keys and values, use the Builder
constructors in KafkaSpoutConfig instead (e.g. new KafkaSpoutConfig.Builder(...)).

2018-01-15 21:50 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
Also, I am getting a bit confused as to what "setRecordTranslator" should be.  You mentioned
the generic type didn't make sense, but when I changed it, I keep getting errors telling me
that setRecordTranslator expects a RecordTranslator[String,String] while the actual argument is a
SimpleRecordTranslator[Array[Byte],Array[Byte]]:


val trans: SimpleRecordTranslator[Array[Byte],Array[Byte]] = new SimpleRecordTranslator[Array[Byte],Array[Byte]](
    (r: ConsumerRecord[Array[Byte],Array[Byte]]) =>
        new Values(r.topic(), r.partition(): java.lang.Integer, r.offset(): java.lang.Long, r.key(), r.value()),
    new Fields("topic", "partition", "offset", "key", "value"),
    "packets"
)

val spoutConf = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .setRecordTranslator(trans)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .build()

On Mon, Jan 15, 2018 at 3:45 PM, M. Aaron Bossert <mabossert@gmail.com> wrote:
No, I have not messed with the logging setup...Also, my other topologies (not using storm-kafka-client)
run just fine...I have never had issues until now.

On Mon, Jan 15, 2018 at 3:38 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Does running e.g. one of the storm-starter topologies work for you? You might also try deploying
the storm-kafka-client-examples topology I linked earlier, to verify whether it's a problem
with the spout setup, or an issue with the Storm install.

Have you modified storm/log4j2/worker.xml? It might be set to filter out some logs.

(Really just guessing here)

2018-01-15 21:08 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
That is it...The only suspicious thing I see is that the port keeps changing, suggesting
that the spout keeps dying.  But I don't see any red flags elsewhere to explain the recurring
restarts.

On Mon, Jan 15, 2018 at 2:57 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Hm, that doesn't really tell me anything. Is it the full log? I'd expect to see some INFO-level
logs related to starting up the spout.

2018-01-15 20:54 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:

2018-01-15 19:21:26.132 STDERR Thread-1 [INFO] Jan 15, 2018 7:21:25 PM info.ganglia.gmetric4j.GMonitor start
2018-01-15 19:21:26.132 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
2018-01-15 19:21:26.138 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
2018-01-15 19:21:26.143 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706

[the same four lines repeat roughly every two minutes, from 19:23:29 through 19:53:08]

2018-01-15 19:53:08.572 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
2018-01-15 19:53:08.572 STDERR Thread-1 [INFO] Jan 15, 2018 7:53:08 PM info.ganglia.gmetric4j.GMonitor start
2018-01-15 19:53:08.579 STDERR Thread-1 [INFO] INFO: Setting up 1 samplers
2018-01-15 19:53:08.580 STDERR Thread-1 [INFO] Error: JMX connector server communication error: service:jmx:rmi://r7u14.thanos.gotgdt.net:16706

On Mon, Jan 15, 2018 at 2:46 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Your generic types don't make sense. K and V in KafkaSpout<K, V> should be your key
and value types, which in your case are byte arrays.

Could you post the worker log for the spout executor? It'll probably be in storm/logs/worker-artifacts/$topo_name/$worker_port/worker.log.

2018-01-15 20:11 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
Sorry to keep bugging you about this.  I am banging my head against the wall.  I have put
in all the fixes, I think...but am seeing no errors of any kind, nor am I seeing any tuples
coming from the Kafka Spout...even though in the Storm UI, I do see the Kafka topic offsets:
[Inline image 1]


// Set up the Kafka Spout
val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .setRecordTranslator((r: ConsumerRecord[String,String]) =>
        new Values(r.topic(), r.partition(): java.lang.Integer, r.offset(): java.lang.Long, r.key(), r.value()),
        new Fields("topic", "partition", "offset", "key", "value"),
        "packets")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"pcapKafkaSpoutConsumer")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .build()

// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
    .setNumTasks(PcapFileSpoutTasks)
    .setMemoryLoad(16 * 1024)

// Load balancer that groups packets by key
topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
    .setNumTasks(IpPairBoltTasks)
    .fieldsGrouping("pcapKafkaSpout","packets",new Fields("key"))
    /*.shuffleGrouping("PcapReaderBolt","packets")
    .shuffleGrouping("NdStreamSpout","packets")*/
    .setMemoryLoad(16 * 1024)

On Mon, Jan 15, 2018 at 12:09 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
I don't see a port number in your bootstrapServers; it should probably be there. You also
need to set a groupId. If you want the spout to use your key or value deserializers, you need
to set them in the KafkaSpoutConfig as well, but you'll need to rewrite them so they implement
https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.

You can see how to set all these here https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
and the effect of setting them here https://kafka.apache.org/documentation/#consumerconfigs.
The generic props from KafkaSpoutConfig (plus bootstrapServers and key/value deserializer,
since these are mandatory) are passed directly to a KafkaConsumer, so the valid settings are
the ones listed in the Kafka documentation.

2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
Sorry for what is perhaps a dumb question, but I have rewritten my topology to accommodate the new
storm-kafka-client API (I think!).  I see no errors of any kind, but there are no tuples emitted.
This is the code in the topology as well as the scheme I was using in the old topology (old
storm-kafka library).  My producer creates a record that is serialized to a byte array,
nothing fancy...the underlying record is a custom object.


// Set up the Kafka Spout
val spoutConf: KafkaSpoutConfig[String,String] = KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .build()

// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf),PcapFileSpoutWorkers)
    .setNumTasks(PcapFileSpoutTasks)
    .setMemoryLoad(16 * 1024)

// Load balancer that groups packets by key
topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
    .setNumTasks(IpPairBoltTasks)
    .shuffleGrouping("pcapKafkaSpout")
    .setMemoryLoad(16 * 1024)



class PcapRecordKafkaScheme extends Scheme {
    override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
        val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
        buffer.get(byteArray,0,byteArray.length)
        val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
        new Values(record.ip_port_key,byteArray)
    }

    override def getOutputFields: Fields = {
        new Fields("key","value")
    }

}


On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <mabossert@gmail.com> wrote:
Ah!  Perfect.  I’ll poke through the docs and hope for a smooth transition.  Thanks!

________________________________
From: Stig Rohde Døssing <srdo@apache.org>
Sent: Saturday, January 13, 2018 3:24:31 PM
To: user@storm.apache.org
Subject: Re: Storm Kafka version mismatch issues

Yes, there is no code overlap between storm-kafka and storm-kafka-client; the setup code is
completely different. The code you posted is still using the storm-kafka classes (e.g. the old
SpoutConfig instead of the new KafkaSpoutConfig), so you're still on the old spout. I'm a little
surprised that code even compiled for you before, since the sbt you posted doesn't list storm-kafka
anymore.

There's documentation for the new spout at https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
and some example code at https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.

2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
Stig,

This project is still in POC status, so I can wipe and restart anything...no big deal.
Thanks!  So, I am now seeing some build errors when I try to run my assembly.  Is there a
different method for setting up the KafkaSpout?

[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8:
object BrokerHosts is not a member of package org.apache.storm.kafka
[error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
[error]        ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22:
not found: type BrokerHosts
[error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]                      ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40:
not found: type ZkHosts
[error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]                                        ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28:
not found: type SpoutConfig
[error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                            ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46:
not found: type SpoutConfig
[error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                                              ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34:
not found: type KafkaSpout
[error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
[error]                                  ^
[error] 6 errors found

and here is the relevant part of my topology:


// Set up the Kafka Spout
val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
val pcapKafkaTopic: String = "packets"
val pcapZkRoot: String = "/packets"
val pcapClientId: String = "PcapKafkaSpout"

val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
pcapKafkaConf.ignoreZkOffsets = true
pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
pcapKafkaConf.outputStreamId = "packets"
pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)

val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)

// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout,PcapFileSpoutWorkers)
    .setNumTasks(PcapFileSpoutTasks)
    .setMemoryLoad(16 * 1024)

On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <srdo@apache.org> wrote:
Hi Aaron,

Please note that the position of your storm-kafka consumers won't be migrated, so the new
storm-kafka-client spout will start over on the topics you subscribe to. Do you need offsets
migrated, or is it fine if the consumers start from scratch?

I don't know whether HDP has special requirements but here's the general how-to for setting
up storm-kafka-client:

The storm-kafka-client library should work with any Kafka version 0.10.0.0 and above. You
should use the latest version of storm-kafka-client, which is currently 1.1.1. If you want
a decent number of extra fixes, you might consider building version 1.2.0 yourself from
https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client; we are hoping
to release that version before too long. Once you've declared a dependency on
storm-kafka-client, you should also add a dependency on org.apache.kafka:kafka-clients, in
the version that matches your Kafka broker version.

Looking at your sbt, you should not need org.apache.kafka:kafka on the classpath. I think your
zookeeper, log4j and slf4j-log4j12 exclusions on storm-kafka-client are unnecessary as well;
I don't believe those dependencies are part of the storm-kafka-client dependency tree.

In case making those changes doesn't solve it, could you post a bit more of the stack trace
you get?

2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <mabossert@gmail.com>:
All,

I have resurrected some old code that includes a Kafka producer class as well as a Storm topology
that includes a Kafka spout to ingest the messages coming from the aforementioned producer.

I was using the storm-kafka library with Scala 2.11, but when I changed to the newer code
base, which uses Scala 2.12, I found that the older library wouldn't work...thus I wanted
to switch to the new storm-kafka-client, but am not sure what version I should be using.
Here is my build.sbt file, as well as the error message I am seeing when the spout actually
runs (my favorite assistant, Google, tells me it is due to a version mismatch between Storm
and Kafka).  I am using HDP 2.6.2, for whatever that is worth...


java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)


name := "apachestormstreaming"

version := "0.1"

scalaVersion := "2.12.4"

scalacOptions := Seq("-unchecked", "-deprecation")

resolvers ++= Seq(
    "clojars" at "http://clojars.org/repo/",
    "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
    "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
)

libraryDependencies ++= Seq(
    "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
    "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j"),
    "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
    "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
    "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("commons-beanutils", "commons-beanutils-core")
        exclude("commons-beanutils", "commons-beanutils")
        exclude("commons-collections", "commons-collections"),
    "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
        exclude("org.apache.kafka","kafka-clients")
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("org.apache.zookeeper","zookeeper"),
    "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
    "org.apache.kafka" %% "kafka" % "0.10.1.1"
        exclude("org.slf4j", "slf4j-log4j12")
        exclude("log4j","log4j")
        exclude("org.apache.zookeeper","zookeeper"),
    "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
        exclude("org.slf4j","slf4j-log4j12")
        exclude("log4j","log4j"),
    "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
        exclude("org.slf4j","slf4j-log4j12")
        exclude("log4j","log4j"),
    "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
    "org.apache.commons" % "commons-lang3" % "3.6",
    "org.influxdb" % "influxdb-java" % "2.7"
)

assemblyMergeStrategy in assembly := {
    case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
        MergeStrategy.rename
    case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
        MergeStrategy.rename
    case PathList("org","apache","http", _ @ _*) =>
        MergeStrategy.first
    case PathList("org","apache","commons","codec", _ @ _*) =>
        MergeStrategy.first
    case PathList("io","netty", _ @ _*) =>
        MergeStrategy.last
    case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
        MergeStrategy.first
    case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
        MergeStrategy.first
    case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}