storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From doss...@apache.org
Subject [1/3] storm git commit: STORM-1518: Backport of STORM-1504
Date Wed, 03 Feb 2016 13:56:53 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 937bc742c -> d9c04093a


STORM-1518: Backport of STORM-1504


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea9abeb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea9abeb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea9abeb2

Branch: refs/heads/1.x-branch
Commit: ea9abeb2d67402cf82e80adf6498a30a38b3a0a4
Parents: 937bc74
Author: Aaron Dossett <aaron.dossett@target.com>
Authored: Tue Feb 2 15:35:39 2016 -0600
Committer: Aaron Dossett <aaron.dossett@target.com>
Committed: Tue Feb 2 15:35:39 2016 -0600

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   | 15 +++-
 external/storm-hdfs/pom.xml                     | 22 ++++++
 .../storm/hdfs/avro/AbstractAvroSerializer.java | 80 +++++++++++++++++++
 .../storm/hdfs/avro/AvroSchemaRegistry.java     | 28 +++++++
 .../org/apache/storm/hdfs/avro/AvroUtils.java   | 44 +++++++++++
 .../hdfs/avro/ConfluentAvroSerializer.java      | 83 ++++++++++++++++++++
 .../storm/hdfs/avro/FixedAvroSerializer.java    | 67 ++++++++++++++++
 .../storm/hdfs/avro/GenericAvroSerializer.java  | 36 +++++++++
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |  4 -
 .../hdfs/avro/TestFixedAvroSerializer.java      | 76 ++++++++++++++++++
 .../hdfs/avro/TestGenericAvroSerializer.java    | 68 ++++++++++++++++
 .../test/resources/FixedAvroSerializer.config   |  2 +
 pom.xml                                         |  3 +
 13 files changed, 522 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bf63ad9..2fc4c7b 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -307,7 +307,6 @@ The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you
