incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [14/50] [abbrv] git commit: First commit.
Date Tue, 03 Jan 2012 14:03:28 GMT
First commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b8cea4ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b8cea4ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b8cea4ab

Branch: refs/heads/piper
Commit: b8cea4abd65c4197ba1afb942f64baec5bad85de
Parents: 701a83c
Author: Leo Neumeyer <leo@s4.io>
Authored: Sat Dec 10 15:31:12 2011 -0800
Committer: Leo Neumeyer <leo@s4.io>
Committed: Sat Dec 10 15:31:12 2011 -0800

----------------------------------------------------------------------
 .../java/org/apache/s4/appbuilder/AppMaker.java    |   65 +++++++
 .../main/java/org/apache/s4/appbuilder/EventA.java |    7 +
 .../main/java/org/apache/s4/appbuilder/EventB.java |    7 +
 .../main/java/org/apache/s4/appbuilder/Main.java   |   28 +++
 .../java/org/apache/s4/appbuilder/PEMaker.java     |   34 ++++
 .../main/java/org/apache/s4/appbuilder/PEX.java    |   19 ++
 .../main/java/org/apache/s4/appbuilder/PEY.java    |   19 ++
 .../main/java/org/apache/s4/appbuilder/PEZ.java    |   19 ++
 .../java/org/apache/s4/appbuilder/StreamMaker.java |   69 +++++++
 .../src/main/java/org/apache/s4/core/App.java      |  144 ++++++++-------
 .../org/apache/s4/core/apploading/SimpleApp.java   |    8 +-
 11 files changed, 344 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
