incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [10/50] [abbrv] git commit: Fluent API to build S4 apps.
Date Tue, 03 Jan 2012 14:03:28 GMT
Fluent API to build S4 apps.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/59038903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/59038903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/59038903

Branch: refs/heads/piper
Commit: 59038903d158b183aa3a1d145aed9807f090fcee
Parents: fb61d6d
Author: Leo Neumeyer <leo@s4.io>
Authored: Tue Dec 13 15:45:35 2011 -0800
Committer: Leo Neumeyer <leo@s4.io>
Committed: Tue Dec 13 15:45:35 2011 -0800

----------------------------------------------------------------------
 .../java/org/apache/s4/appbuilder/AppMaker.java    |   94 --------
 .../main/java/org/apache/s4/appbuilder/EventA.java |    7 -
 .../main/java/org/apache/s4/appbuilder/EventB.java |    7 -
 .../main/java/org/apache/s4/appbuilder/MyApp.java  |   48 ----
 .../java/org/apache/s4/appbuilder/PEMaker.java     |   37 ---
 .../main/java/org/apache/s4/appbuilder/PEX.java    |   19 --
 .../main/java/org/apache/s4/appbuilder/PEY.java    |   19 --
 .../main/java/org/apache/s4/appbuilder/PEZ.java    |   19 --
 .../java/org/apache/s4/appbuilder/StreamMaker.java |   88 -------
 .../main/java/org/apache/s4/appmaker/AppMaker.java |  126 +++++++++++
 .../main/java/org/apache/s4/appmaker/PEMaker.java  |  176 +++++++++++++++
 .../java/org/apache/s4/appmaker/StreamMaker.java   |  125 ++++++++++
 .../src/main/java/org/apache/s4/core/App.java      |   15 +-
 .../java/org/apache/s4/core/ProcessingElement.java |    8 +-
 .../src/main/java/org/apache/s4/core/Stream.java   |   10 +-
 subprojects/s4-core/src/main/resources/logback.xml |    2 +-
 .../java/org/apache/s4/appmaker/AppMakerTest.java  |   14 ++
 .../test/java/org/apache/s4/appmaker/EventA.java   |    7 +
 .../test/java/org/apache/s4/appmaker/EventB.java   |    7 +
 .../test/java/org/apache/s4/appmaker/MyApp.java    |   24 ++
 .../src/test/java/org/apache/s4/appmaker/PEX.java  |   19 ++
 .../src/test/java/org/apache/s4/appmaker/PEY.java  |   19 ++
 .../src/test/java/org/apache/s4/appmaker/PEZ.java  |   19 ++
 .../test/java/org/apache/s4/core/TriggeredApp.java |    4 +-
 .../java/org/apache/s4/example/counter/MyApp.java  |   30 ++--
 .../java/org/apache/s4/example/model/MyApp.java    |    2 +-
 26 files changed, 568 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
