incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [1/2] git commit: S4-114 extract interfaces from remote emitters, senders and streams - in order to allow for easy mocking and different implementations
Date Thu, 24 Jan 2013 17:53:14 GMT
S4-114 extract interfaces from remote emitters, senders and streams
- in order to allow for easy mocking and different implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/eb851534
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/eb851534
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/eb851534

Branch: refs/heads/S4-114
Commit: eb851534e56ae28677abb2c4f24333420be9ae92
Parents: 051082e
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Jan 24 16:16:59 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Jan 24 16:17:54 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |   10 +-
 .../apache/s4/comm/tcp/DefaultRemoteEmitters.java  |   61 ++++
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |   51 +---
 .../org/apache/s4/comm/topology/RemoteStreams.java |  232 +-------------
 .../apache/s4/comm/topology/ZkRemoteStreams.java   |  247 +++++++++++++++
 .../java/org/apache/s4/core/DefaultCoreModule.java |    8 +-
 .../org/apache/s4/core/DefaultRemoteSenders.java   |  127 ++++++++
 .../java/org/apache/s4/core/RemoteSenders.java     |  118 +-------
 .../org/apache/s4/fixtures/MockCommModule.java     |    4 -
 .../org/apache/s4/fixtures/MockCoreModule.java     |    6 +
 10 files changed, 463 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index 9c37b4d..fb0fce3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,6 +31,7 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterFromZK;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
@@ -88,11 +90,11 @@ public class DefaultCommModule extends AbstractModule {
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
 
-        bind(Cluster.class).to(ClusterFromZK.class);
+        bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
 
-        bind(Clusters.class).to(ClustersFromZK.class);
+        bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
 
-        bind(RemoteEmitters.class);
+        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
 
@@ -106,7 +108,7 @@ public class DefaultCommModule extends AbstractModule {
                     .getString("s4.comm.emitter.remote.class"));
             install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
                     RemoteEmitterFactory.class));
