Thanks for the great update. Which version of schema-registry resolves the issue?

- Jungtaek Lim (HeartSaVioR)

On Thu, Oct 13, 2016 at 11:13 PM, Kristopher Kane <kkane.list@gmail.com> wrote:
To follow up on this, our specific problem was with a custom cached schema registry client.  The external/storm-hdfs master branch uses CachedSchemaRegistryClient from a Confluent dependency pinned at version 1, which has the same bug.  It is fixed in later Confluent releases of confluentinc/schema-registry; the cache entry added at this line in master is what 1.x is missing: https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L152
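Roughly, the pattern the fixed client follows looks like the sketch below.  This is illustrative only (the names are made up, not the actual Confluent source); the point is the write-back into the local map, which, as far as I can tell, is what the 1.x client never does, so every lookup becomes an HTTP round trip.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

// Illustrative sketch only -- not the real CachedSchemaRegistryClient code.
class VersionCacheSketch {
    // subject -> (schema -> version), keyed by schema equality
    private final Map<String, Map<Schema, Integer>> versionCache = new HashMap<>();

    int getVersion(String subject, Schema schema) {
        Map<Schema, Integer> bySchema =
            versionCache.computeIfAbsent(subject, s -> new HashMap<>());
        Integer version = bySchema.get(schema);
        if (version == null) {
            version = lookUpViaRest(subject, schema); // one HTTP round trip
            bySchema.put(schema, version);            // the write-back that 1.x is missing
        }
        return version;
    }

    // Stand-in for the REST call visible in the thread dump further down
    // (RestUtils.lookUpSubjectVersion).
    private int lookUpViaRest(String subject, Schema schema) {
        return 1;
    }
}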

I don't have time right now to see if storm-hdfs can bump to something newer than https://github.com/apache/storm/blob/master/external/storm-hdfs/pom.xml#L205, but I will report back if I do.

Kris

On Sun, Sep 11, 2016 at 1:31 PM, Kristopher Kane <kkane.list@gmail.com> wrote:
This took a while, as I could not get INFO logging to come out of the serializer in Kryo.  The CachedSchemaRegistryClient and EnhancedCachedSchemaRegistry in our raw-scheme deserializer (which turns the raw bytes into Avro) show a 100% cache hit rate after the initial load.  As you said, the serializer, with the IdentityHashMap behind org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74), looks to be the problem, although I couldn't get it to log its cache misses from inside the serializer itself.

Here is the evidence:

This thread dump was taken after the topology had been running for a few minutes.  No new schemas are being introduced, yet connections are still being opened over HttpURLConnection.  The deserializer bolt reports that all of its cache requests are hits and never go to the schema registry:

"Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <670c88ea> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:503)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:224)
at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1244)
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:755)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2199)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:137)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.lookUpSubjectVersion(RestUtils.java:164)
at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(EnhancedCachedSchemaRegistry.java:176)
- locked <6b561b0f> (a org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry)
at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:50)
at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:45)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44)
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44)
at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142)
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94)
at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
at clojure.lang.AFn.run(AFn.java:22)
at java.lang.Thread.run(Thread.java:745)


I've attached two screenshots of the JVisualVM CPU profiler.  The one titled 'multiple-jvm' shows a topology with two workers and high CPU usage in RestUtils.  The one titled 'single-jvm' shows a topology with one worker and no CPU usage in RestUtils.


I think this is pretty good evidence, but I would love to know how to log from the serializer running in Kryo, as that would give me 100% proof.  Can anyone help me understand what is going on there?

Thanks, 

Kris

On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <dossett@gmail.com> wrote:
Let us know what you find, especially if the serializer needs to be more defensive to ensure proper caching.

On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <kkane.list@gmail.com> wrote:
Come to think of it, I did see RestUtils rank somewhat higher in the VisualVM CPU profiler but did not give it the attention it deserved.

On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <dossett@gmail.com> wrote:
Hi Kris,

One possibility is that the serializer isn't actually caching the schema <-> id mappings and is hitting the schema registry every time.  The call to register() in getFingerprint() [1] in particular can be finicky, since the cache is ultimately an IDENTITY hash map, not a regular old HashMap [2].  I'm familiar with the Avro deserializer you're using and thought it accounted for this, but perhaps not.
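To make the identity-versus-equality point concrete, here is a small self-contained example (mine, not code from the serializer): two Avro Schema objects parsed from the same definition are equal but are not the same instance, so an IdentityHashMap-keyed cache misses where a HashMap would hit, and every miss turns into a registry call.

import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

import org.apache.avro.Schema;

// IdentityHashMap compares keys with ==, so equal-but-distinct Schema
// instances never hit the cache.
public class IdentityCacheMissSketch {
    public static void main(String[] args) {
        String json = "{\"type\":\"record\",\"name\":\"Example\","
                + "\"fields\":[{\"name\":\"field\",\"type\":\"string\"}]}";
        Schema first = new Schema.Parser().parse(json);
        Schema second = new Schema.Parser().parse(json);

        Map<Schema, Integer> identityCache = new IdentityHashMap<>();
        Map<Schema, Integer> equalityCache = new HashMap<>();
        identityCache.put(first, 1);
        equalityCache.put(first, 1);

        System.out.println(first.equals(second));      // true
        System.out.println(identityCache.get(second)); // null -> cache miss, registry lookup
        System.out.println(equalityCache.get(second)); // 1    -> cache hit
    }
}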

You could add timing information to the getFingerprint() and getSchema() calls in ConfluentAvroSerializer.  If the results indicate cache misses, that's probably your culprit.
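If it helps, here is a rough, stand-alone sketch of that timing check, with a placeholder lookup; in ConfluentAvroSerializer the timed call would be the body of getFingerprint()/getSchema().

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Wrap the lookup with System.nanoTime(): a cache hit should be a few
// microseconds, while a trip to the schema registry will be milliseconds.
public class FingerprintTimingSketch {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Example")
                .fields().requiredString("field").endRecord();

        long start = System.nanoTime();
        String fingerprint = lookUpFingerprint(schema); // stand-in for the real lookup
        long micros = (System.nanoTime() - start) / 1000;

        System.out.println("getFingerprint(" + fingerprint + ") took " + micros + " us");
    }

    // Placeholder only -- no registry involved here.
    private static String lookUpFingerprint(Schema schema) {
        return String.valueOf(schema.hashCode());
    }
}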

Best, Aaron



On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <kkane.list@gmail.com> wrote:
Hi everyone.  

I have a simple topology that uses the Avro serializer (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java)  and writes to Elasticsearch. 

The topology is like this:

Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
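For reference, the serializer is wired in through Kryo registration on the topology Config, roughly like the sketch below (from memory, not my exact code; the class registered and the registry URL configuration the serializer needs may differ).  Once registered, it runs in the worker's send/receive threads rather than inside a bolt.

import backtype.storm.Config;
import org.apache.avro.generic.GenericData;
import org.apache.storm.hdfs.avro.ConfluentAvroSerializer;

// Rough sketch of registering the custom Avro Kryo serializer.
public class TopologySerializationSketch {
    public static Config buildConfig() {
        Config conf = new Config();
        // Serialize Avro records between workers with the Confluent-aware serializer.
        conf.registerSerialization(GenericData.Record.class, ConfluentAvroSerializer.class);
        return conf;
    }
}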

This topology runs well with one worker; however, once I add one more worker (two in total) and change nothing else, throughput drops and tuples start timing out.

I've attached VisualVM/jstatd to the workers in multi-worker mode, and added some JMX settings to the worker opts, but I am unable to see anything glaring.

I've never seen Storm act this way, but I have also never worked with a custom serializer, so I assume the serializer is the culprit, though I cannot explain why.

Any pointers would be appreciated. 

Kris