aries-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] tmaret closed pull request #17: Journaled Event implementation based on Kafka
Date Fri, 11 Jan 2019 09:36:35 GMT
tmaret closed pull request #17: Journaled Event implementation based on Kafka
URL: https://github.com/apache/aries-journaled-events/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/org.apache.aries.events.kafka/pom.xml b/org.apache.aries.events.kafka/pom.xml
new file mode 100644
index 0000000..bf85186
--- /dev/null
+++ b/org.apache.aries.events.kafka/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.aries.events</groupId>
+    <artifactId>org.apache.aries.events</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>org.apache.aries.events.kafka</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.aries.events</groupId>
+      <artifactId>org.apache.aries.events.api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- Apache Kafka -->
+    <dependency>
+      <groupId>org.apache.servicemix.bundles</groupId>
+      <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+      <version>2.1.0_1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>2.1.0</version>
+    </dependency>
+  </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..a638815
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * Metatype object class definition describing how to reach the Apache Kafka cluster.
+ *
+ * <p>An instance of this annotation type is handed to
+ * {@code KafkaMessaging#activate(KafkaEndpoint)}, which reads the bootstrap servers
+ * from it to configure its Kafka producer and consumers.
+ */
+@ObjectClassDefinition(name = "Apache Aries Events - Apache Kafka endpoint",
+        description = "Apache Kafka endpoint")
+public @interface KafkaEndpoint {
+
+    /**
+     * Comma separated list of {@code host:port} pairs used for the initial connection
+     * to the Kafka cluster.
+     *
+     * @return the bootstrap servers; {@code localhost:9092} when not configured
+     */
+    @AttributeDefinition(name = "Kafka Bootstrap Servers",
+            description = "A comma separated list of host/port pairs to use for establishing
the initial connection to the Kafka cluster.")
+    String kafkaBootstrapServers() default "localhost:9092";
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
new file mode 100644
index 0000000..7f97d3e
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+@Type("kafka")
+@Component(service = Messaging.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = KafkaEndpoint.class)
+public class KafkaMessaging implements Messaging {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessaging.class);
+
+    /**
+     * The partition to send and receive records.
+     */
+    private static final int PARTITION = 0;
+
+    /**
+     * Shared Kafka producer instance ({@code KafkaProducer}s are thread-safe).
+     */
+    private KafkaProducer<String, byte[]> producer;
+
+    private Map<String, Object> producerConfig;
+
+    private KafkaEndpoint endPoint;
+
+    @Activate
+    public void activate(KafkaEndpoint endPoint) {
+        this.endPoint = endPoint;
+        producerConfig = new HashMap<>();
+        producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        // We favour durability over throughput
+        // and thus requires full acknowledgment
+        // from replica leader and followers.
+        producerConfig.put(ACKS_CONFIG, "all");
+        producerConfig = unmodifiableMap(producerConfig);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        closeQuietly(producer);
+    }
+
+    @Override
+    public void send(String topic, Message message) {
+        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topic,
PARTITION, null, message.getPayload(), toHeaders(message.getProperties()));
+        try {
+            RecordMetadata metadata = kafkaProducer().send(record).get();
+            LOG.info(format("Sent to %s", metadata));
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(format("Failed to send mesage on topic %s", topic),
e);
+        }
+    }
+
+    @Override
+    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
+        SubscribeRequest request = requestBuilder.build();
+        KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());
+
+        TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);
+
+        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
+        consumer.assign(topicPartitions);
+
+        if (request.getPosition() != null) {
+            consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
+        } else if (request.getSeek() == Seek.earliest) {
+            consumer.seekToBeginning(topicPartitions);
+        } else {
+            consumer.seekToEnd(topicPartitions);
+        }
+
+        KafkaSubscription subscription = new KafkaSubscription(consumer, request.getCallback());
+        // TODO pool the threads
+        Thread thread = new Thread(subscription);
+        thread.setDaemon(true);
+        thread.start();
+        return subscription;
+    }
+
+    @Override
+    public Position positionFromString(String position) {
+        String[] chunks = position.split(":");
+        if (chunks.length != 2) {
+            throw new IllegalArgumentException(format("Illegal position format %s", position));
+        }
+        return new KafkaPosition(parseInt(chunks[0]), parseLong(chunks[1]));
+    }
+
+    static String positionToString(Position position) {
+        KafkaPosition kafkaPosition = asKafkaPosition(position);
+        return format("%s:%s", kafkaPosition.getPartition(), kafkaPosition.getOffset());
+    }
+
+    static Iterable<Header> toHeaders(Map<String, String> properties) {
+        return properties.entrySet().stream()
+                .map(KafkaMessaging::toHeader)
+                .collect(Collectors.toList());
+    }
+
+    static Map<String, String> toProperties(Headers headers) {
+        return stream(headers.spliterator(), true)
+                .collect(Collectors.toMap(Header::key, header -> new String(header.value(),
UTF_8)));
+    }
+
+    static RecordHeader toHeader(Map.Entry<String, String> property) {
+        return new RecordHeader(property.getKey(), property.getValue().getBytes(UTF_8));
+    }
+
+    static Message toMessage(ConsumerRecord<String, byte[]> record) {
+        return new Message(record.value(), toProperties(record.headers()));
+    }
+
+
+    private synchronized KafkaProducer<String, byte[]> kafkaProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerConfig);
+        }
+        return producer;
+    }
+
+    private KafkaConsumer<String, byte[]> buildKafkaConsumer(Seek seek) {
+
+        String groupId = UUID.randomUUID().toString();
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        consumerConfig.put(GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(AUTO_OFFSET_RESET_CONFIG, seek.name());
+
+        return new KafkaConsumer<>(unmodifiableMap(consumerConfig));
+    }
+
+
+    private void closeQuietly(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignore) {
+                // ignore
+            }
+        }
+    }
+
+    private static KafkaPosition asKafkaPosition(Position position) {
+        if (! KafkaPosition.class.isInstance(position)) {
+            throw new IllegalArgumentException(format("Position %s must be and instance of
%s", position, KafkaPosition.class.getCanonicalName()));
+        }
+        return (KafkaPosition) position;
+    }
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
new file mode 100644
index 0000000..7e36216
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import javax.annotation.Nonnull;
+
+import org.apache.aries.events.api.Position;
+
+public final class KafkaPosition implements Position {
+
+    private final int partition;
+
+    private final long offset;
+
+    public KafkaPosition(int partition, long offset) {
+        this.partition = partition;
+        this.offset = offset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return positionToString();
+    }
+
+    @Override
+    public String positionToString() {
+        return KafkaMessaging.positionToString(this);
+    }
+
+    @Override
+    public int compareTo(@Nonnull Position p) {
+        return Long.compare(offset, ((KafkaPosition)p).offset);
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
new file mode 100644
index 0000000..eaf94c6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
+import static org.apache.aries.events.kafka.KafkaMessaging.toMessage;
+
+public class KafkaSubscription implements Subscription, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscription.class);
+
+    private volatile boolean running = true;
+
+    private final KafkaConsumer<String, byte[]> consumer;
+
+    private final Consumer<Received> callback;
+
+    public KafkaSubscription(KafkaConsumer<String, byte[]> consumer, Consumer<Received>
callback) {
+        this.consumer = requireNonNull(consumer);
+        this.callback = requireNonNull(callback);
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (;running;) {
+                ConsumerRecords<String, byte[]> records = consumer.poll(ofHours(1));
+                records.forEach(record -> callback.accept(toReceived(record)));
+            }
+        } catch (WakeupException e) {
+            if (running) {
+                LOG.error("WakeupException while running {}", e.getMessage(), e);
+                throw e;
+            } else {
+                LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
+            }
+        } catch(Throwable t) {
+            LOG.error(format("Catch Throwable %s closing subscription", t.getMessage()),
t);
+            throw t;
+        } finally {
+            // Close the network connections and sockets
+            consumer.close();
+        }
+    }
+
+    @Override
+    public void close() {
+        running = false;
+        consumer.wakeup();
+    }
+
+    private Received toReceived(ConsumerRecord<String, byte[]> record) {
+        Position position = new KafkaPosition(record.partition(), record.offset());
+        return new Received(position, toMessage(record));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
new file mode 100644
index 0000000..b37951b
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.SubscribeRequestBuilder;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.kafka.setup.KafkaBaseTest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static java.nio.charset.Charset.forName;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link KafkaMessaging}: position parsing and a send/receive round trip
+ * against the local Kafka broker provided by {@link KafkaBaseTest}.
+ */
+public class KafkaMessagingTest extends KafkaBaseTest {
+
+    @Test
+    public void testPositionFromString() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        KafkaPosition kafkaPosition = (KafkaPosition) messaging.positionFromString("0:100");
+        assertEquals(0, kafkaPosition.getPartition());
+        assertEquals(100, kafkaPosition.getOffset());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPositionFromStringIllegalArgument() throws Exception {
+        Messaging messaging = new KafkaMessaging();
+        messaging.positionFromString("0:100:23");
+    }
+
+    /**
+     * Sends one message and expects a subscription starting at offset 0 to be called back.
+     */
+    @Test(timeout = 10000)
+    public void testSendAndReceive() throws Exception {
+
+        String topic = "test_send_and_receive";
+        createTopic(topic, 1);
+
+        KafkaEndpoint kafkaEndpoint = Mockito.mock(KafkaEndpoint.class);
+        when(kafkaEndpoint.kafkaBootstrapServers())
+                .thenReturn(getKafkaLocal().getKafkaBootstrapServer());
+        KafkaMessaging messaging = new KafkaMessaging();
+        messaging.activate(kafkaEndpoint);
+
+        // Release the producer even when sending, subscribing or the assertion fails.
+        try {
+            byte[] payload = "test".getBytes(forName("UTF-8"));
+
+            Message message = new Message(payload, singletonMap("prop1", "value1"));
+            messaging.send(topic, message);
+
+            Semaphore invoked = new Semaphore(0);
+
+            SubscribeRequestBuilder requestBuilder = SubscribeRequestBuilder
+                    .to(topic, (received) -> invoked.release())
+                    .startAt(new KafkaPosition(0, 0));
+
+            try (Subscription subscription = messaging.subscribe(requestBuilder)) {
+                // State the expectation explicitly instead of relying on the JUnit timeout alone.
+                boolean delivered = invoked.tryAcquire(10, TimeUnit.SECONDS);
+                org.junit.Assert.assertTrue("The subscription callback was not invoked", delivered);
+            }
+        } finally {
+            messaging.deactivate();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
new file mode 100644
index 0000000..3acd699
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaPositionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.aries.events.api.Position;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link KafkaPosition}: accessors, textual form and ordering.
+ */
+public class KafkaPositionTest {
+
+    private static final Random RAND = new Random();
+
+    @Test
+    public void testGetPartition() throws Exception {
+        KafkaPosition position = new KafkaPosition(10, 1000);
+        assertEquals(10, position.getPartition());
+    }
+
+    @Test
+    public void testGetOffset() throws Exception {
+        KafkaPosition position = new KafkaPosition(10, 1000);
+        assertEquals(1000, position.getOffset());
+    }
+
+    @Test
+    public void testPositionToString() throws Exception {
+        KafkaPosition position = new KafkaPosition(10, 1000);
+        assertEquals("10:1000", position.positionToString());
+    }
+
+    /**
+     * Positions are built with random partitions; the expected results depend on the
+     * offsets only.
+     */
+    @Test
+    public void testCompareTo() throws Exception {
+        assertEquals(0, atOffset(5).compareTo(atOffset(5)));
+        assertEquals(1, atOffset(10).compareTo(atOffset(5)));
+        assertEquals(-1, atOffset(2).compareTo(atOffset(5)));
+    }
+
+    @Test
+    public void testOrder() {
+        NavigableMap<Position, String> byPosition = new TreeMap<>();
+        byPosition.put(new KafkaPosition(0, 0), "earliest");
+        byPosition.put(new KafkaPosition(0, 1), "mid");
+        byPosition.put(new KafkaPosition(0, 2), "latest");
+
+        String first = byPosition.firstEntry().getValue();
+        String last = byPosition.lastEntry().getValue();
+        assertEquals("earliest", first);
+        assertEquals("latest", last);
+    }
+
+    /**
+     * Creates a position on a random partition at the given offset.
+     */
+    private KafkaPosition atOffset(long offset) {
+        return new KafkaPosition(RAND.nextInt(), offset);
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
new file mode 100644
index 0000000..61763b6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaBaseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.aries.events.kafka.setup.KafkaLocal.getKafkaProperties;
+import static org.apache.kafka.clients.admin.AdminClient.create;
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+
+public class KafkaBaseTest {
+
+    private static KafkaLocal kafkaLocal;
+
+    private static ZooKeeperLocal zooKeeperLocal;
+
+    private static Logger LOG = LoggerFactory.getLogger(KafkaBaseTest.class);
+
+    @BeforeClass
+    public static void startKafka() throws IOException {
+
+        int zkPort = randomAvailablePort();
+        String zkDir = createTempDirectory("zk").toString();
+        String zkConnect = format("127.0.0.1:%s", zkPort);
+        zooKeeperLocal = new ZooKeeperLocal(ZooKeeperLocal.getZooKeeperProperties(zkDir,
zkPort));
+        LOG.info(format("Started local ZooKeeper server on port %s and dataDirectory %s",
zkPort, zkDir));
+
+        int kafkaPort = randomAvailablePort();
+        String kafkaLogDir = createTempDirectory("kafka").toString();
+        kafkaLocal = new KafkaLocal(getKafkaProperties(kafkaLogDir, kafkaPort, zkConnect));
+        LOG.info(format("Started local Kafka on port %s and logDirectory %s", zkConnect,
kafkaLogDir));
+    }
+
+    @AfterClass
+    public static void shutdownKafka() {
+        if (kafkaLocal != null) {
+            kafkaLocal.stop();
+        }
+        if (zooKeeperLocal != null) {
+            zooKeeperLocal.stop();
+        }
+    }
+
+    public static KafkaLocal getKafkaLocal() {
+        return kafkaLocal;
+    }
+
+
+    public Set<String> listTopics() {
+        try (AdminClient admin = buildAdminClient()) {
+            ListTopicsResult result = admin.listTopics();
+            return result.names().get();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list topics", e);
+        }
+    }
+
+    public void createTopic(String topicName, int numPartitions) {
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
+        try (AdminClient admin = buildAdminClient()) {
+            CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
+            result.values().get(topicName).get();
+            LOG.info(format("created topic %s", topicName));
+        } catch (Exception e) {
+            throw new RuntimeException(format("Failed to create topic %s", topicName), e);
+        }
+    }
+
+    public void deleteTopic(String topicName) {
+        try (AdminClient admin = buildAdminClient()) {
+            DeleteTopicsResult result = admin.deleteTopics(Collections.singleton(topicName));
+            result.all().get();
+            LOG.info(format("deleted topic %s", topicName));
+        } catch (Exception e) {
+            throw new RuntimeException(format("Failed to delete topic %s", topicName), e);
+        }
+    }
+
+    private static int randomAvailablePort() {
+        try (ServerSocket ss = new ServerSocket(0)) {
+            return ss.getLocalPort();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private AdminClient buildAdminClient() {
+        return create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, kafkaLocal.getKafkaBootstrapServer()));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
new file mode 100644
index 0000000..e55e136
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/KafkaLocal.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+import static java.lang.String.format;
+
+/**
+ * Test helper wrapping a {@link KafkaServerStartable} configured from a property map.
+ * The server is started by the constructor and shut down by {@link #stop()}.
+ */
+public class KafkaLocal {
+
+    /** Bootstrap address in {@code host:port} form, derived from the broker configuration. */
+    private final String kafkaBootstrapServer;
+
+    /** The wrapped broker instance. */
+    private final KafkaServerStartable server;
+
+    /**
+     * Builds the broker configuration from the given properties and starts the broker.
+     *
+     * @param kafkaProperties broker settings, e.g. as produced by
+     *        {@link #getKafkaProperties(String, int, String)}
+     */
+    public KafkaLocal(Map<String, Object> kafkaProperties) {
+        KafkaConfig config = new KafkaConfig(kafkaProperties);
+        this.kafkaBootstrapServer = format("%s:%s", config.hostName(), config.port());
+        this.server = new KafkaServerStartable(config);
+        this.server.startup();
+    }
+
+    /**
+     * Assembles the broker settings used by this helper.
+     *
+     * @param logDir value stored under {@code log.dir}
+     * @param port value stored under {@code port}
+     * @param zkConnect value stored under {@code zookeeper.connect}
+     * @return a new mutable map holding the three values plus {@code host.name=localhost}
+     */
+    public static Map<String, Object> getKafkaProperties(String logDir, int port, String zkConnect) {
+        Map<String, Object> brokerSettings = new HashMap<>();
+        brokerSettings.put("host.name", "localhost");
+        brokerSettings.put("log.dir", logDir);
+        brokerSettings.put("port", port);
+        brokerSettings.put("zookeeper.connect", zkConnect);
+        return brokerSettings;
+    }
+
+    /**
+     * @return the {@code host:port} string computed when this instance was constructed
+     */
+    public String getKafkaBootstrapServer() {
+        return this.kafkaBootstrapServer;
+    }
+
+    /** Shuts the wrapped broker down. */
+    public void stop() {
+        this.server.shutdown();
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
new file mode 100644
index 0000000..ea479ff
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/setup/ZooKeeperLocal.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka.setup;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+/**
+ * Test helper that runs a {@link ZooKeeperServerMain} on a daemon thread.
+ * The server thread is started by the constructor; {@link #stop()} requests shutdown.
+ */
+public class ZooKeeperLocal {
+
+    private final ZooKeeperServerMain server;
+
+    /**
+     * Parses the given properties into a server configuration and starts the
+     * server on a background daemon thread.
+     *
+     * @param zkProperties ZooKeeper settings, e.g. as produced by
+     *        {@link #getZooKeeperProperties(String, int)}
+     * @throws RuntimeException if the properties cannot be parsed; the parser's
+     *         exception is kept as the cause
+     */
+    public ZooKeeperLocal(Properties zkProperties) {
+        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
+        try {
+            quorumPeerConfig.parseProperties(zkProperties);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        ServerConfig serverConfig = new ServerConfig();
+        serverConfig.readFrom(quorumPeerConfig);
+
+        server = new ZooKeeperServerMain();
+
+        // Daemon thread so a running server never keeps the JVM alive.
+        Thread dt = new Thread(runnable(serverConfig));
+        dt.setDaemon(true);
+        dt.start();
+    }
+
+    /**
+     * Invokes the server's {@code shutdown} method reflectively.
+     *
+     * @throws RuntimeException if the method cannot be looked up or invoked; the
+     *         underlying exception is kept as the cause
+     */
+    public void stop() {
+        try {
+            // NOTE(review): setAccessible suggests shutdown() is not public on
+            // ZooKeeperServerMain - confirm against the ZooKeeper version in use.
+            Method shutdown = server.getClass().getDeclaredMethod("shutdown");
+            shutdown.setAccessible(true);
+            shutdown.invoke(server);
+        } catch (Exception e) {
+            // Keep the cause: a bare RuntimeException hides why shutdown failed.
+            throw new RuntimeException("Failed to shut down ZooKeeper server", e);
+        }
+    }
+
+    /**
+     * Assembles the ZooKeeper settings used by this helper.
+     *
+     * @param dataDirectory value stored under {@code dataDir}
+     * @param port value stored under {@code clientPort}
+     * @return a new {@link Properties} object holding the two entries
+     */
+    public static Properties getZooKeeperProperties(String dataDirectory, int port) {
+        Properties props = new Properties();
+        props.put("dataDir", dataDirectory);
+        props.put("clientPort", port);
+        return props;
+    }
+
+    /**
+     * Wraps {@code server.runFromConfig(serverConfig)} in a {@link Runnable},
+     * rethrowing an {@link IOException} as a {@link RuntimeException}.
+     */
+    private Runnable runnable(ServerConfig serverConfig) {
+        return () -> {
+            try {
+                server.runFromConfig(serverConfig);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+}
diff --git a/pom.xml b/pom.xml
index 6610556..6e279c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,13 +43,14 @@
     <modules>
         <module>org.apache.aries.events.api</module>
         <module>org.apache.aries.events.memory</module>
+        <module>org.apache.aries.events.kafka</module>
         <module>org.apache.aries.events.mongo</module>
     </modules>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <bnd.version>4.1.0</bnd.version>
-        <slf4j.version>1.7.14</slf4j.version>
+        <slf4j.version>1.7.25</slf4j.version>
         <log4j.version>1.2.6</log4j.version>
         <exam.version>4.12.0</exam.version>
     </properties>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message