deleted file mode 100644
index dcee431..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-
-abstract public class AppMaker extends App {
-
-    /**
-     * NOTES: reflection+guice:
-     * <code>http://groups.google.com/group/google-guice/browse_thread/thread/23f4bf986a999e00/73f83a98c288a3e1?lnk=gst&q=binding+api#73f83a98c288a3e1</code>
-     */
-
-    /**
-     * The app graph is stored as follows:
-     * <p>
-     * PE to Stream
-     * <p>
-     * PE[1]: S[1,1], S[1,2], ...
-     * <p>
-     * PE[2]: S[2,1], S[2,2], ...
-     * <p>
-     * Stream to PE
-     * <p>
-     * S[1]: PE[1]
-     * <p>
-     * S[2] : PE[2]
-     * 
-     */
-
-    private Multimap<PEMaker, StreamMaker> psGraph = LinkedListMultimap.create();
-    private Map<StreamMaker, PEMaker> spGraph = Maps.newHashMap();
-
-    void add(PEMaker pem, StreamMaker stream) {
-
-        psGraph.put(pem, stream);
-    }
-
-    void add(StreamMaker stream, PEMaker pem) {
-
-        spGraph.put(stream, pem);
-    }
-
-    public PEMaker addPE(Class<? extends ProcessingElement> type) {
-        return new PEMaker(this, type);
-    }
-
-    /**
-     * Add a stream.
-     * 
-     * @param eventType
-     *            the type of events emitted by this PE.
-     * 
-     * @return a stream maker.
-     */
-    public StreamMaker addStream(Class<? extends Event> type) {
-
-        return new StreamMaker(this, type);
-
-    }
-
-    public App make() {
-        return null;
-    }
-
-    public String toString() {
-
-        StringBuilder sb = new StringBuilder();
-        Map<PEMaker, Collection<StreamMaker>> psMap = psGraph.asMap();
-
-        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : psMap.entrySet()) {
-            sb.append(entry.getKey() + ": ");
-            for (StreamMaker sm : entry.getValue()) {
-                sb.append(sm + " ");
-            }
-            sb.append("\n");
-        }
-
-        for (Map.Entry<StreamMaker, PEMaker> entry : spGraph.entrySet()) {
-            sb.append(entry.getKey() + ": " + entry.getValue());
-            sb.append("\n");
-        }
-
-        return sb.toString();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
deleted file mode 100644
index c5c7414..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-
-public class EventA extends Event {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
deleted file mode 100644
index 3feb90f..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-
-public class EventB extends Event {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
deleted file mode 100644
index 5f3af49..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.s4.appbuilder;
-
-public class MyApp extends AppMaker {
-
-    public static void main(String[] args) {
-
-        MyApp myApp = new MyApp();
-        myApp.init();
-        System.out.println(myApp.toString());
-    }
-
-    @Override
-    protected void start() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onInit() {
-        PEMaker pem1, pem2;
-        StreamMaker s1;
-        StreamMaker s2, s3;
-
-        pem1 = addPE(PEZ.class);
-
-        s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pem1);
-
-        pem2 = addPE(PEY.class).to(s1);
-
-        s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pem2);
-
-        s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pem2);
-
-        addPE(PEX.class).to(s2).to(s3);
-    }
-
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onClose() {
-        // TODO Auto-generated method stub
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
deleted file mode 100644
index 58a8c6c..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-
-public class PEMaker {
-
-    private Class<? extends ProcessingElement> type;
-    private AppMaker appMaker;
-
-    /* Only package classes can instantiate this class. */
-    PEMaker(AppMaker appMaker, Class<? extends ProcessingElement> type) {
-        this.type = type;
-        this.appMaker = appMaker;
-    }
-
-    public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
-
-        return this;
-    }
-
-    public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
-
-        return this;
-    }
-
-    public <T extends Event> PEMaker to(StreamMaker stream) {
-        appMaker.add(this, stream);
-        return this;
-    }
-
-    public ProcessingElement make() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
deleted file mode 100644
index 7df2d29..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEX extends ProcessingElement {
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
deleted file mode 100644
index 35e4fbe..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEY extends ProcessingElement {
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
deleted file mode 100644
index cbd0dd9..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEZ extends ProcessingElement {
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
deleted file mode 100644
index 596634b..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.KeyFinder;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class StreamMaker {
-
-    private String name = "";
-    private KeyFinder<?> keyFinder = null;
-    private String keyFinderString;
-    private PEMaker pem;
-    private Class<? extends Event> type;
-    private AppMaker appMaker;
-
-    /* Only package classes can instantiate this class. */
-    StreamMaker(AppMaker appMaker, Class<? extends Event> type) {
-        this.type = type;
-        this.appMaker = appMaker;
-    }
-
-    /**
-     * Name the stream.
-     * 
-     * @param name
-     *            the stream name, default is an empty string.
-     * @return the stream maker object
-     */
-    public StreamMaker withName(String name) {
-        this.name = name;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream.
-     * 
-     * @param keyFinder
-     *            a function to lookup the value of the key.
-     * @return the stream maker object
-     */
-    public StreamMaker withKey(KeyFinder<?> keyFinder) {
-        this.keyFinder = keyFinder;
-        this.keyFinderString = null;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream using a descriptor.
-     * 
-     * @param keyFinderString
-     *            a descriptor to lookup the value of the key.
-     * @return the stream maker object
-     */
-    public StreamMaker withKey(String keyFinderString) {
-        this.keyFinder = null;
-        this.keyFinderString = keyFinderString;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream using a descriptor.
-     * 
-     * @param keyFinderString
-     *            a descriptor to lookup the value of the key.
-     * @return the stream maker object
-     */
-    public StreamMaker to(PEMaker pem) {
-        appMaker.add(this, pem);
-        this.pem = pem;
-        return this;
-    }
-
-    private Stream<? extends Event> getStream() {
-
-        // Stream stream = new Stream(appMaker, name, getKeyFinder(), getProcessingElements());
-        return null;// stream;
-    }
-
-    private KeyFinder<?> getKeyFinder() {
-
-        return null;
-    }
-
-    private ProcessingElement[] getProcessingElements() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
new file mode 100644
index 0000000..ead530d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
@@ -0,0 +1,126 @@
+package org.apache.s4.appmaker;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * A fluent API to build S4 applications.
+ * 
+ * *
+ * <p>
+ * Usage example:
+ * 
+ * <pre>
+ * public class MyApp extends AppMaker {
+ * 
+ *     &#064;Override
+ *     void configure() {
+ * 
+ *         PEMaker pe1, pe2;
+ *         StreamMaker s1;
+ *         StreamMaker s2, s3;
+ * 
+ *         pe1 = addPE(PEZ.class);
+ * 
+ *         s1 = addStream(EventA.class).withName(&quot;My first stream.&quot;).withKey(&quot;{gender}&quot;).to(pe1);
+ * 
+ *         pe2 = addPE(PEY.class).to(s1);
+ * 
+ *         s2 = addStream(EventB.class).withName(&quot;My second stream.&quot;).withKey(&quot;{age}&quot;).to(pe2);
+ * 
+ *         s3 = addStream(EventB.class).withName(&quot;My third stream.&quot;).withKey(&quot;{height}&quot;).to(pe2);
+ * 
+ *         addPE(PEX.class).to(s2).to(s3);
+ *     }
+ * }
+ * </pre>
+ */
+abstract public class AppMaker {
+
+    private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
+
+    /* Use multi-maps to save the graph. */
+    private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
+    private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+
+    /**
+     * Configure the application.
+     */
+    abstract protected void configure();
+
+    /* Used internally to build the graph. */
+    void add(PEMaker pem, StreamMaker stream) {
+
+        pe2stream.put(pem, stream);
+        logger.trace("Adding pe [{}] to stream [{}].", pem, stream);
+    }
+
+    /* Used internally to build the graph. */
+    void add(StreamMaker stream, PEMaker pem) {
+
+        stream2pe.put(stream, pem);
+        logger.trace("Adding stream [{}] to pe [{}].", stream, pem);
+    }
+
+    protected PEMaker addPE(Class<? extends ProcessingElement> type) {
+        return new PEMaker(this, type);
+    }
+
+    /**
+     * Add a stream.
+     * 
+     * @param eventType
+     *            the type of events emitted by this PE.
+     * 
+     * @return a stream maker.
+     */
+    protected StreamMaker addStream(Class<? extends Event> type) {
+
+        return new StreamMaker(this, type);
+
+    }
+
+    App make() {
+        return null;
+    }
+
+    /**
+     * A printable representation of the application graph.
+     * 
+     * @return the application graph.
+     */
+    public String toString() {
+
+        StringBuilder sb = new StringBuilder();
+
+        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+            sb.append(entry.getKey() + ": ");
+            for (StreamMaker sm : entry.getValue()) {
+                sb.append(sm + " ");
+            }
+            sb.append("\n");
+        }
+
+        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+            sb.append(entry.getKey() + ": ");
+            for (PEMaker pm : entry.getValue()) {
+                sb.append(pm + " ");
+            }
+            sb.append("\n");
+        }
+
+        return sb.toString();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
new file mode 100644
index 0000000..c444fda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
@@ -0,0 +1,176 @@
+package org.apache.s4.appmaker;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Helper class to add a processing element to an S4 application.
+ * 
+ * @see example {@link S4Maker}
+ * 
+ */
+public class PEMaker {
+
+    final private Class<? extends ProcessingElement> type;
+    final private AppMaker app;
+
+    private long timerInterval = 0;
+
+    private long triggerInterval = 0;
+    private Class<? extends Event> triggerEventType = null;
+    private int triggerNumEvents = 0;
+
+    private int cacheMaximumSize = 0;
+    private long cacheDuration = 0;
+
+    PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+        this.type = type;
+        this.app = app;
+        app.add(this, null);
+    }
+
+    /**
+     * Configure the PE expiration and cache size.
+     * <p>
+     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
+     * creation, or last access.
+     * <p>
+     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+     * <p>
+     * When this method is called all existing PE instances are destroyed.
+     * 
+     * 
+     * @param maximumSize
+     *            the approximate maximum number of PEs in the cache.
+     * @param duration
+     *            the PE duration
+     * @param timeUnit
+     *            the time unit
+     * @return the PEMaker
+     */
+    public PEMaker withPECache(int maximumSize, long duration, TimeUnit timeUnit) {
+
+        cacheMaximumSize = maximumSize;
+        cacheDuration = timeUnit.convert(duration, TimeUnit.MILLISECONDS);
+
+        return this;
+    }
+
+    /**
+     * Configure a trigger that is fired when the following conditions occur:
+     * 
+     * <ul>
+     * <li>An event of eventType arrived to the PE instance
+     * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
+     * interval.
+     * </ul>
+     * 
+     * <p>
+     * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
+     * the argument eventType.
+     * 
+     * @param eventType
+     *            the type of event on which this trigger will fire.
+     * @param numEvents
+     *            number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on
+     *            every input event.)
+     * @param interval
+     *            minimum time between triggers. Set to zero if no time interval needed.
+     * @param timeUnit
+     *            the TimeUnit for the argument interval. Can set to null if no time interval needed.
+     * @return the PEMaker
+     */
+    public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
+
+        triggerEventType = eventType;
+        triggerNumEvents = numEvents;
+
+        if (timeUnit != null)
+            triggerInterval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
+
+        return this;
+    }
+
+    /**
+     * Set a timer that calls {@link ProcessingElement#onTime()}.
+     * 
+     * If {@code interval==0} the timer is disabled.
+     * 
+     * @param interval
+     *            in timeUnit
+     * @param timeUnit
+     *            the timeUnit of interval
+     * @return the PEMaker
+     */
+    public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
+        timerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
+
+        timerInterval = interval;
+
+        return this;
+    }
+
+    /**
+     * Send events from this PE to a stream.
+     * 
+     * @param stream
+     * 
+     * 
+     * @return the PE maker.
+     */
+    public PEMaker to(StreamMaker stream) {
+        app.add(this, stream);
+        return this;
+    }
+
+    /**
+     * @return the timerInterval
+     */
+    long getTimerInterval() {
+        return timerInterval;
+    }
+
+    /**
+     * @return the triggerInterval
+     */
+    long getTriggerInterval() {
+        return triggerInterval;
+    }
+
+    /**
+     * @return the triggerEventType
+     */
+    Class<? extends Event> getTriggerEventType() {
+        return triggerEventType;
+    }
+
+    /**
+     * @return the triggerNumEvents
+     */
+    int getTriggerNumEvents() {
+        return triggerNumEvents;
+    }
+
+    /**
+     * @return the cacheMaximumSize
+     */
+    int getCacheMaximumSize() {
+        return cacheMaximumSize;
+    }
+
+    /**
+     * @return the cacheDuration
+     */
+    long getCacheDuration() {
+        return cacheDuration;
+    }
+
+    /**
+     * @return the type
+     */
+    Class<? extends ProcessingElement> getType() {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
new file mode 100644
index 0000000..5fe2897
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
@@ -0,0 +1,125 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.KeyFinder;
+
+/**
+ * Helper class to add a stream to an S4 application.
+ * 
+ * @see example {@link S4Maker}
+ * 
+ */
+public class StreamMaker {
+
+    final private AppMaker app;
+    private Class<? extends Event> type;
+    private String name = "";
+    private KeyFinder<? extends Event> keyFinder;
+    private String keyDescriptor = null;
+
+    StreamMaker(AppMaker app, Class<? extends Event> type) {
+        this.app = app;
+        this.type = type;
+        app.add(null, this);
+    }
+
+    /**
+     * Name the stream.
+     * 
+     * @param name
+     *            the stream name, default is an empty string.
+     * @return the stream maker object
+     */
+    public StreamMaker withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream.
+     * 
+     * @param keyFinder
+     *            a function to lookup the value of the key.
+     * @return the stream maker.
+     */
+    public <T extends Event> StreamMaker withKey(KeyFinder<T> keyFinder) {
+        this.keyFinder = keyFinder;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream using a descriptor.
+     * 
+     * @param keyFinderString
+     *            a descriptor to lookup the value of the key.
+     * @return the stream maker.
+     */
+    public StreamMaker withKey(String keyDescriptor) {
+
+        this.keyDescriptor = keyDescriptor;
+        return this;
+    }
+
+    /**
+     * Send events from this stream to a PE.
+     * 
+     * @param pe
+     *            a target PE.
+     * 
+     * @return the stream maker.
+     */
+    public StreamMaker to(PEMaker pe) {
+        app.add(this, pe);
+        return this;
+    }
+
+    /**
+     * Send events from this stream to various PEs.
+     * 
+     * @param pe
+     *            a target PE array.
+     * 
+     * @return the stream maker.
+     */
+    public StreamMaker to(PEMaker[] pes) {
+        for (int i = 0; i < pes.length; i++)
+            app.add(this, pes[i]);
+        return this;
+    }
+
+    /**
+     * @return the app
+     */
+    AppMaker getApp() {
+        return app;
+    }
+
+    /**
+     * @return the type
+     */
+    Class<? extends Event> getType() {
+        return type;
+    }
+
+    /**
+     * @return the name
+     */
+    String getName() {
+        return name;
+    }
+
+    /**
+     * @return the keyFinder
+     */
+    KeyFinder<? extends Event> getKeyFinder() {
+        return keyFinder;
+    }
+
+    /**
+     * @return the keyDescriptor
+     */
+    String getKeyDescriptor() {
+        return keyDescriptor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 9c4ec81..0fea25f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.core.App.ClockType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ import com.google.inject.Injector;
 /*
  * Container base class to hold all processing elements. We will implement administrative methods here. 
  */
-public abstract class App extends AbstractModule {
+public abstract class App {
 
     private static final Logger logger = LoggerFactory.getLogger(App.class);
 
@@ -250,7 +251,7 @@ public abstract class App extends AbstractModule {
     protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
             ProcessingElement... processingElements) {
 
-        return new Stream<T>(this).withName(name).withKey(finder).to(processingElements);
+        return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
     }
 
     /**
@@ -267,7 +268,7 @@ public abstract class App extends AbstractModule {
      */
     protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
 
-        return new Stream<T>(this).withName(name).to(processingElements);
+        return new Stream<T>(this).setName(name).setPEs(processingElements);
     }
 
     /**
@@ -355,12 +356,4 @@ public abstract class App extends AbstractModule {
                 + " \nUsage: java <classpath+params> org.apache.s4.core.App <appClassName> <moduleClassName>");
         System.exit(-1);
     }
-
-    /* Implement Guice abstract method. */
-    @Override
-    protected void configure() {
-        // TODO Auto-generated method stub
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 5314824..0922c47 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -97,7 +97,7 @@ import com.google.common.collect.Maps;
  * 
  * 
  */
-public abstract class ProcessingElement implements Cloneable {
+abstract public class ProcessingElement implements Cloneable {
 
     private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
 
@@ -223,7 +223,7 @@ public abstract class ProcessingElement implements Cloneable {
      *            the time unit
      * @return the PE prototype
      */
-    public ProcessingElement withPECache(int maximumSize, long duration, TimeUnit timeUnit) {
+    public ProcessingElement setPECache(int maximumSize, long duration, TimeUnit timeUnit) {
 
         if (!isPrototype) {
             logger.error("This method can only be used on the PE prototype. Cache not configured.");
@@ -265,7 +265,7 @@ public abstract class ProcessingElement implements Cloneable {
      *            the TimeUnit for the argument interval. Can set to null if no time interval needed.
      * @return the PE prototype
      */
-    public ProcessingElement withTrigger(Class<? extends Event> eventType, int numEvents, long interval,
+    public ProcessingElement setTrigger(Class<? extends Event> eventType, int numEvents, long interval,
             TimeUnit timeUnit) {
 
         if (!isPrototype) {
@@ -319,7 +319,7 @@ public abstract class ProcessingElement implements Cloneable {
      *            the timeUnit of interval
      * @return the PE prototype
      */
-    public ProcessingElement withTimerInterval(long interval, TimeUnit timeUnit) {
+    public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {
         timerIntervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
 
         /* We only allow timers in the PE prototype, not in the instances. */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 2d0a3a3..f8cd81c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -103,7 +103,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      *            the stream name, default is an empty string.
      * @return the stream maker object
      */
-    public Stream<T> withName(String name) {
+    public Stream<T> setName(String name) {
         this.name = name;
         return this;
     }
@@ -115,7 +115,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      *            a function to lookup the value of the key.
      * @return the stream maker object
      */
-    public Stream<T> withKey(KeyFinder<T> keyFinder) {
+    public Stream<T> setKey(KeyFinder<T> keyFinder) {
         this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
         return this;
     }
@@ -127,7 +127,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      *            a descriptor to lookup the value of the key.
      * @return the stream maker object
      */
-    public Stream<T> withKey(String keyFinderString) {
+    public Stream<T> setKey(String keyFinderString) {
 
         return this;
     }
@@ -140,7 +140,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      * 
      * @return the stream maker object
      */
-    public Stream<T> to(ProcessingElement pe) {
+    public Stream<T> setPE(ProcessingElement pe) {
         app.addStream(this, pe);
         return this;
     }
@@ -153,7 +153,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      * 
      * @return the stream maker object
      */
-    public Stream<T> to(ProcessingElement[] pes) {
+    public Stream<T> setPEs(ProcessingElement[] pes) {
         for (int i = 0; i < pes.length; i++)
             app.addStream(this, pes[i]);
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/logback.xml b/subprojects/s4-core/src/main/resources/logback.xml
index 6b246ee..ea8c85a 100644
--- a/subprojects/s4-core/src/main/resources/logback.xml
+++ b/subprojects/s4-core/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
     </encoder>
   </appender>
 
-  <root level="info">
+  <root level="debug">
     <appender-ref ref="STDOUT" />
   </root>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
new file mode 100644
index 0000000..43b4cfb
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
@@ -0,0 +1,14 @@
+package org.apache.s4.appmaker;
+
+import org.junit.Test;
+
+public class AppMakerTest {
+
+    @Test
+    public void test() {
+
+        MyApp myApp = new MyApp();
+        myApp.configure();
+        System.out.println(myApp.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
new file mode 100644
index 0000000..c6a6325
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
new file mode 100644
index 0000000..04ae5e0
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
new file mode 100644
index 0000000..5fbe2c7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
@@ -0,0 +1,24 @@
+package org.apache.s4.appmaker;
+
+public class MyApp extends AppMaker {
+
+    @Override
+    protected void configure() {
+
+        PEMaker pe1, pe2;
+        StreamMaker s1;
+        StreamMaker s2, s3;
+
+        pe1 = addPE(PEZ.class);
+
+        s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pe1);
+
+        pe2 = addPE(PEY.class).to(s1);
+
+        s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pe2);
+
+        s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pe2);
+
+        addPE(PEX.class).to(s2).to(s3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
new file mode 100644
index 0000000..c2e5532
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEX extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
new file mode 100644
index 0000000..16d951a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEY extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
new file mode 100644
index 0000000..e893754
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEZ extends ProcessingElement {
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
index 80207f9..9e6d1e3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
@@ -32,10 +32,10 @@ public class TriggeredApp extends App {
         Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
         switch (TriggerTest.triggerType) {
             case COUNT_BASED:
-                prototype.withTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
+                prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
                 break;
             case TIME_BASED:
-                prototype.withTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
+                prototype.setTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
             default:
                 break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index c8586a4..cae9a3b 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -61,37 +61,37 @@ final public class MyApp extends App {
         /* PE that prints counts to console. */
         PrintPE printPE = createPE(PrintPE.class);
 
-        Stream<CountEvent> userCountStream = createStream(CountEvent.class).withName("User Count Stream")
-                .withKey(new CountKeyFinder()).to(printPE);
+        Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
 
-        Stream<CountEvent> genderCountStream = createStream(CountEvent.class).withName("Gender Count Stream")
-                .withKey(new CountKeyFinder()).to(printPE);
+        Stream<CountEvent> genderCountStream = createStream(CountEvent.class).setName("Gender Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
 
-        Stream<CountEvent> ageCountStream = createStream(CountEvent.class).withName("Age Count Stream")
-                .withKey(new CountKeyFinder()).to(printPE);
+        Stream<CountEvent> ageCountStream = createStream(CountEvent.class).setName("Age Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
 
         /* PEs that count events by user, gender, and age. */
         CounterPE userCountPE = createPE(CounterPE.class);// .withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
-        userCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
         userCountPE.setCountStream(userCountStream);
 
         CounterPE genderCountPE = createPE(CounterPE.class);
-        genderCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        genderCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
         genderCountPE.setCountStream(genderCountStream);
 
         CounterPE ageCountPE = createPE(CounterPE.class);
-        ageCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        ageCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
         ageCountPE.setCountStream(ageCountStream);
 
         /* Streams that output user events keyed on user, gender, and age. */
-        Stream<UserEvent> userStream = createStream(UserEvent.class).withName("User Stream")
-                .withKey(new UserIDKeyFinder()).to(userCountPE);
+        Stream<UserEvent> userStream = createStream(UserEvent.class).setName("User Stream")
+                .setKey(new UserIDKeyFinder()).setPE(userCountPE);
 
-        Stream<UserEvent> genderStream = createStream(UserEvent.class).withName("Gender Stream")
-                .withKey(new GenderKeyFinder()).to(genderCountPE);
+        Stream<UserEvent> genderStream = createStream(UserEvent.class).setName("Gender Stream")
+                .setKey(new GenderKeyFinder()).setPE(genderCountPE);
 
-        Stream<UserEvent> ageStream = createStream(UserEvent.class).withName("Age Stream").withKey(new AgeKeyFinder())
-                .to(ageCountPE);
+        Stream<UserEvent> ageStream = createStream(UserEvent.class).setName("Age Stream").setKey(new AgeKeyFinder())
+                .setPE(ageCountPE);
 
         generateUserEventPE = createPE(GenerateUserEventPE.class);
         generateUserEventPE.setStreams(userStream, genderStream, ageStream);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
index a069513..ee649cb 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
@@ -94,7 +94,7 @@ public class MyApp extends App {
          */
         modelPE.setStream(distanceStream, resultStream);
         // modelPE.setOutputIntervalInEvents(10); // output every 10 events
-        metricsPE.withTimerInterval(outputInterval, timeUnit); // output every 5
+        metricsPE.setTimerInterval(outputInterval, timeUnit); // output every 5
                                                                // seconds
         // obsStream = new Stream<ObsEvent>(this, "Observation Stream", new
         // ClassIDKeyFinder(), modelPE);


Mime
View raw message