to write
                 .withRotationPolicy(rotationPolicy)
                 .withSyncPolicy(syncPolicy);
 ```
-
 The setup is very similar to the `SequenceFileBolt` example above.  The key difference is
that instead of specifying a
 `SequenceFormat` you must provide a string representation of an Avro schema through the `withSchemaAsString()`
method.
 An `org.apache.avro.Schema` object cannot be directly provided since it does not implement
`Serializable`.
@@ -315,6 +314,18 @@ An `org.apache.avro.Schema` object cannot be directly provided since
it does not
 The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that
conforms to the provided
 schema.
 
+To use this bolt you **must** register the appropriate Kryo serializers with your topology
configuration.  A convenience
+method is provided for this:
+
+`AvroGenericRecordBolt.addAvroKryoSerializations(conf);`
+
+By default Storm will use the ```GenericAvroSerializer``` to handle serialization.  This
will work, but there are much 
+faster options available if you can pre-define the schemas you will be using or utilize an
external schema registry. An
+implementation using the Confluent Schema Registry is provided, but others can be implemented
and provided to Storm.
+Please see the javadoc for classes in org.apache.storm.hdfs.avro for information about using
the built-in options or
+creating your own.
+
+
 ## HDFS Bolt support for Trident API
 storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with
an API that closely mirrors
 that of the bolts.
@@ -548,4 +559,4 @@ under the License.
 # Committer Sponsors
 
  * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))
\ No newline at end of file
+ * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index da4148a..5e8cce4 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -35,6 +35,13 @@
         </developer>
     </developers>
 
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>http://packages.confluent.io/maven</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -74,6 +81,10 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -183,6 +194,17 @@
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
new file mode 100644
index 0000000..ddf015d
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+
+//Generously adapted from:
+//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
+//Which has as an ASL2.0 license
+
+/**
+ * This abstract class can be extended to implement concrete classes capable of (de)serializing
generic avro objects
+ * across a Topology.  The methods in the AvroSchemaRegistry interface specify how schemas
can be mapped to unique
+ * identifiers and vice versa.  Implementations based on pre-defining schemas or utilizing
an external schema registry
+ * are provided.
+ */
+public abstract class AbstractAvroSerializer extends Serializer<GenericContainer> implements
AvroSchemaRegistry {
+
+    @Override
+    public void write(Kryo kryo, Output output, GenericContainer record) {
+
+        String fingerPrint = this.getFingerprint(record.getSchema());
+        output.writeString(fingerPrint);
+        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
+
+        BinaryEncoder encoder = EncoderFactory
+                .get()
+                .directBinaryEncoder(output, null);
+        try {
+            writer.write(record, encoder);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass)
{
+        Schema theSchema = this.getSchema(input.readString());
+        GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema);
+        Decoder decoder = DecoderFactory
+                .get()
+                .directBinaryDecoder(input, null);
+
+        GenericContainer foo;
+        try {
+            foo = reader.read(null, decoder);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return foo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
new file mode 100644
index 0000000..0d1dc8b
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+
+public interface AvroSchemaRegistry extends Serializable {
+    String getFingerprint(Schema schema);
+
+    Schema getSchema(String fingerPrint);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
new file mode 100644
index 0000000..5549291
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.storm.Config;
+
+public class AvroUtils {
+    /**
+     * A helper method to extract avro serialization configurations from the topology configuration
and register
+     * specific kryo serializers as necessary.  A default serializer will be provided if
none is specified in the
+     * configuration.  "avro.serializer" should specify the complete class name of the serializer,
e.g.
+     * "org.apache.stgorm.hdfs.avro.GenericAvroSerializer"
+     *
+     * @param conf The topology configuration
+     * @throws ClassNotFoundException If the specified serializer cannot be located.
+     */
+    public static void addAvroKryoSerializations(Config conf) throws ClassNotFoundException
{
+        final Class serializerClass;
+        if (conf.containsKey("avro.serializer")) {
+            serializerClass = Class.forName((String)conf.get("avro.serializer"));
+        }
+        else {
+            serializerClass = GenericAvroSerializer.class;
+        }
+        conf.registerSerialization(GenericData.Record.class, serializerClass);
+        conf.setSkipMissingKryoRegistrations(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
new file mode 100644
index 0000000..2008a3e
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
+ * for Storm to (de)serialize Avro generic records across a topology.  It assumes the schema
registry is up and running
+ * completely independent of Storm.
+ */
+public class ConfluentAvroSerializer extends AbstractAvroSerializer {
+
+    private SchemaRegistryClient theClient;
+    final private String url;
+
+    /**
+     * A constructor for use by test cases ONLY, thus the default scope.
+     * @param url The complete URL reference of a confluent schema registry, e.g. "http://HOST:PORT"
+     */
+    ConfluentAvroSerializer(String url) {
+        this.url = url;
+        this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
+    }
+
+    /**
+     * A constructor with a signature that Storm can locate and use with kryo registration.
+     * See Storm's SerializationFactory class for details
+     *
+     * @param k Unused but needs to be present for Serialization Factory to find this constructor
+     * @param stormConf The global storm configuration. Must define "avro.schemaregistry.confluent"
to locate the
+     *                  confluent schema registry. Should in the form of "http://HOST:PORT"
+     */
+    public ConfluentAvroSerializer(Kryo k, Map stormConf) {
+        url = (String) stormConf.get("avro.schemaregistry.confluent");
+        this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
+    }
+
+    @Override
+    public String getFingerprint(Schema schema) {
+        final String subject = schema.getName();
+        final int guid;
+        try {
+            guid = theClient.register(subject, schema);
+        } catch (IOException | RestClientException e) {
+            throw new RuntimeException(e);
+        }
+        return Integer.toString(guid);
+    }
+
+    @Override
+    public Schema getSchema(String fingerPrint) {
+        final Schema theSchema;
+        try {
+            theSchema = theClient.getByID(Integer.parseInt(fingerPrint));
+        } catch (IOException | RestClientException e) {
+            throw new RuntimeException(e);
+        }
+        return theSchema;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
new file mode 100644
index 0000000..4dd5fdc
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A class to help (de)serialize a pre-defined set of Avro schemas.  Schemas should be listed,
one per line, in a file
+ * called "FixedAvroSerializer.config", which must be part of the Storm topology jar file.
 Any schemas intended to be
+ * used with this class **MUST** be defined in that file.
+ */
+public class FixedAvroSerializer extends AbstractAvroSerializer {
+
+    private final static String FP_ALGO = "CRC-64-AVRO";
+    final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
+    final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
+
+    public FixedAvroSerializer() throws IOException, NoSuchAlgorithmException {
+        InputStream in = this.getClass().getClassLoader().getResourceAsStream("FixedAvroSerializer.config");
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+        String line;
+        while((line = reader.readLine()) != null) {
+            Schema schema = new Schema.Parser().parse(line);
+            byte [] fp = SchemaNormalization.parsingFingerprint(FP_ALGO, schema);
+            String fingerPrint = new String(Base64.decodeBase64(fp));
+
+            fingerprint2schemaMap.put(fingerPrint, schema);
+            schema2fingerprintMap.put(schema, fingerPrint);
+        }
+    }
+
+    @Override
+    public String getFingerprint(Schema schema) {
+        return schema2fingerprintMap.get(schema);
+    }
+
+    @Override
+    public Schema getSchema(String fingerPrint) {
+        return fingerprint2schemaMap.get(fingerPrint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
new file mode 100644
index 0000000..ecf8c49
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * A default implementation of the AvroSerializer that will just pass literal schemas back
and forth.  This should
+ * only be used if no other serializer will fit a use case.
+ */
+public class GenericAvroSerializer extends AbstractAvroSerializer {
+    @Override
+    public String getFingerprint(Schema schema) {
+        return schema.toString();
+    }
+
+    @Override
+    public Schema getSchema(String fingerPrint) {
+        return new Schema.Parser().parse(fingerPrint);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
index c817c98..1fd2e2f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.hdfs.bolt;
 
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
@@ -41,8 +39,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.EnumSet;
-import java.util.List;
-import java.util.LinkedList;
 import java.util.Map;
 
 public class AvroGenericRecordBolt extends AbstractHdfsBolt{

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
new file mode 100644
index 0000000..a584f91
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestFixedAvroSerializer {
+    //These should match FixedAvroSerializer.config in the test resources
+    private static final String schemaString1 = "{\"type\":\"record\"," +
+            "\"name\":\"stormtest1\"," +
+            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+    private static final String schemaString2 = "{\"type\":\"record\"," +
+            "\"name\":\"stormtest2\"," +
+            "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
+            "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
+    private static final Schema schema1;
+    private static final Schema schema2;
+
+    final AvroSchemaRegistry reg;
+
+    static {
+
+        Schema.Parser parser = new Schema.Parser();
+        schema1 = parser.parse(schemaString1);
+
+        parser = new Schema.Parser();
+        schema2 = parser.parse(schemaString2);
+    }
+
+    public TestFixedAvroSerializer() throws Exception{
+        reg  = new FixedAvroSerializer();
+    }
+
+    @Test
+    public void testSchemas() {
+        testTheSchema(schema1);
+        testTheSchema(schema2);
+    }
+
+    @Test public void testDifferentFPs() {
+        String fp1 = reg.getFingerprint(schema1);
+        String fp2 = reg.getFingerprint(schema2);
+
+        Assert.assertNotEquals(fp1, fp2);
+    }
+
+    private void testTheSchema(Schema schema) {
+        String fp1 = reg.getFingerprint(schema);
+        Schema found = reg.getSchema(fp1);
+        String fp2 = reg.getFingerprint(found);
+
+        Assert.assertEquals(found, schema);
+        Assert.assertEquals(fp1, fp2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
new file mode 100644
index 0000000..ddfdcf5
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestGenericAvroSerializer {
+    private static final String schemaString1 = "{\"type\":\"record\"," +
+            "\"name\":\"stormtest1\"," +
+            "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+            "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+    private static final String schemaString2 = "{\"type\":\"record\"," +
+            "\"name\":\"stormtest2\"," +
+            "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
+            "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
+    private static final Schema schema1;
+    private static final Schema schema2;
+
+    AvroSchemaRegistry reg = new GenericAvroSerializer();
+
+    static {
+
+        Schema.Parser parser = new Schema.Parser();
+        schema1 = parser.parse(schemaString1);
+
+        parser = new Schema.Parser();
+        schema2 = parser.parse(schemaString2);
+    }
+
+    @Test
+    public void testSchemas() {
+        testTheSchema(schema1);
+        testTheSchema(schema2);
+    }
+
+    @Test public void testDifferentFPs() {
+        String fp1 = reg.getFingerprint(schema1);
+        String fp2 = reg.getFingerprint(schema2);
+
+        Assert.assertNotEquals(fp1, fp2);
+    }
+
+    private void testTheSchema(Schema schema) {
+        String fp1 = reg.getFingerprint(schema);
+        Schema found = reg.getSchema(fp1);
+        String fp2 = reg.getFingerprint(found);
+
+        Assert.assertEquals(found, schema);
+        Assert.assertEquals(fp1, fp2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config b/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
new file mode 100644
index 0000000..971d411
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
@@ -0,0 +1,2 @@
+{"type":"record", "name":"stormtest1", "fields":[{"name":"foo1","type":"string"}, {"name":"int1",
"type":"int" }]}
+{"type":"record", "name":"stormtest2", "fields":[{"name":"foobar1","type":"string"}, {"name":"intint1",
"type":"int" }]}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ea9abeb2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 691a8cb..025fb68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,9 @@
                                 <!-- StormSQL -->
                                 <exclude>**/src/codegen/config.fmpp</exclude>
                                 <exclude>**/src/codegen/data/Parser.tdd</exclude>
+
+                                <!-- Avro Serializer Test Resource -->
+                                <exclude>**/src/test/resources/FixedAvroSerializer.config</exclude>
                             </excludes>
                         </configuration>
                     </plugin>


Mime
View raw message