aries-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] tmaret closed pull request #4: Kafka implementation
Date Wed, 02 Jan 2019 19:12:39 GMT
tmaret closed pull request #4: Kafka implementation
URL: https://github.com/apache/aries-journaled-events/pull/4
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/org.apache.aries.events.kafka/pom.xml b/org.apache.aries.events.kafka/pom.xml
new file mode 100644
index 0000000..d4a136d
--- /dev/null
+++ b/org.apache.aries.events.kafka/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.aries.events</groupId>
+    <artifactId>org.apache.aries.events</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>org.apache.aries.events.kafka</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.aries.events</groupId>
+      <artifactId>org.apache.aries.events.api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!-- Apache Kafka -->
+    <dependency>
+      <groupId>org.apache.servicemix.bundles</groupId>
+      <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+      <version>2.1.0_1</version>
+    </dependency>
+  </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..a638815
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(name = "Apache Aries Events - Apache Kafka endpoint",
+        description = "Apache Kafka endpoint")
+public @interface KafkaEndpoint {
+
+    @AttributeDefinition(name = "Kafka Bootstrap Servers",
+            description = "A comma separated list of host/port pairs to use for establishing
the initial connection to the Kafka cluster.")
+    String kafkaBootstrapServers() default "localhost:9092";
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessage.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessage.java
new file mode 100644
index 0000000..a1c61a6
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.Map;
+
+import org.apache.aries.events.api.Message;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link Message} holding a binary payload and a set of string properties.
+ * <p>
+ * The payload is copied on the way in and on the way out, so the bytes held
+ * by an instance cannot be altered by callers. The properties are exposed
+ * through a read-only view of the map handed to the constructor (the map
+ * itself is not copied).
+ */
+public class KafkaMessage implements Message {
+
+    private final byte[] payload;
+
+    private final Map<String, String> props;
+
+    /**
+     * @param payload the message content, copied by this constructor; must not be {@code null}
+     * @param props the message properties; must not be {@code null}
+     * @throws NullPointerException if {@code payload} or {@code props} is {@code null}
+     */
+    public KafkaMessage(byte[] payload, Map<String, String> props) {
+        this.payload = requireNonNull(payload).clone();
+        this.props = unmodifiableMap(requireNonNull(props));
+    }
+
+    /**
+     * @return a fresh copy of the payload; modifying it does not affect this message
+     */
+    @Override
+    public byte[] getPayload() {
+        // Mirror the defensive copy made by the constructor: returning the
+        // internal array would let any caller mutate the message content.
+        return payload.clone();
+    }
+
+    /**
+     * @return an unmodifiable view of the message properties
+     */
+    @Override
+    public Map<String, String> getProperties() {
+        return props;
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
new file mode 100644
index 0000000..edb0294
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.aries.events.api.Message;
+import org.apache.aries.events.api.Messaging;
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Seek;
+import org.apache.aries.events.api.Subscription;
+import org.apache.aries.events.api.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+@Type("kafka")
+@Component(service = Messaging.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = KafkaEndpoint.class)
+public class KafkaMessaging implements Messaging {
+
+    /**
+     * The partition to send and receive records.
+     */
+    private static final int PARTITION = 0;
+
+    /**
+     * Shared Kafka producer instance ({@code KafkaProducer}s are thread-safe).
+     */
+    private KafkaProducer<String, byte[]> producer;
+
+    private Map<String, Object> producerConfig;
+
+    private KafkaEndpoint endPoint;
+
+    @Activate
+    public void activate(KafkaEndpoint endPoint) {
+        this.endPoint = endPoint;
+        producerConfig = new HashMap<>();
+        producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        // We favour durability over throughput
+        // and thus requires full acknowledgment
+        // from replica leader and followers.
+        producerConfig.put(ACKS_CONFIG, "all");
+        producerConfig = unmodifiableMap(producerConfig);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        closeQuietly(producer);
+    }
+
+    @Override
+    public Position send(String topic, Message message) {
+        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topic,
PARTITION, null, message.getPayload(), toHeaders(message.getProperties()));
+        try {
+            RecordMetadata metadata = kafkaProducer().send(record).get();
+            return new KafkaPosition(metadata.partition(), metadata.offset());
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(format("Failed to send mesage on topic %s", topic),
e);
+        }
+    }
+
+    @Override
+    public Subscription subscribe(String topic, Position position, Seek seek, Consumer<Received>
callback) {
+        KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(seek);
+        assignAndSeek(consumer, topic, position);
+        Subscription subscription = new KafkaSubscription(consumer, callback);
+        // TODO pool the threads
+        Thread thread = new Thread();
+        thread.setDaemon(true);
+        thread.start();
+        return subscription;
+    }
+
+    @Override
+    public Message newMessage(byte[] payload, Map<String, String> props) {
+        return new KafkaMessage(payload, props);
+    }
+
+    @Override
+    public Position positionFromString(String position) {
+        String[] chunks = position.split(":");
+        if (chunks.length != 2) {
+            throw new IllegalArgumentException(format("Illegal position format %s", position));
+        }
+        return new KafkaPosition(parseInt(chunks[0]), parseLong(chunks[1]));
+    }
+
+    /*
+    @Override
+    public String positionToString(Position position) {
+        KafkaPosition kafkaPosition = asKafkaPosition(position);
+        return format("%s:%s", kafkaPosition.getPartition(), kafkaPosition.getOffset());
+    }
+    */
+
+    static Iterable<Header> toHeaders(Map<String, String> properties) {
+        return properties.entrySet().stream()
+                .map(KafkaMessaging::toHeader)
+                .collect(Collectors.toList());
+    }
+
+    static Map<String, String> toProperties(Headers headers) {
+        return stream(headers.spliterator(), true)
+                .collect(Collectors.toMap(Header::key, header -> new String(header.value(),
UTF_8)));
+    }
+
+    static RecordHeader toHeader(Map.Entry<String, String> property) {
+        return new RecordHeader(property.getKey(), property.getValue().getBytes(UTF_8));
+    }
+
+    static Message toMessage(ConsumerRecord<String, byte[]> record) {
+        return new KafkaMessage(record.value(), toProperties(record.headers()));
+    }
+
+
+    private synchronized KafkaProducer<String, byte[]> kafkaProducer() {
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerConfig);
+        }
+        return producer;
+    }
+
+    private KafkaConsumer<String, byte[]> buildKafkaConsumer(Seek seek) {
+
+        String groupId = UUID.randomUUID().toString();
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
+        consumerConfig.put(GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(AUTO_OFFSET_RESET_CONFIG, seek.name());
+
+        return new KafkaConsumer<>(unmodifiableMap(consumerConfig));
+    }
+
+    private void assignAndSeek(KafkaConsumer consumer, String topicName, Position position){
+        KafkaPosition kafkaPosition = asKafkaPosition(position);
+        TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
+        consumer.assign(singleton(topicPartition));
+        consumer.seek(topicPartition, kafkaPosition.getOffset());
+    }
+
+    private void closeQuietly(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException ignore) {
+                // ignore
+            }
+        }
+    }
+
+    private KafkaPosition asKafkaPosition(Position position) {
+        if (! KafkaPosition.class.isInstance(position)) {
+            throw new IllegalArgumentException(format("Position %s must be and instance of
%s", position, KafkaPosition.class.getCanonicalName()));
+        }
+        return (KafkaPosition) position;
+    }
+
+
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
new file mode 100644
index 0000000..4262279
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaPosition.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.apache.aries.events.api.Position;
+
+public final class KafkaPosition implements Position {
+
+    private final int partition;
+
+    private final long offset;
+
+    public KafkaPosition(int partition, long offset) {
+        this.partition = partition;
+        this.offset = offset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+}
diff --git a/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
new file mode 100644
index 0000000..d824172
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import java.util.function.Consumer;
+
+import org.apache.aries.events.api.Position;
+import org.apache.aries.events.api.Received;
+import org.apache.aries.events.api.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofSeconds;
+import static java.util.Objects.requireNonNull;
+import static org.apache.aries.events.kafka.KafkaMessaging.toMessage;
+
+public class KafkaSubscription implements Subscription, Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscription.class);
+
+    private volatile boolean running = true;
+
+    private final KafkaConsumer<String, byte[]> consumer;
+
+    private final Consumer<Received> callback;
+
+    public KafkaSubscription(KafkaConsumer<String, byte[]> consumer, Consumer<Received>
callback) {
+        this.consumer = requireNonNull(consumer);
+        this.callback = requireNonNull(callback);
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (;running;) {
+                ConsumerRecords<String, byte[]> records = consumer.poll(ofSeconds(MAX_VALUE));
+                records.forEach(record -> callback.accept(toReceived(record)));
+            }
+        } catch (WakeupException e) {
+            if (running) {
+                LOG.error("WakeupException while running {}", e.getMessage(), e);
+                throw e;
+            } else {
+                LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
+            }
+        } catch(Throwable t) {
+            LOG.error(String.format("Catch Throwable %s closing subscription", t.getMessage()),
t);
+            throw t;
+        } finally {
+            // Close the network connections and sockets
+            consumer.close();
+        }
+    }
+
+    @Override
+    public void close() {
+        running = false;
+        consumer.wakeup();
+    }
+
+    private Received toReceived(ConsumerRecord<String, byte[]> record) {
+        Position position = new KafkaPosition(record.partition(), record.offset());
+        return new Received(position, toMessage(record));
+    }
+
+}
diff --git a/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
new file mode 100644
index 0000000..8791c08
--- /dev/null
+++ b/org.apache.aries.events.kafka/src/test/java/org/apache/aries/events/kafka/KafkaMessagingTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.events.kafka;
+
+import org.apache.aries.events.api.Messaging;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the position parsing of {@link KafkaMessaging}.
+ */
+public class KafkaMessagingTest {
+
+    /**
+     * A well formed {@code partition:offset} string yields both components.
+     */
+    @Test
+    public void testPositionFromString() throws Exception {
+        Messaging kafkaMessaging = new KafkaMessaging();
+        KafkaPosition parsed = (KafkaPosition) kafkaMessaging.positionFromString("0:100");
+        assertEquals(0, parsed.getPartition());
+        assertEquals(100, parsed.getOffset());
+    }
+
+    /**
+     * A string with more than two colon separated chunks is rejected.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testPositionFromStringIllegalArgument() throws Exception {
+        Messaging kafkaMessaging = new KafkaMessaging();
+        String malformed = "0:100:23";
+        kafkaMessaging.positionFromString(malformed);
+    }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 44c5c17..a0f820c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
     <modules>
         <module>org.apache.aries.events.api</module>
         <module>org.apache.aries.events.memory</module>
+        <module>org.apache.aries.events.kafka</module>
     </modules>
 
     <properties>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message