aries-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] cschneider closed pull request #16: ARIES-1881 Mongo based implementation
Date Thu, 10 Jan 2019 21:48:57 GMT
cschneider closed pull request #16: ARIES-1881 Mongo based implementation
URL: https://github.com/apache/aries-journaled-events/pull/16
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
index 529c980..44b77df 100644
--- a/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
+++ b/org.apache.aries.events.api/src/main/java/org/apache/aries/events/api/Message.java
@@ -17,10 +17,13 @@
  */
 package org.apache.aries.events.api;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
 
 /**
  * TODO If we allow wild card consumption then a message also needs a topic
@@ -31,6 +34,8 @@
     private final Map<String, String> properties;
 
     public Message(byte[] payload, Map<String, String> properties) {
+        requireNonNull(payload);
+        requireNonNull(properties);
         this.payload = payload.clone();
         this.properties = unmodifiableMap(new HashMap<>(properties));
     }
@@ -43,4 +48,25 @@ public Message(byte[] payload, Map<String, String> properties) {
         return properties;
     }
 
+    @Override
+    public String toString() {
+        return "Message" + properties;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Message message = (Message) o;
+        return Arrays.equals(payload, message.payload) &&
+                properties.equals(message.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(properties);
+        result = 31 * result + Arrays.hashCode(payload);
+        return result;
+    }
+
 }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
index 2bf838f..f03f701 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java
@@ -100,7 +100,7 @@ public void close() {
     private volatile boolean interrupted = false;
 
     private MessageReceiverImpl(MongoCollection<Document> col, Optional<MongoClient>
mongoClient) {
-        LOGGER.info("Creating new receiver: " + col.getNamespace().getCollectionName());
+        LOGGER.debug("Creating new receiver: " + col.getNamespace().getCollectionName());
         this.mongoClient = mongoClient;
         this.col = col;
     }
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
index 849e91f..7f60be1 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageSenderImpl.java
@@ -70,7 +70,7 @@ public void close() {}
     private final long maxAge;
 
     private MessageSenderImpl(MongoCollection<Document> collection, long maxAge) {
-        LOGGER.info("Creating new publisher: " + collection.getNamespace().getCollectionName());
+        LOGGER.debug("Creating new publisher: " + collection.getNamespace().getCollectionName());
         ensureIndexes(collection);
         this.collection = collection;
         this.maxAge = maxAge;
diff --git a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
index ea95f2f..f991502 100644
--- a/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
+++ b/org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MongoSubscription.java
@@ -22,12 +22,8 @@
 import org.apache.aries.events.api.Received;
 import org.apache.aries.events.api.Seek;
 import org.apache.aries.events.api.Subscription;
-import org.bson.Document;
 import org.slf4j.Logger;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.function.Consumer;
 
 import static java.lang.Thread.currentThread;
@@ -114,7 +110,7 @@ private void poll(MessageReceiver receiver) {
         while (!interrupted()) {
             try {
                 Message message = receiver.receive(index);
-                LOGGER.info("Received: " + message);
+                LOGGER.debug("Received: " + message);
                 Received received = new Received(position(index), message);
                 consumer.accept(received);
                 index += 1L;
@@ -124,7 +120,7 @@ private void poll(MessageReceiver receiver) {
                 LOGGER.error("Error handling message", e);
             }
         }
-        LOGGER.info("Quitting " + this);
+        LOGGER.debug("Quitting " + this);
         receiver.close();
     }
 
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
new file mode 100644
index 0000000..783c702
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/MongoProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.bson.Document;
+import org.junit.rules.ExternalResource;
+
+import java.util.Optional;
+import java.util.logging.Logger;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Provides connection to an external mongodb instance
+ * New database gets created for each test and dropped
+ * afterwards.
+ * Database URL must be provided by mongoUri system
+ * property
+ */
+public class MongoProvider extends ExternalResource {
+
+    MongoCollection<Document> getCollection(String name) {
+        return database.getCollection(name);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private static final String MONGO_URI_PROP = "aries.events.test.mongoUri";
+    private static final String DEFAULT_DB_NAME = "tmp_aries_events_test";
+    private MongoDatabase database;
+    private MongoClient client;
+
+    @Override
+    protected void before() {
+	String mongoUri = mongoUri();
+	client = MongoClients.create(mongoUri);
+	String dbName = Optional.ofNullable(new MongoClientURI(mongoUri).getDatabase())
+		.orElse(DEFAULT_DB_NAME);
+	database = client.getDatabase(dbName);
+    }
+
+    @Override
+    protected void after() {
+        if (database != null) {
+	    database.drop();
+	}
+        if (client != null) {
+	    client.close();
+	}
+    }
+
+    private static String mongoUri() {
+	String result = System.getProperty(MONGO_URI_PROP);
+	if (result == null) {
+	    String message = "No mongo URI provided.\n" +
+		    "  In order to enable mongo tests, define " + MONGO_URI_PROP + " system property\n"
+
+		    "  to point to a running instance of mongodb.\n" +
+		    "  Example:\n" +
+		    "        mvn test -D" + MONGO_URI_PROP + "=mongodb://localhost:27017/";
+	    System.out.println("WARNING: " + message);
+	    assumeTrue(message, false);
+	}
+	return result;
+    }
+
+}
diff --git a/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
new file mode 100644
index 0000000..f48627d
--- /dev/null
+++ b/org.apache.aries.events.mongo/src/test/java/org/apache/aries/events/mongo/SenderReceiverTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The SF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.aries.events.mongo;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.aries.events.api.Message;
+import org.bson.Document;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.aries.events.mongo.MessageReceiverImpl.messageReceiver;
+import static org.apache.aries.events.mongo.MessageSenderImpl.messageSender;
+import static org.junit.Assert.assertEquals;
+
+public class SenderReceiverTest {
+
+    @Test public void testReplicate() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 1000 * 60 * 60 * 24 * 7);
+        MessageReceiver receiver = messageReceiver(collection);
+        Message expected = new Message(new byte[]{ 1, 2, 3 }, mapOf(
+                keyVal("key1", "val1"),
+                keyVal("key2", "val2"))
+        );
+        sender.send(expected);
+        sender.send(expected);
+        Message actual = receiver.receive(0);
+        assertEquals(expected, actual);
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void testEvicted() throws InterruptedException {
+        MongoCollection<Document> collection = mongoProvider.getCollection("events");
+        MessageSender sender = messageSender(collection, 0);
+        MessageReceiver receiver = messageReceiver(collection);
+        Message expected = new Message(new byte[] { 1, 2, 3}, emptyMap());
+        sender.send(expected);
+        sender.send(expected);
+        receiver.receive(0);
+    }
+
+    //*********************************************
+    // Internals
+    //*********************************************
+
+    private MongoCollection<Document> collection;
+
+    @Rule
+    public MongoProvider mongoProvider = new MongoProvider();
+
+    private static Map.Entry<String, String> keyVal(String key, String value) {
+        return new SimpleEntry<>(key, value);
+    }
+
+    private static Map<String, String> mapOf(Map.Entry<String, String>... mappings)
{
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> entry : mappings) {
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message