-            bind(RemoteEmitters.class);
+            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
new file mode 100644
index 0000000..edd4fab
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.tcp;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ * 
+ */
+@Singleton
+public class DefaultRemoteEmitters implements RemoteEmitters {
+
+    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster,
RemoteEmitter>();
+
+    @Inject
+    RemoteEmitterFactory emitterFactory;
+
+    /* (non-Javadoc)
+     * @see org.apache.s4.comm.tcp.RemoteEmitters#getEmitter(org.apache.s4.comm.topology.Cluster)
+     */
+    @Override
+    public RemoteEmitter getEmitter(Cluster topology) {
+        RemoteEmitter emitter = emitters.get(topology);
+        if (emitter == null) {
+            RemoteEmitter newEmitter = emitterFactory.createRemoteEmitter(topology);
+            emitter = emitters.putIfAbsent(topology, newEmitter);
+            if (emitter == null) {
+                emitter = newEmitter;
+            } else {
+                // use the existing emitter instead
+                newEmitter.close();
+            }
+        }
+        return emitter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index 4b3040d..f298805 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -1,57 +1,10 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.comm.tcp;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.comm.RemoteEmitterFactory;
 import org.apache.s4.comm.topology.Cluster;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-/**
- * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
- * 
- */
-@Singleton
-public class RemoteEmitters {
-
-    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster,
RemoteEmitter>();
+public interface RemoteEmitters {
 
-    @Inject
-    RemoteEmitterFactory emitterFactory;
+    public abstract RemoteEmitter getEmitter(Cluster topology);
 
-    public RemoteEmitter getEmitter(Cluster topology) {
-        RemoteEmitter emitter = emitters.get(topology);
-        if (emitter == null) {
-            RemoteEmitter newEmitter = emitterFactory.createRemoteEmitter(topology);
-            emitter = emitters.putIfAbsent(topology, newEmitter);
-            if (emitter == null) {
-                emitter = newEmitter;
-            } else {
-                // use the existing emitter instead
-                newEmitter.close();
-            }
-        }
-        return emitter;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index b1fe6b7..f9cdcd5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -1,215 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.comm.topology;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-
-/**
- * <p>
- * Monitors streams available in the S4 cluster.
- * </p>
- * <p>
- * Maintains a data structure reflecting the currently published streams with their consumers
and publishers.
- * </p>
- * <p>
- * Provides methods to publish producers and consumers of streams
- * </p>
- * 
- */
-@Singleton
-public class RemoteStreams implements IZkStateListener, IZkChildListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
-    private KeeperState state;
-    private final ZkClient zkClient;
-    private final Lock lock;
-    private final static String STREAMS_PATH = "/s4/streams";
-    // by stream name, then "producer"|"consumer" then
-    private Map<String, Map<String, Set<StreamConsumer>>> streams = new
HashMap<String, Map<String, Set<StreamConsumer>>>();
-
-    public enum StreamType {
-        PRODUCER, CONSUMER;
-
-        public String getPath(String streamName) {
-            switch (this) {
-                case PRODUCER:
-                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
-                case CONSUMER:
-                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
-                default:
-                    throw new RuntimeException("Invalid path in enum StreamType");
-            }
-        }
-
-        public String getCollectionName() {
-            switch (this) {
-                case PRODUCER:
-                    return "producers";
-                case CONSUMER:
-                    return "consumers";
-                default:
-                    throw new RuntimeException("Invalid path in enum StreamType");
-            }
-        }
-    }
-
-    @Inject
-    public RemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception
{
-
-        lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
-        zkClient.subscribeStateChanges(this);
-        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
-        // bug in zkClient, it does not invoke handleNewSession the first time
-        // it connects
-        this.handleStateChanged(KeeperState.SyncConnected);
-
-        this.handleNewSession();
-
-    }
-
-    public Set<StreamConsumer> getConsumers(String streamName) {
-        if (!streams.containsKey(streamName)) {
-            return Collections.emptySet();
-        } else {
-            return streams.get(streamName).get("consumers");
-        }
-    }
 
-    /**
-     * One method to do any processing if there is a change in ZK, all callbacks will be
processed sequentially
-     */
-    private void doProcess() {
-        lock.lock();
-        try {
-            refreshStreams();
-        } catch (Exception e) {
-            logger.warn("Exception in tryToAcquireTask", e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void handleStateChanged(KeeperState state) throws Exception {
-        this.state = state;
-        if (state.equals(KeeperState.Expired)) {
-            logger.error("Zookeeper session expired, possibly due to a network partition.
This node is considered as dead by Zookeeper. Proceeding to stop this node.");
-            System.exit(1);
-        }
-
-    }
-
-    @Override
-    public void handleNewSession() throws Exception {
-        logger.info("New session:" + zkClient.getSessionId());
-        zkClient.subscribeChildChanges(STREAMS_PATH, this);
-
-        doProcess();
-    }
-
-    @Override
-    public void handleChildChange(String paramString, List<String> paramList) throws
Exception {
-        doProcess();
-    }
+public interface RemoteStreams {
 
-    private void refreshStreams() {
-        List<String> children = zkClient.getChildren(STREAMS_PATH);
-        for (String streamName : children) {
-            if (!streams.containsKey(streamName)) {
-                logger.info("Detected new stream [{}]", streamName);
-                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
-                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
-                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
-                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
-            }
+    public abstract Set<StreamConsumer> getConsumers(String streamName);
 
-            update(streamName, StreamType.PRODUCER);
-            update(streamName, StreamType.CONSUMER);
-        }
-    }
-
-    private void update(String streamName, StreamType type) {
-        List<String> elements = zkClient.getChildren(type.getPath(streamName));
-        Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
-        for (String element : elements) {
-            ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element,
true);
-            if (producerData != null) {
-                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
-                        producerData.getSimpleField("clusterName"));
-                consumers.add(consumer);
-            }
-        }
-        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
-    }
-
-    public void addOutputStream(String appId, String clusterName, String streamName) {
-        lock.lock();
-        try {
-            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[]
{ streamName, appId,
-                    clusterName });
-            createStreamPaths(streamName);
-            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            producer.putSimpleField("appId", appId);
-            producer.putSimpleField("clusterName", clusterName);
-            try {
-                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName)
+ "/producer-", producer);
-            } catch (Throwable e) {
-                logger.error("Exception trying to create producer stream [{}] for app [{}]
and cluster [{}] : [{}] :",
-                        new String[] { streamName, appId, clusterName, e.getMessage() });
-            }
-            refreshStreams();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Creates (it they don't exist yet) persistent znodes for producers and consumers of
a stream.
-     */
-    private void createStreamPaths(String streamName) {
-        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
-        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
-    }
+    public abstract void addOutputStream(String appId, String clusterName, String streamName);
 
     /**
      * Publishes interest in a stream from an application.
@@ -218,25 +15,6 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener
{
      * @param clusterName
      * @param streamName
      */
-    public void addInputStream(int appId, String clusterName, String streamName) {
-        lock.lock();
-        try {
-            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
-                    new String[] { streamName, String.valueOf(appId), clusterName });
-            createStreamPaths(streamName);
-            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            consumer.putSimpleField("appId", String.valueOf(appId));
-            consumer.putSimpleField("clusterName", clusterName);
-            try {
-                // NOTE: We create 1 sequential znode per consumer node instance
-                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName)
+ "/consumer-", consumer);
-            } catch (Throwable e) {
-                logger.error("Exception trying to create consumer stream [{}] for app [{}]
and cluster [{}] : [{}] :",
-                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage()
});
-            }
-            refreshStreams();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public abstract void addInputStream(int appId, String clusterName, String streamName);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
new file mode 100644
index 0000000..f4a9623
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers
and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ * 
+ */
+@Singleton
+public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams
{
+
+    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final Lock lock;
+    private final static String STREAMS_PATH = "/s4/streams";
+    // by stream name, then "producer"|"consumer" then
+    private Map<String, Map<String, Set<StreamConsumer>>> streams = new
HashMap<String, Map<String, Set<StreamConsumer>>>();
+
+    public enum StreamType {
+        PRODUCER, CONSUMER;
+
+        public String getPath(String streamName) {
+            switch (this) {
+                case PRODUCER:
+                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+                case CONSUMER:
+                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+                default:
+                    throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+
+        public String getCollectionName() {
+            switch (this) {
+                case PRODUCER:
+                    return "producers";
+                case CONSUMER:
+                    return "consumers";
+                default:
+                    throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+    }
+
+    @Inject
+    public ZkRemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception
{
+
+        lock = new ReentrantLock();
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.s4.comm.topology.RemoteStreams#getConsumers(java.lang.String)
+     */
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        if (!streams.containsKey(streamName)) {
+            return Collections.emptySet();
+        } else {
+            return streams.get(streamName).get("consumers");
+        }
+    }
+
+    /**
+     * One method to do any processing if there is a change in ZK, all callbacks will be
processed sequentially
+     */
+    private void doProcess() {
+        lock.lock();
+        try {
+            refreshStreams();
+        } catch (Exception e) {
+            logger.warn("Exception in tryToAcquireTask", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+        if (state.equals(KeeperState.Expired)) {
+            logger.error("Zookeeper session expired, possibly due to a network partition.
This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+            System.exit(1);
+        }
+
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:" + zkClient.getSessionId());
+        zkClient.subscribeChildChanges(STREAMS_PATH, this);
+
+        doProcess();
+    }
+
+    @Override
+    public void handleChildChange(String paramString, List<String> paramList) throws
Exception {
+        doProcess();
+    }
+
+    private void refreshStreams() {
+        List<String> children = zkClient.getChildren(STREAMS_PATH);
+        for (String streamName : children) {
+            if (!streams.containsKey(streamName)) {
+                logger.info("Detected new stream [{}]", streamName);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
+                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+            }
+
+            update(streamName, StreamType.PRODUCER);
+            update(streamName, StreamType.CONSUMER);
+        }
+    }
+
+    private void update(String streamName, StreamType type) {
+        List<String> elements = zkClient.getChildren(type.getPath(streamName));
+        Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
+        for (String element : elements) {
+            ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element,
true);
+            if (producerData != null) {
+                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
+                        producerData.getSimpleField("clusterName"));
+                consumers.add(consumer);
+            }
+        }
+        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String,
java.lang.String)
+     */
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[]
{ streamName, appId,
+                    clusterName });
+            createStreamPaths(streamName);
+            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            producer.putSimpleField("appId", appId);
+            producer.putSimpleField("clusterName", clusterName);
+            try {
+                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName)
+ "/producer-", producer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create producer stream [{}] for app [{}]
and cluster [{}] : [{}] :",
+                        new String[] { streamName, appId, clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Creates (it they don't exist yet) persistent znodes for producers and consumers of
a stream.
+     */
+    private void createStreamPaths(String streamName) {
+        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
+        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String,
java.lang.String)
+     */
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
+                    new String[] { streamName, String.valueOf(appId), clusterName });
+            createStreamPaths(streamName);
+            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            consumer.putSimpleField("appId", String.valueOf(appId));
+            consumer.putSimpleField("clusterName", clusterName);
+            try {
+                // NOTE: We create 1 sequential znode per consumer node instance
+                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName)
+ "/consumer-", consumer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create consumer stream [{}] for app [{}]
and cluster [{}] : [{}] :",
+                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage()
});
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 7789b9f..c4cafee 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -28,6 +28,8 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
@@ -45,6 +47,7 @@ import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
+import com.google.inject.Scopes;
 import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 
@@ -81,7 +84,7 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
 
         bind(S4RLoaderFactory.class);
 
@@ -95,6 +98,9 @@ public class DefaultCoreModule extends AbstractModule {
 
         bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
 
+        bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
     }
 
     @Provides

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
new file mode 100644
index 0000000..b9c6fd7
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+/**
+ * Sends events to remote clusters. Target clusters are selected dynamically based on the
stream name information from
+ * the event.
+ * 
+ */
+public class DefaultRemoteSenders implements RemoteSenders {
+
+    Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
+
+    final RemoteEmitters remoteEmitters;
+
+    final RemoteStreams remoteStreams;
+
+    final Clusters remoteClusters;
+
+    final SerializerDeserializer serDeser;
+
+    final Hasher hasher;
+
+    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String,
RemoteSender>();
+
+    private final ExecutorService executorService;
+
+    @Inject
+    public DefaultRemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams,
Clusters remoteClusters,
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
+        this.remoteEmitters = remoteEmitters;
+        this.remoteStreams = remoteStreams;
+        this.remoteClusters = remoteClusters;
+        this.hasher = hasher;
+        executorService = senderExecutorFactory.create();
+
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
+     */
+    @Override
+    public void send(String hashKey, Event event) {
+
+        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
+        event.setAppId(-1);
+        for (StreamConsumer consumer : consumers) {
+            // NOTE: even though there might be several ephemeral znodes for the same app
and topology, they are
+            // represented by a single stream consumer
+            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
+            if (sender == null) {
+                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
+                        .getClusterName())), hasher, consumer.getClusterName());
+                // TODO cleanup when remote topologies die
+                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
+                if (sender == null) {
+                    sender = newSender;
+                }
+            }
+            // NOTE: this implies multiple serializations, there might be an optimization
+            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
+        }
+    }
+
+    class SendToRemoteClusterTask implements Runnable {
+
+        String hashKey;
+        Event event;
+        RemoteSender sender;
+
+        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender)
{
+            super();
+            this.hashKey = hashKey;
+            this.event = event;
+            this.sender = sender;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sender.send(hashKey, serDeser.serialize(event));
+            } catch (InterruptedException e) {
+                logger.error("Interrupted blocking send operation for event {}. Event is
lost.", event);
+                Thread.currentThread().interrupt();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 7c7238f..62d62f3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -1,123 +1,9 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.core;
 
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
 import org.apache.s4.base.Event;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.StreamConsumer;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * Sends events to remote clusters. Target clusters are selected dynamically based on the
stream name information from
- * the event.
- * 
- */
-public class RemoteSenders {
-
-    Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
-
-    final RemoteEmitters remoteEmitters;
-
-    final RemoteStreams remoteStreams;
-
-    final Clusters remoteClusters;
-
-    final SerializerDeserializer serDeser;
-
-    final Hasher hasher;
-
-    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String,
RemoteSender>();
-
-    private final ExecutorService executorService;
-
-    @Inject
-    public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters
remoteClusters,
-            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
-            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
-        this.remoteEmitters = remoteEmitters;
-        this.remoteStreams = remoteStreams;
-        this.remoteClusters = remoteClusters;
-        this.hasher = hasher;
-        executorService = senderExecutorFactory.create();
-
-        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
-    }
-
-    public void send(String hashKey, Event event) {
-
-        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
-        event.setAppId(-1);
-        for (StreamConsumer consumer : consumers) {
-            // NOTE: even though there might be several ephemeral znodes for the same app
and topology, they are
-            // represented by a single stream consumer
-            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
-            if (sender == null) {
-                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
-                        .getClusterName())), hasher, consumer.getClusterName());
-                // TODO cleanup when remote topologies die
-                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
-                if (sender == null) {
-                    sender = newSender;
-                }
-            }
-            // NOTE: this implies multiple serializations, there might be an optimization
-            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
-        }
-    }
-
-    class SendToRemoteClusterTask implements Runnable {
-
-        String hashKey;
-        Event event;
-        RemoteSender sender;
-
-        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender)
{
-            super();
-            this.hashKey = hashKey;
-            this.event = event;
-            this.sender = sender;
-        }
 
-        @Override
-        public void run() {
-            try {
-                sender.send(hashKey, serDeser.serialize(event));
-            } catch (InterruptedException e) {
-                logger.error("Interrupted blocking send operation for event {}. Event is
lost.", event);
-                Thread.currentThread().interrupt();
-            }
+public interface RemoteSenders {
 
-        }
+    public abstract void send(String hashKey, Event event);
 
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 1dadc5b..c4f1772 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -32,8 +32,6 @@ import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.PhysicalCluster;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.RemoteSenders;
 import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
@@ -58,8 +56,6 @@ public class MockCommModule extends AbstractModule {
         /* Use Kryo to serialize events. */
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
-        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
-        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
         Cluster mockedCluster = Mockito.mock(Cluster.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/eb851534/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index ae19011..7900ecc 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,12 +20,15 @@ package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.DeserializerExecutorFactory;
 import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.RemoteSenders;
 import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
 import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +60,9 @@ public class MockCoreModule extends AbstractModule {
         bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
         bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 
+        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+
         bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
         bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);
 


Mime
View raw message