new file mode 100644
index 0000000..bd0917d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
@@ -0,0 +1,65 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AppMaker {
+
+    /**
+     * The app graph is stored as follows:
+     * <p>
+     * 
+     * <p>
+     * PE[1]: S[1,1], S[1,2], ...
+     * <p>
+     * PE[2]: S[2,1], S[2,2], ...
+     */
+
+    private Multimap<PEMaker, StreamMaker> graph = LinkedListMultimap.create();
+
+    public AppMaker() {
+
+    }
+
+    /**
+     * Add a processing element.
+     * 
+     * @param streams
+     *            events emitted by this PE will be put into these streams.
+     * 
+     * @return a pe maker.
+     */
+    // public <T extends Event> PEMaker addPE(StreamMaker<T>... streams) {
+    //
+    // PEMaker pem = new PEMaker();
+    // for (int i = 0; i < streams.length; i++)
+    // graph.put(pem, streams[i]);
+    //
+    // return pem;
+    // }
+    public PEMaker addPE(Class<? extends ProcessingElement> type) {
+        return new PEMaker(type);
+    }
+
+    /**
+     * Add a stream.
+     * 
+     * @param eventType
+     *            the type of events emitted by this PE.
+     * 
+     * @return a stream maker.
+     */
+    public StreamMaker addStream(Class<? extends Event> type) {
+
+        return new StreamMaker(type);
+
+    }
+
+    public App make() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
new file mode 100644
index 0000000..c5c7414
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
new file mode 100644
index 0000000..3feb90f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/Main.java
new file mode 100644
index 0000000..7f7cbab
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/Main.java
@@ -0,0 +1,28 @@
+package org.apache.s4.appbuilder;
+
+public class Main {
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+
+        AppMaker am = new AppMaker();
+
+        PEMaker pem1, pem2;
+        StreamMaker s1;
+        StreamMaker s2, s3;
+
+        pem1 = am.addPE(PEZ.class);
+
+        s1 = am.addStream(EventA.class).withName("My first stream.").withKeyFinder("{gender}").to(pem1);
+
+        pem2 = am.addPE(PEY.class).to(s1);
+
+        s2 = am.addStream(EventB.class).withName("My second stream.").withKeyFinder("{age}").to(pem2);
+
+        s3 = am.addStream(EventB.class).withName("My third stream.").withKeyFinder("{height}").to(pem2);
+
+        am.addPE(PEX.class).to(s2).to(s3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
new file mode 100644
index 0000000..f8fc071
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
@@ -0,0 +1,34 @@
+package org.apache.s4.appbuilder;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+public class PEMaker {
+
+    private Class<? extends ProcessingElement> type;
+
+    /* Only package classes can instantiate this class. */
+    PEMaker(Class<? extends ProcessingElement> type) {
+        this.type = type;
+    }
+
+    public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long
interval, TimeUnit timeUnit) {
+
+        return this;
+    }
+
+    public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
+
+        return this;
+    }
+
+    public <T extends Event> PEMaker to(StreamMaker stream) {
+        return this;
+    }
+
+    public ProcessingElement make() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
new file mode 100644
index 0000000..7df2d29
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEX extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
new file mode 100644
index 0000000..35e4fbe
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEY extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
new file mode 100644
index 0000000..cbd0dd9
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEZ extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
new file mode 100644
index 0000000..c7fac03
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
@@ -0,0 +1,69 @@
+package org.apache.s4.appbuilder;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.KeyFinder;
+
+public class StreamMaker {
+
+    private String name = "";
+    private KeyFinder<?> keyFinder = null;
+    private String keyFinderString;
+    private PEMaker pem;
+    private Class<? extends Event> type;
+
+    /* Only package classes can instantiate this class. */
+    StreamMaker(Class<? extends Event> type) {
+        this.type = type;
+    }
+
+    /**
+     * Name the stream.
+     * 
+     * @param name
+     *            the stream name, default is an empty string.
+     * @return the stream maker object
+     */
+    public StreamMaker withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream.
+     * 
+     * @param keyFinder
+     *            a function to lookup the value of the key.
+     * @return the stream maker object
+     */
+    public StreamMaker withKeyFinder(KeyFinder<?> keyFinder) {
+        this.keyFinder = keyFinder;
+        this.keyFinderString = null;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream using a descriptor.
+     * 
+     * @param keyFinderString
+     *            a descriptor to lookup the value of the key.
+     * @return the stream maker object
+     */
+    public StreamMaker withKeyFinder(String keyFinderString) {
+        this.keyFinder = null;
+        this.keyFinderString = keyFinderString;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream using a descriptor.
+     * 
+     * @param keyFinderString
+     *            a descriptor to lookup the value of the key.
+     * @return the stream maker object
+     */
+    public StreamMaker to(PEMaker pem) {
+        this.pem = pem;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index ecca99d..fbb1cc4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -15,8 +15,6 @@
  */
 package org.apache.s4.core;
 
-
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -34,7 +32,7 @@ import com.google.inject.Injector;
 /*
  * Container base class to hold all processing elements. We will implement administrative
methods here. 
  */
-public abstract class App {
+public abstract class App extends AbstractModule {
 
     private static final Logger logger = LoggerFactory.getLogger(App.class);
 
@@ -46,13 +44,13 @@ public abstract class App {
     private Sender sender;
     @Inject
     private Receiver receiver;
-    //@Inject private @Named("isCluster") Boolean isCluster;
+
+    // @Inject private @Named("isCluster") Boolean isCluster;
 
     /**
-     * The internal clock can be configured as "wall clock" or "event clock".
-     * The wall clock computes time from the system clock while the
-     * "event clock" uses the most recently seen event time stamp. TODO:
-     * implement event clock functionality.
+     * The internal clock can be configured as "wall clock" or "event clock". The wall clock
computes time from the
+     * system clock while the "event clock" uses the most recently seen event time stamp.
TODO: implement event clock
+     * functionality.
      */
     public enum ClockType {
         WALL_CLOCK, EVENT_CLOCK
@@ -61,9 +59,9 @@ public abstract class App {
     /**
      * @return true if the application is running in cluster mode.
      */
-//    public boolean isCluster() {
-//        return isCluster.booleanValue();
-//    }
+    // public boolean isCluster() {
+    // return isCluster.booleanValue();
+    // }
 
     /**
      * @return the unique app id
@@ -73,7 +71,8 @@ public abstract class App {
     }
 
     /**
-     * @param id the unique id for this app
+     * @param id
+     *            the unique id for this app
      */
     public void setId(int id) {
         this.id = id;
@@ -132,8 +131,7 @@ public abstract class App {
     }
 
     /**
-     * The internal clock is configured as "wall clock" or "event clock" when
-     * this object is created.
+     * The internal clock is configured as "wall clock" or "event clock" when this object
is created.
      * 
      * @return the App time in milliseconds.
      */
@@ -142,8 +140,7 @@ public abstract class App {
     }
 
     /**
-     * The internal clock is configured as "wall clock" or "event clock" when
-     * this object is created.
+     * The internal clock is configured as "wall clock" or "event clock" when this object
is created.
      * 
      * @param timeUnit
      * @return the App time in timeUnit
@@ -156,8 +153,7 @@ public abstract class App {
      * Set the {@link ClockType}.
      * 
      * @param clockType
-     *            the clockTyoe for this app must be
-     *            {@link ClockType.WALL_CLOCK} (default) or
+     *            the clockTyoe for this app must be {@link ClockType.WALL_CLOCK} (default)
or
      *            {@link ClockType.EVENT_CLOCK}
      */
     public void setClockType(ClockType clockType) {
@@ -191,8 +187,10 @@ public abstract class App {
     }
 
     /**
-     * @param sender - sends events to the communication layer.
-     * @param receiver - receives events from the communication layer.
+     * @param sender
+     *            - sends events to the communication layer.
+     * @param receiver
+     *            - receives events from the communication layer.
      */
     public void setCommLayer(Sender sender, Receiver receiver) {
         this.sender = sender;
@@ -201,13 +199,12 @@ public abstract class App {
     }
 
     /**
-     * Creates a stream with a specific key finder. The event is delivered to
-     * the PE instances in the target PE prototypes by key.
+     * Creates a stream with a specific key finder. The event is delivered to the PE instances
in the target PE
+     * prototypes by key.
      * 
      * <p>
-     * If the value of the key is "joe" and the target PE prototypes are
-     * AddressPE and WorkPE, the event will be delivered to the instances with
-     * key="joe" in the PE prototypes AddressPE and WorkPE.
+     * If the value of the key is "joe" and the target PE prototypes are AddressPE and WorkPE,
the event will be
+     * delivered to the instances with key="joe" in the PE prototypes AddressPE and WorkPE.
      * 
      * @param name
      *            the name of the stream
@@ -217,19 +214,17 @@ public abstract class App {
      *            the target processing elements
      * @return the stream
      */
-    protected <T extends Event> Stream<T> createStream(String name,
-            KeyFinder<T> finder, ProcessingElement... processingElements) {
+    protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T>
finder,
+            ProcessingElement... processingElements) {
 
         return new Stream<T>(this, name, finder, processingElements);
     }
 
     /**
-     * Creates a broadcast stream that sends the events to all the PE instances
-     * in each of the target prototypes.
+     * Creates a broadcast stream that sends the events to all the PE instances in each of
the target prototypes.
      * 
      * <p>
-     * Keep in mind that if you had a million PE instances, the event would be
-     * delivered to all them.
+     * Keep in mind that if you had a million PE instances, the event would be delivered
to all them.
      * 
      * @param name
      *            the name of the stream
@@ -237,8 +232,7 @@ public abstract class App {
      *            the target processing elements
      * @return the stream
      */
-    protected <T extends Event> Stream<T> createStream(String name,
-            ProcessingElement... processingElements) {
+    protected <T extends Event> Stream<T> createStream(String name, ProcessingElement...
processingElements) {
 
         return new Stream<T>(this, name, processingElements);
     }
@@ -263,47 +257,55 @@ public abstract class App {
             return null;
         }
     }
-    
+
     /**
-    * Facility for starting S4 apps by passing a module class and an application class
-    *
-    * Usage: java &ltclasspath+params&gt org.apache.s4.core.App &ltappClassName&gt
&ltmoduleClassName&gt
-    *
-    */
-        public static void main(String[] args) {
-            if (args.length!=2) {
-                usage(args);
-            }
-            logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
-            Injector injector = null;
-            try {
-                if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
-                    logger.error("Module class [{}] is not an instance of [{}]", args[0],
AbstractModule.class.getName());
-                    System.exit(-1);
-                }
-                injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
-            } catch (InstantiationException e) {
-                logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-                System.exit(-1);
-            } catch (IllegalAccessException e) {
-                logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-                System.exit(-1);
-            } catch (ClassNotFoundException e) {
-                logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
+     * Facility for starting S4 apps by passing a module class and an application class
+     * 
+     * Usage: java &ltclasspath+params&gt org.apache.s4.core.App &ltappClassName&gt
&ltmoduleClassName&gt
+     * 
+     */
+    public static void main(String[] args) {
+        if (args.length != 2) {
+            usage(args);
+        }
+        logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
+        Injector injector = null;
+        try {
+            if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
+                logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
                 System.exit(-1);
             }
-            App app;
-            try {
-                app = (App)injector.getInstance(Class.forName(args[1]));
-                app.init();
-                app.start();
-            } catch (ClassNotFoundException e) {
-                logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
-            }
-        }
-
-        private static void usage(String[] args) {
-            logger.info("Invalid parameters " + Arrays.toString(args) + " \nUsage: java <classpath+params>
org.apache.s4.core.App <appClassName> <moduleClassName>");
+            injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
+        } catch (InstantiationException e) {
+            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
+            System.exit(-1);
+        } catch (IllegalAccessException e) {
+            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
             System.exit(-1);
+        } catch (ClassNotFoundException e) {
+            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
+            System.exit(-1);
+        }
+        App app;
+        try {
+            app = (App) injector.getInstance(Class.forName(args[1]));
+            app.init();
+            app.start();
+        } catch (ClassNotFoundException e) {
+            logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
         }
+    }
+
+    private static void usage(String[] args) {
+        logger.info("Invalid parameters " + Arrays.toString(args)
+                + " \nUsage: java <classpath+params> org.apache.s4.core.App <appClassName>
<moduleClassName>");
+        System.exit(-1);
+    }
+
+    /* Implement Guice abstract method. */
+    @Override
+    protected void configure() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b8cea4ab/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
index 98dc728..92aae16 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
@@ -12,11 +12,11 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public class SimpleApp extends App {
     private SocketAdapter<StringEvent> socketAdapter;
-    
-    public SimpleApp () {}
+
+    public SimpleApp() {
+    }
 
     @Override
     protected void start() {
@@ -44,4 +44,4 @@ public class SimpleApp extends App {
     protected void close() {
         // TODO Auto-generated method stub
     }
-}
\ No newline at end of file
+}


Mime
View raw message