incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [39/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
new file mode 100644
index 0000000..ae877fb
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -0,0 +1,431 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import static org.apache.s4.util.MetricsName.S4_APP_METRICS;
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.pecontainer_ev_dq_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_ev_err_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_ev_nq_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_ev_process_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_exec_elapse_time;
+import static org.apache.s4.util.MetricsName.pecontainer_msg_drop_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_pe_ct;
+import static org.apache.s4.util.MetricsName.pecontainer_qsz;
+import static org.apache.s4.util.MetricsName.pecontainer_qsz_w;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.ft.CheckpointingEvent;
+import org.apache.s4.ft.SafeKeeper;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.util.clock.Clock;
+import org.apache.s4.util.clock.EventClock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+public class PEContainer implements Runnable, AsynchronousEventProcessor {
+    private static Logger logger = Logger.getLogger(PEContainer.class);
+    BlockingQueue<EventWrapper> workQueue;
+    private List<PrototypeWrapper> prototypeWrappers = new ArrayList<PrototypeWrapper>();
+    private Monitor monitor;
+    private Clock clock;
+    private int maxQueueSize = 1000;
+    private boolean trackByKey;
+    private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
+	private SafeKeeper safeKeeper;
+
+    private ControlEventProcessor controlEventProcessor = null;
+
+    /**
+     * Sets the capacity used for the work queue that {@link #init()} creates.
+     * The field defaults to 1000 and is only read when init() builds the queue.
+     *
+     * @param maxQueueSize
+     *            capacity handed to the work queue constructor
+     */
+    public void setMaxQueueSize(int maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    /**
+     * Sets the monitor that receives this container's metrics. Every metric
+     * call in this class is guarded by a null check, so leaving the monitor
+     * unset simply disables metric reporting.
+     *
+     * @param monitor
+     *            metric sink, may be null
+     */
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    /**
+     * Sets the clock of this container. The current value is handed to each
+     * PrototypeWrapper created by addProcessor(), and run() updates it from
+     * incoming events when it is an EventClock.
+     *
+     * @param s4Clock
+     *            clock to use
+     */
+    public void setClock(Clock s4Clock) {
+        this.clock = s4Clock;
+    }
+
+    /**
+     * @return the clock previously passed to setClock(), or null if none was
+     *         set
+     */
+    public Clock getClock() {
+        return clock;
+    }
+
+    /**
+     * Enables or disables per-key event counting. When enabled, run() tallies
+     * each dequeued event under "stream name + compound key" and the Watcher
+     * thread logs the tallies.
+     *
+     * @param trackByKey
+     *            true to count events by stream name and key
+     */
+    public void setTrackByKey(boolean trackByKey) {
+        this.trackByKey = trackByKey;
+    }
+
+	/**
+	 * Stores the SafeKeeper for this container. No other method of this class
+	 * reads the field.
+	 *
+	 * @param sk
+	 *            the SafeKeeper instance to keep
+	 */
+	public void setSafeKeeper(SafeKeeper sk) {
+		this.safeKeeper = sk;
+	}
+
+    /**
+     * Registers a PE prototype with this container: wraps it in a
+     * PrototypeWrapper built with the current clock, stores the wrapper, and
+     * records the advice the prototype reports.
+     *
+     * @param processor
+     *            the PE prototype to register
+     */
+    public void addProcessor(AbstractPE processor) {
+        // report through the class logger instead of stdout so the message
+        // follows the application's logging configuration
+        logger.info("adding pe: " + processor);
+        PrototypeWrapper pw = new PrototypeWrapper(processor, clock);
+        prototypeWrappers.add(pw);
+        adviceLists.add(pw.advise());
+    }
+
+    /**
+     * Registers every prototype of the array, in array order, by delegating
+     * to addProcessor().
+     *
+     * @param processors
+     *            PE prototypes to register
+     */
+    public void setProcessors(AbstractPE[] processors) {
+        for (AbstractPE processor : processors) {
+            addProcessor(processor);
+        }
+    }
+
+    /**
+     * Sets the processor that run() invokes for control events. When it is
+     * null, control events are dequeued but not processed.
+     *
+     * @param cep
+     *            control event processor, may be null
+     */
+    public void setControlEventProcessor(ControlEventProcessor cep) {
+        this.controlEventProcessor = cep;
+    }
+
+    /**
+     * Creates an unconfigured container. The work queue and the worker
+     * threads are only created by init().
+     */
+    public PEContainer() {
+
+    }
+
+    List<List<EventAdvice>> adviceLists = new ArrayList<List<EventAdvice>>();
+
+    /**
+     * Creates the bounded work queue (capacity maxQueueSize) and starts the
+     * two threads of this container: the dispatch thread running this
+     * object's run() method and the Watcher thread that reports statistics.
+     */
+    public void init() {
+        workQueue = new LinkedBlockingQueue<EventWrapper>(maxQueueSize);
+        // addProcessor() already records the advice of every wrapper it
+        // creates. Only fill in entries that are missing, so that adviceLists
+        // keeps exactly one entry per wrapper, at the same index, and a later
+        // addProcessor() call still appends at the matching position.
+        for (int i = adviceLists.size(); i < prototypeWrappers.size(); i++) {
+            adviceLists.add(prototypeWrappers.get(i).advise());
+        }
+        Thread t = new Thread(this, "PEContainer");
+        t.start();
+        // name the watcher thread too so it can be identified in thread dumps
+        t = new Thread(new Watcher(), "PEContainer-Watcher");
+        t.start();
+    }
+
+    /**
+     * Offers the event to the work queue without blocking and records the
+     * outcome.
+     * <p>
+     * When a monitor is set, an accepted event increments the enqueue counter
+     * and a rejected one increments the drop counter; the current queue size
+     * is then published. Any exception raised along the way is logged and not
+     * rethrown.
+     *
+     * @see org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.EventWrapper)
+     */
+    @Override
+    public void queueWork(EventWrapper eventWrapper) {
+        try {
+            boolean accepted = workQueue.offer(eventWrapper);
+            if (monitor == null) {
+                return;
+            }
+
+            String coreGroup = S4_CORE_METRICS.toString();
+            if (accepted) {
+                monitor.increment(pecontainer_ev_nq_ct.toString(), 1, coreGroup);
+            } else {
+                monitor.increment(pecontainer_msg_drop_ct.toString(), 1, coreGroup);
+            }
+            monitor.set(pecontainer_qsz.toString(), getQueueSize(), coreGroup);
+        } catch (Exception e) {
+            logger.error("metrics name doesn't exist", e);
+        }
+    }
+
+    /**
+     * Returns the number of events currently waiting in the work queue.
+     * <p>
+     * Note from the original author: this is always called by a different
+     * thread than the one executing run(). Within this class the callers are
+     * queueWork() and the Watcher.
+     *
+     * @see org.apache.s4.processor.AsynchronousEventProcessor#getQueueSize()
+     */
+    @Override
+    public int getQueueSize() {
+        return workQueue.size();
+    }
+
+    /**
+     * Tells whether an event is a control event, that is, whether its stream
+     * name starts with the character '#'. run() hands control events to the
+     * control event processor instead of matching them against PE advice.
+     * 
+     * @param e
+     *            the event wrapper to test
+     * @return true if and only if the stream name is non-empty and begins
+     *         with '#'
+     */
+    private boolean testControlEvent(EventWrapper e) {
+        String name = e.getStreamName();
+        return name.length() > 0 && name.charAt(0) == '#';
+    }
+
+    /**
+     * Dispatch loop, executed by the "PEContainer" thread started in init().
+     * <p>
+     * Each iteration blocks until an event is available on the work queue and
+     * then: updates the clock from the event when the clock is an EventClock,
+     * tallies the event when trackByKey is set, and routes the event. Events
+     * on streams whose name ends with "_checkpointing" or "_recovery" go to
+     * handleCheckpointingOrRecovery(); control events go to the control event
+     * processor once per registered prototype; all other events are matched
+     * against the advice of each prototype and delivered through invokePE().
+     * <p>
+     * The method returns when an InterruptedException is caught. Any other
+     * exception is logged and the loop continues with the next event.
+     */
+    public void run() {
+        long startTime, endTime;
+        while (true) {
+            EventWrapper eventWrapper = null;
+            try {
+                // blocks until an event has been queued by queueWork()
+                eventWrapper = workQueue.take();
+                if (clock instanceof EventClock) {
+                    EventClock eventClock = (EventClock) clock;
+                    eventClock.update(eventWrapper);
+                    // To what time to update the clock
+                }
+                if (trackByKey) {
+                    // one tally per compound key, or a single "<stream> *"
+                    // tally when the event carries no compound key
+                    boolean foundOne = false;
+                    for (CompoundKeyInfo compoundKeyInfo : eventWrapper.getCompoundKeys()) {
+                        foundOne = true;
+                        updateCount(eventWrapper.getStreamName() + " "
+                                + compoundKeyInfo.getCompoundKey());
+                    }
+
+                    if (!foundOne) {
+                        updateCount(eventWrapper.getStreamName() + " *");
+                    }
+                }
+
+                // the elapsed-time metric covers routing and PE execution,
+                // not the time spent waiting on the queue
+                startTime = System.currentTimeMillis();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("STEP 5 (PEContainer): workQueue.take - "
+                            + eventWrapper.toString());
+                }
+                // Logger.getLogger("s4").debug(
+                // "Incoming: " + event.getEventName());
+                if (monitor != null) {
+                    monitor.increment(pecontainer_ev_dq_ct.toString(),
+                                      1,
+                                      S4_CORE_METRICS.toString());
+                }
+                // printPlainPartitionInfoList(event.getCompoundKeyList());
+
+                if (eventWrapper.getStreamName().endsWith("_checkpointing")
+                        || eventWrapper.getStreamName().endsWith("_recovery")) {
+                    // in that case, we don't need to iterate over all prototypes and advises: 
+                    // the target PE is specified in the event
+                    handleCheckpointingOrRecovery(eventWrapper);
+                } else {
+
+                    boolean ctrlEvent = testControlEvent(eventWrapper);
+
+                    // execute the PEs interested in this event
+                    for (int i = 0; i < prototypeWrappers.size(); i++) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("STEP 6 (PEContainer): prototypeWrappers("
+                                    + i
+                                    + ") - "
+                                    + prototypeWrappers.get(i).toString()
+                                    + " - " + eventWrapper.getStreamName());
+                        }
+
+                        // first check if this is a control message and handle
+                        // it if so; control events never reach the advice
+                        // matching below
+                        if (ctrlEvent) {
+                            if (controlEventProcessor != null) {
+                                controlEventProcessor.process(eventWrapper,
+                                        prototypeWrappers.get(i));
+                            }
+
+                            continue;
+                        }
+
+                        // otherwise, continue processing event.
+                        // adviceLists is indexed like prototypeWrappers
+                        List<EventAdvice> adviceList = adviceLists.get(i);
+                        for (EventAdvice eventAdvice : adviceList) {
+                            if (eventAdvice.getEventName().equals("*")
+                                    || eventAdvice.getEventName().equals(
+                                            eventWrapper.getStreamName())) {
+                                // event name matches
+                            } else {
+                                continue;
+                            }
+
+                            // a "*" key advice is served by the single PE
+                            // instance stored under the key value "*"
+                            if (eventAdvice.getKey().equals("*")) {
+                                invokePE(prototypeWrappers.get(i).getPE("*"),
+                                        eventWrapper, null);
+                                continue;
+                            }
+                            
+                            // keyed advice: one PE instance per compound
+                            // value of every matching compound key
+                            for (CompoundKeyInfo compoundKeyInfo : eventWrapper
+                                    .getCompoundKeys()) {
+                                if (eventAdvice.getKey().equals(
+                                        compoundKeyInfo.getCompoundKey())) {
+                                    invokePE(
+                                            prototypeWrappers
+                                                    .get(i)
+                                                    .getPE(compoundKeyInfo
+                                                            .getCompoundValue()),
+                                            eventWrapper, compoundKeyInfo);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                endTime = System.currentTimeMillis();
+                if (monitor != null) {
+                    // TODO: need to be changed for more accurate calc
+                    monitor.increment(pecontainer_exec_elapse_time.toString(),
+                                      (int) (endTime - startTime),
+                                      S4_CORE_METRICS.toString());
+                }
+            } catch (InterruptedException ie) {
+                // interruption is the only way out of the dispatch loop
+                Logger.getLogger("s4").warn("PEContainer is interrupted", ie);
+                return;
+            } catch (Exception e) {
+                // the current event is abandoned; the loop keeps running
+                Logger.getLogger("s4")
+                      .error("Exception choosing processing element to run", e);
+            }
+        }
+    }
+
+    /**
+     * Delivers a checkpointing or recovery event to the PE it targets.
+     * <p>
+     * The target prototype is the one whose id equals the prototype id of the
+     * event's safe keeper id, and the PE instance is selected by the key of
+     * that same id. The PE is only invoked when the prototype has an advice
+     * whose event name equals the stream name of the event. Events whose
+     * payload is not a CheckpointingEvent (including a null payload) are
+     * logged and ignored.
+     *
+     * @param eventWrapper
+     *            event received on a checkpointing or recovery stream
+     */
+    private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) {
+        Object event = eventWrapper.getEvent();
+        // explicit type test instead of catching ClassCastException; it also
+        // rejects a null payload here rather than failing further down
+        if (!(event instanceof CheckpointingEvent)) {
+            logger.error("Checkpointing stream ["
+                    + eventWrapper.getStreamName()
+                    + "] can only handle checkpointing events. Received event is not a checkpointing event; it will be ignored.");
+            return;
+        }
+        CheckpointingEvent checkpointingEvent = (CheckpointingEvent) event;
+
+        // 1. event is targeted towards PE prototype whose id is given by the
+        // safe keeper id carried by the event
+        // 2. PE key is given by the same safe keeper id
+        for (int i = 0; i < prototypeWrappers.size(); i++) {
+            PrototypeWrapper wrapper = prototypeWrappers.get(i);
+            if (!checkpointingEvent.getSafeKeeperId().getPrototypeId()
+                    .equals(wrapper.getId())) {
+                continue;
+            }
+
+            // check that PE is subscribed to checkpointing stream
+            for (EventAdvice eventAdvice : adviceLists.get(i)) {
+                if (eventAdvice.getEventName().equals(eventWrapper.getStreamName())) {
+                    invokePE(
+                            wrapper.getPE(checkpointingEvent.getSafeKeeperId().getKey()),
+                            eventWrapper, null);
+                    // one delivery per prototype is enough
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Executes one PE for one event and records the outcome.
+     * <p>
+     * On success the processed-event counters (core and per PE id) and the
+     * elapsed time (per stream name) are reported to the monitor, if any. An
+     * exception thrown by the PE is logged and counted as an error; it is not
+     * rethrown, so the caller can go on with the remaining PEs.
+     *
+     * @param pe
+     *            PE instance to execute; a null value is logged, counted as
+     *            an error and otherwise ignored
+     * @param eventWrapper
+     *            event to deliver
+     * @param compoundKeyInfo
+     *            key information that selected the PE, or null
+     */
+    private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
+                          CompoundKeyInfo compoundKeyInfo) {
+        if (pe == null) {
+            // PrototypeWrapper.getPE() returns null when its lookup fails.
+            // Without this guard the error handler below would dereference
+            // the null PE again and throw out of this method, which aborts
+            // the delivery of the event to every remaining PE.
+            Logger.getLogger("s4")
+                  .error("No processing element available for event on stream "
+                          + eventWrapper.getStreamName());
+            if (monitor != null) {
+                monitor.increment(pecontainer_ev_err_ct.toString(),
+                                  1,
+                                  S4_CORE_METRICS.toString());
+            }
+            return;
+        }
+
+        try {
+            long startTime = System.currentTimeMillis();
+            pe.execute(eventWrapper.getStreamName(),
+                       compoundKeyInfo,
+                       eventWrapper.getEvent());
+            long endTime = System.currentTimeMillis();
+            if (monitor != null) {
+                monitor.increment(pecontainer_ev_process_ct.toString(),
+                                  1,
+                                  S4_CORE_METRICS.toString());
+                monitor.increment(pecontainer_ev_process_ct.toString(),
+                                  1,
+                                  S4_APP_METRICS.toString(),
+                                  "at",
+                                  pe.getId());
+                monitor.increment(pecontainer_exec_elapse_time.toString(),
+                                  (int) (endTime - startTime),
+                                  S4_APP_METRICS.toString(),
+                                  "at",
+                                  eventWrapper.getStreamName());
+            }
+        } catch (Exception e) {
+            if (monitor != null) {
+                monitor.increment(pecontainer_ev_err_ct.toString(),
+                                  1,
+                                  S4_CORE_METRICS.toString());
+                monitor.increment(pecontainer_ev_err_ct.toString(),
+                                  1,
+                                  S4_APP_METRICS.toString(),
+                                  "at",
+                                  pe.getId());
+            }
+            Logger.getLogger("s4")
+                  .error("Exception running processing element", e);
+        }
+
+    }
+
+    /**
+     * Adds one to the tally stored under the given key, starting from zero
+     * when the key has no tally yet.
+     *
+     * @param key
+     *            tally key, built by run() from stream name and compound key
+     */
+    private void updateCount(String key) {
+        Integer previous = countByEventType.get(key);
+        int next = (previous == null) ? 1 : previous + 1;
+        countByEventType.put(key, next);
+    }
+
+    /**
+     * Periodic reporter started by init(). Each pass publishes the PE count
+     * of every prototype and the total, the work queue size, and (when
+     * trackByKey is set) the per-key event tallies; it then sleeps for ten
+     * seconds. The loop ends once the thread has been interrupted.
+     */
+    class Watcher implements Runnable {
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    int peCount = 0;
+                    for (PrototypeWrapper pw : prototypeWrappers) {
+                        // read the count once so that the total and the
+                        // per-prototype metric agree
+                        int prototypePeCount = pw.getPECount();
+                        peCount += prototypePeCount;
+                        if (monitor != null) {
+                            monitor.set(pecontainer_pe_ct.toString(),
+                                        prototypePeCount,
+                                        S4_APP_METRICS.toString(),
+                                        "at",
+                                        pw.getId());
+                        }
+                    }
+
+                    Logger.getLogger("s4").info("PE count " + peCount);
+                    if (monitor != null) {
+                        monitor.set(pecontainer_pe_ct.toString(),
+                                    peCount,
+                                    S4_CORE_METRICS.toString());
+                        monitor.set(pecontainer_qsz_w.toString(),
+                                    getQueueSize(),
+                                    S4_CORE_METRICS.toString());
+                    }
+
+                    if (trackByKey) {
+                        // Iterating a Collections.synchronizedMap view is
+                        // only safe while holding the map's own lock, and the
+                        // dispatch thread keeps adding keys. Copy under the
+                        // lock, then log from the copy so the dispatch thread
+                        // is not blocked while the log lines are written.
+                        Map<String, Integer> snapshot;
+                        synchronized (countByEventType) {
+                            snapshot = new HashMap<String, Integer>(countByEventType);
+                        }
+                        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
+                            Logger.getLogger("s4").info("Count by " + entry.getKey()
+                                    + ": " + entry.getValue());
+                        }
+                    }
+                } catch (Exception e) {
+                    Logger.getLogger("s4")
+                          .error("Exception running PEContainer watcher", e);
+                } finally {
+                    try {
+                        Thread.sleep(10000);
+                    } catch (InterruptedException ie) {
+                        // restore the flag so the loop condition sees it
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/PrintEventPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PrintEventPE.java b/s4-core/src/main/java/org/apache/s4/processor/PrintEventPE.java
new file mode 100644
index 0000000..96dd877
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/PrintEventPE.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Processing element that writes every event it receives to the "s4" logger
+ * at INFO level and produces no output of its own.
+ */
+public class PrintEventPE extends AbstractPE {
+
+    /** The "s4" logger that receives the events. */
+    private static final Logger S4_LOGGER = Logger.getLogger("s4");
+
+    /** Does nothing: this PE has no state to output. */
+    @Override
+    public void output() {
+        // intentionally empty
+    }
+
+    /**
+     * Logs the event.
+     *
+     * @param event
+     *            the event to log
+     */
+    public void processEvent(Object event) {
+        S4_LOGGER.info(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
new file mode 100644
index 0000000..1a7ca7a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.ft.SafeKeeper;
+import org.apache.s4.persist.ConMapPersister;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.util.clock.Clock;
+
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Couples a PE prototype with the lookup table that holds the PE instances
+ * cloned from it, one instance per key value.
+ */
+public class PrototypeWrapper {
+
+    private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
+    private AbstractPE prototype;
+    Persister lookupTable;
+    SafeKeeper safeKeeper;
+
+    /**
+     * Wraps the prototype and gives it a self-cleaning ConMapPersister as its
+     * lookup table. Failures while configuring the table or the prototype are
+     * logged and not rethrown.
+     *
+     * @param prototype
+     *            the PE prototype to wrap
+     * @param s4Clock
+     *            clock handed to the lookup table
+     */
+    public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) {
+        this.prototype = prototype;
+        ConMapPersister conMap = new ConMapPersister(s4Clock);
+        lookupTable = conMap;
+        // TODO lookup table with PEIds
+        System.out.println("Using ConMapPersister ..");
+        try {
+            conMap.setSelfClean(true);
+            conMap.init();
+            // hand the table and this wrapper over to the prototype
+            prototype.setLookupTable(lookupTable);
+            prototype.setPrototypeWrapper(this);
+        } catch (Exception e) {
+            // this is not expected
+            Logger.getLogger("s4")
+                  .error("Exception invoking setLookupTable on prototype", e);
+        }
+    }
+
+    /**
+     * @return the id of the wrapped prototype
+     */
+    public String getId() {
+        return prototype.getId();
+    }
+
+    /**
+     * Returns the PE stored under keyValue, creating it first when there is
+     * none: the prototype is cloned and the clone is initialised. In both
+     * cases the PE is (re)stored in the lookup table with the prototype's
+     * time to live, which refreshes its entry.
+     * <p>
+     * If any step throws, the exception is logged and whatever instance had
+     * been obtained up to that point, possibly null, is returned.
+     * 
+     * @param keyValue
+     *            key value
+     * @return PE corresponding to keyValue, or null if it could neither be
+     *         found nor created
+     */
+    public AbstractPE getPE(String keyValue) {
+        AbstractPE instance = null;
+        try {
+            instance = (AbstractPE) lookupTable.get(keyValue);
+            if (instance == null) {
+                instance = (AbstractPE) prototype.clone();
+                instance.initInstance();
+            }
+            // storing the entry again updates its last update time
+            lookupTable.set(keyValue, instance, prototype.getTtl());
+        } catch (Exception e) {
+            logger.error("exception when looking up pe for key:" + keyValue, e);
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the PE stored under keyValue without creating one and without
+     * re-storing the entry. A failing lookup is logged and reported as null.
+     * 
+     * @param keyValue
+     *            key value
+     * @return PE corresponding to keyValue, if such a PE exists. Null
+     *         otherwise.
+     */
+    public AbstractPE lookupPE(String keyValue) {
+        try {
+            return (AbstractPE) lookupTable.get(keyValue);
+        } catch (Exception e) {
+            logger.error("exception when looking up pe for key:" + keyValue, e);
+            return null;
+        }
+    }
+
+    /**
+     * Overwrites the entry stored under keyValue with a null value and a time
+     * to live of 0. A failure is logged and not rethrown.
+     *
+     * @param keyValue
+     *            key value of the PE to expire
+     */
+    public void expire(String keyValue) {
+        try {
+            lookupTable.set(keyValue, null, 0);
+        } catch (Exception e) {
+            logger.error("exception when removing pe for key:" + keyValue, e);
+        }
+    }
+
+    /**
+     * @return the number of keys currently held by the lookup table
+     */
+    public int getPECount() {
+        return lookupTable.keySet().size();
+    }
+
+    /**
+     * @return the advice reported by the wrapped prototype
+     */
+    public List<EventAdvice> advise() {
+        return prototype.advise();
+    }
+
+    /**
+     * Stores the SafeKeeper; no other method of this class reads it.
+     *
+     * @param safeKeeper
+     *            the SafeKeeper instance to keep
+     */
+    public void setSafeKeeper(SafeKeeper safeKeeper) {
+        this.safeKeeper = safeKeeper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/ReroutePE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/ReroutePE.java b/s4-core/src/main/java/org/apache/s4/processor/ReroutePE.java
new file mode 100644
index 0000000..94d278b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/ReroutePE.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.dispatcher.transformer.Transformer;
+import org.apache.s4.util.Cloner;
+import org.apache.s4.util.ClonerGenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Processing element that forwards every event it receives to a configured
+ * output stream, optionally passing a clone of the event through a chain of
+ * transformers first.
+ */
+public class ReroutePE extends AbstractPE {
+    private static Logger logger = Logger.getLogger(ReroutePE.class);
+    private EventDispatcher dispatcher;
+    private Transformer[] transformers = new Transformer[0];
+    // private List<EventAdvice> keys;
+    private String outputStreamName;
+
+    /**
+     * @param dispatcher
+     *            dispatcher used to emit the (possibly transformed) events
+     */
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * @param transformers
+     *            transformers applied in array order; null or an empty array
+     *            means events are forwarded unchanged
+     */
+    public void setTransformers(Transformer[] transformers) {
+        this.transformers = transformers;
+    }
+
+    // public void setEventNames(String[] eventNames) {
+    // keys = new ArrayList<EventAdvice>();
+    // for (String eventName : eventNames) {
+    // keys.add(new EventAdvice(eventName, "*"));
+    // }
+    // }
+
+    // @Override
+    // public List<EventAdvice> advise() {
+    // return keys;
+    // }
+
+    /**
+     * @param outputStreamName
+     *            name of the stream the events are dispatched to
+     */
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    /** Generated cloners, cached by event class name. */
+    private Map<String, Cloner> clonerMap = new HashMap<String, Cloner>();
+
+    /**
+     * Forwards the event to the output stream.
+     * <p>
+     * Without transformers the event itself is dispatched. With transformers,
+     * the event is cloned through a generated, cached Cloner and the clone is
+     * passed through every transformer in turn; if a transformer returns
+     * null, nothing is dispatched.
+     *
+     * @param event
+     *            the incoming event
+     * @throws RuntimeException
+     *             if the generated cloner class cannot be instantiated
+     */
+    public void processEvent(Object event) {
+        Object newEvent = event;
+        // the whole transformation step, including the loop, is guarded so
+        // that a null transformer array forwards the event unchanged instead
+        // of throwing a NullPointerException
+        if (transformers != null && transformers.length > 0) {
+            Cloner cloner = clonerMap.get(event.getClass().getName());
+            if (cloner == null) {
+                ClonerGenerator cg = new ClonerGenerator();
+                // generate byte code that knows how to call the clone method on
+                // this event
+                Class clonerClass = cg.generate(event.getClass());
+                try {
+                    cloner = (Cloner) clonerClass.newInstance();
+                    clonerMap.put(event.getClass().getName(), cloner);
+                } catch (InstantiationException ie) {
+                    Logger.getLogger(this.getClass()).error(ie);
+                    throw new RuntimeException(ie);
+                } catch (IllegalAccessException ias) {
+                    Logger.getLogger(this.getClass()).error(ias);
+                    throw new RuntimeException(ias);
+                }
+            }
+            // transformers work on a copy so the original event is untouched
+            newEvent = cloner.clone(event);
+
+            for (Transformer transformer : transformers) {
+                newEvent = transformer.transform(newEvent);
+                if (newEvent == null) {
+                    // a transformer dropped the event
+                    return;
+                }
+            }
+        }
+
+        dispatcher.dispatchEvent(outputStreamName, newEvent);
+    }
+
+    /**
+     * @return -1, the value this PE uses to mean "never die"
+     */
+    public int getTtl() {
+        return -1; // never die
+    }
+
+    /** Does nothing: events are forwarded as they are processed. */
+    @Override
+    public void output() {
+        // intentionally empty
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/SimpleCountingPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/SimpleCountingPE.java b/s4-core/src/main/java/org/apache/s4/processor/SimpleCountingPE.java
new file mode 100644
index 0000000..a390928
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/SimpleCountingPE.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.persist.Persister;
+
+import java.util.List;
+
+/**
+ * Processing element that counts the events it receives and, on output,
+ * writes the count to a persister under a key built from a prefix and the
+ * PE's key values.
+ */
+public class SimpleCountingPE extends AbstractPE {
+    private boolean clearOnOutput;
+    private OutputFormatter outputFormatter;
+    private Persister persister;
+    private int persistTime;
+    private String keyPrefix = "s4:counter";
+    private boolean dirty = false;
+
+    /**
+     * @param clearOnOutput
+     *            true to reset the counter to zero each time it is output
+     */
+    public void setClearOnOutput(boolean clearOnOutput) {
+        this.clearOnOutput = clearOnOutput;
+    }
+
+    /**
+     * @param outputFormatter
+     *            optional formatter applied to the count before it is
+     *            persisted; may be null
+     */
+    public void setOutputFormatter(OutputFormatter outputFormatter) {
+        this.outputFormatter = outputFormatter;
+    }
+
+    /**
+     * @param persister
+     *            destination of the count
+     */
+    public void setPersister(Persister persister) {
+        this.persister = persister;
+    }
+
+    /**
+     * @param persistTime
+     *            value passed as third argument of Persister.set()
+     */
+    public void setPersistTime(int persistTime) {
+        this.persistTime = persistTime;
+    }
+
+    /**
+     * @param keyPrefix
+     *            first component of the persister key; defaults to
+     *            "s4:counter"
+     */
+    public void setKeyPrefix(String keyPrefix) {
+        this.keyPrefix = keyPrefix;
+    }
+
+    private long counter = 0;
+
+    /**
+     * Persists the current count if at least one event arrived since the last
+     * output. The key is the prefix followed by ":" and each key value of
+     * this PE; the value is the count, run through the output formatter when
+     * one is set. If the persister call is interrupted, the interrupt flag of
+     * the thread is restored.
+     */
+    @Override
+    public void output() {
+        Object outputValue = null;
+        // snapshot and reset under the same lock processEvent() uses
+        synchronized (this) {
+            if (!dirty) {
+                return;
+            }
+            // valueOf() instead of the deprecated boxing constructor
+            outputValue = Long.valueOf(counter);
+            if (clearOnOutput) {
+                counter = 0;
+            }
+            dirty = false;
+        }
+
+        List<Object> simpleKeyValues = this.getKeyValue();
+        // method-local buffer: no need for StringBuffer's synchronization
+        StringBuilder keyBuffer = new StringBuilder(keyPrefix);
+        for (Object simpleKeyValue : simpleKeyValues) {
+            keyBuffer.append(":");
+            keyBuffer.append(String.valueOf(simpleKeyValue));
+        }
+
+        if (outputFormatter != null) {
+            outputValue = outputFormatter.format(outputValue);
+        }
+
+        try {
+            persister.set(keyBuffer.toString(), outputValue, persistTime);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+
+    }
+
+    /**
+     * Counts the event and marks the count as changed.
+     *
+     * @param event
+     *            the incoming event; its content is not inspected
+     */
+    public void processEvent(Object event) {
+        synchronized (this) {
+            counter++;
+            dirty = true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/schema/Schema.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/schema/Schema.java b/s4-core/src/main/java/org/apache/s4/schema/Schema.java
new file mode 100644
index 0000000..2af4d69
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/schema/Schema.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.schema;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Schema {
+    private Map<String, Property> properties = new HashMap<String, Property>();
+    @SuppressWarnings("unchecked")
+    private Class type;
+
+    public Map<String, Property> getProperties() {
+        return Collections.unmodifiableMap(properties);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class getType() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Schema(Class clazz) {
+        type = clazz;
+        for (Method method : clazz.getMethods()) {
+            if (!method.getName().startsWith("set")
+		|| !method.getReturnType().equals(Void.TYPE)
+		|| method.getParameterTypes().length != 1) {
+                continue;
+            }
+
+            String getterMethodName = method.getName().replaceFirst("set",
+                                                                    "get");
+
+            String propertyName = "";
+            if (method.getName().length() > 4) {
+                propertyName = method.getName().substring(4);
+            }
+            propertyName = method.getName().substring(3, 4).toLowerCase()
+                    + propertyName;
+
+            Type parameterType = method.getGenericParameterTypes()[0];
+
+            Method getterMethod = null;
+            try {
+                getterMethod = clazz.getMethod(getterMethodName, new Class[] {});
+                /*
+                 * if (!getterMethod.getReturnType().equals(parameterType)) {
+                 * getterMethod = null; }
+                 */
+            } catch (NoSuchMethodException nsme) {
+                // this is an acceptable possibility, ignore
+                // nsme.printStackTrace();
+            }
+
+            Property property = Property.getProperty(propertyName,
+                                                     getterMethod,
+                                                     method,
+                                                     parameterType);
+            if (property != null) {
+                properties.put(propertyName, property);
+            }
+        }
+    }
+
+    public String toString() {
+        return this.toString("");
+    }
+
+    public String toString(String indent) {
+        StringBuffer sb = new StringBuffer();
+        sb.append(indent).append("{\n");
+        for (Property property : properties.values()) {
+            sb.append(property.toString(indent + "   "));
+        }
+
+        sb.append(indent).append("}\n");
+        return sb.toString();
+    }
+
+    static public class Property {
+        @SuppressWarnings("unchecked")
+        private static List<Class> nonBeanClasses = new ArrayList<Class>();
+        static {
+            nonBeanClasses.add(String.class);
+            nonBeanClasses.add(Number.class);
+        }
+        @SuppressWarnings("unchecked")
+        private Class type;
+        private Property componentProperty;
+        private Schema schema;
+        private String name;
+        private Method getterMethod;
+        private Method setterMethod;
+        private boolean isList;
+        private boolean isNumber;
+
+        @SuppressWarnings("unchecked")
+        public Class getType() {
+            return type;
+        }
+
+        public Schema getSchema() {
+            return schema;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Method getGetterMethod() {
+            return getterMethod;
+        }
+
+        public Method getSetterMethod() {
+            return setterMethod;
+        }
+
+        public boolean isList() {
+            return isList;
+        }
+
+        public boolean isNumber() {
+            return isNumber;
+        }
+
+        public Property getComponentProperty() {
+            return componentProperty;
+        }
+
+        public static Property getProperty(String propertyName, Method getterMethod, Method setterMethod, Type parameterType) {
+            if (parameterType instanceof Class) {
+                Class ptClass = (Class) parameterType;
+
+                if (ptClass.isArray()) {
+                    Class componentType = ptClass.getComponentType();
+                    return new Property(propertyName,
+                                        getterMethod,
+                                        setterMethod,
+                                        ptClass,
+                                        getProperty("component",
+                                                    null,
+                                                    null,
+                                                    componentType));
+                } else if (ptClass.isPrimitive()) {
+                    return new Property(propertyName,
+                                        getterMethod,
+                                        setterMethod,
+                                        ptClass);
+                }
+
+                List<Class> hierarchy = getHieararchy(ptClass);
+
+                for (Class nonBeanClass : nonBeanClasses) {
+                    if (hierarchy.contains(nonBeanClass)) {
+                        return new Property(propertyName,
+                                            getterMethod,
+                                            setterMethod,
+                                            ptClass);
+                    }
+                }
+
+                // if here, then must have no-arg constructor and some bean-like
+                // properties
+                boolean noArgConstructorFound = false;
+                for (Constructor constructor : ptClass.getConstructors()) {
+                    if (constructor.getGenericParameterTypes().length == 0) {
+                        noArgConstructorFound = true;
+                        break;
+                    }
+                }
+
+                if (!noArgConstructorFound) {
+                    return null;
+                }
+
+                Schema schema = new Schema(ptClass);
+                if (schema.getProperties().size() == 0) {
+                    return null;
+                }
+
+                return new Property(propertyName,
+                                    getterMethod,
+                                    setterMethod,
+                                    ptClass,
+                                    schema);
+            } else if (parameterType instanceof ParameterizedType) {
+                ParameterizedType pt = (ParameterizedType) parameterType;
+                Class rawType = (Class) pt.getRawType();
+
+                Class[] interfaces = rawType.getInterfaces();
+                if (!rawType.equals(List.class)
+                        && !Arrays.asList(interfaces).contains(List.class)) {
+                    return null;
+                }
+
+                Type[] parameterInstantiations = pt.getActualTypeArguments();
+                if (parameterInstantiations.length != 1) {
+                    return null;
+                }
+
+                Type pIType = parameterInstantiations[0];
+
+                return new Property(propertyName,
+                                    getterMethod,
+                                    setterMethod,
+                                    rawType,
+                                    getProperty("component", null, null, pIType));
+            }
+
+            return null;
+        }
+
+        private static List<Class> getHieararchy(Class clazz) {
+            List<Class> hierarchy = new ArrayList<Class>();
+
+            for (Class level = clazz; level != null; level = level.getSuperclass()) {
+                hierarchy.add(level);
+            }
+            return hierarchy;
+        }
+
+        private Property(String name, Method getterMethod, Method setterMethod,
+                Class type) {
+            this.name = name;
+            this.getterMethod = getterMethod;
+            this.setterMethod = setterMethod;
+            this.type = type;
+
+            Class[] interfaces = type.getInterfaces();
+            if (type.equals(List.class)
+                    || Arrays.asList(interfaces).contains(List.class)) {
+                isList = true;
+            } else if (getHieararchy(type).contains(Number.class)) {
+                isNumber = true;
+            }
+        }
+
+        private Property(String name, Method getterMethod, Method setterMethod,
+                Class type, Schema schema) {
+            this(name, getterMethod, setterMethod, type);
+            this.schema = schema;
+        }
+
+        private Property(String name, Method getterMethod, Method setterMethod,
+                Class type, Property componentProperty) {
+            this(name, getterMethod, setterMethod, type);
+            this.componentProperty = componentProperty;
+        }
+
+        public String toString() {
+            return toString("");
+        }
+
+        public String toString(String indent) {
+            StringBuffer sb = new StringBuffer();
+            sb.append(indent)
+              .append(type.getName())
+              .append(" ")
+              .append(name)
+              .append("\n");
+            if (schema != null) {
+                sb.append(schema.toString(indent));
+            }
+            if (componentProperty != null) {
+                sb.append(componentProperty.toString(indent));
+            }
+
+            return sb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/schema/SchemaContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/schema/SchemaContainer.java b/s4-core/src/main/java/org/apache/s4/schema/SchemaContainer.java
new file mode 100644
index 0000000..1777b67
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/schema/SchemaContainer.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.schema;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Cache of {@link Schema} objects, one per class, keyed by the textual form
+ * of the class's loader plus the class name.
+ */
+public class SchemaContainer {
+    private final ConcurrentHashMap<String, Schema> schemaMap = new ConcurrentHashMap<String, Schema>();
+
+    /**
+     * Returns the cached schema for {@code clazz}, building and caching it on
+     * first use.
+     *
+     * @param clazz class whose schema is wanted
+     * @return the schema shared by all callers asking for this class
+     */
+    public Schema getSchema(Class<?> clazz) {
+        // getClassLoader() returns null for classes loaded by the bootstrap
+        // loader (e.g. java.lang.String); String.valueOf turns that into
+        // "null" instead of failing with a NullPointerException.
+        String schemaKey = String.valueOf(clazz.getClassLoader()) + ":"
+                + clazz.getName();
+
+        Schema schema = schemaMap.get(schemaKey);
+        if (schema == null) {
+            Schema created = new Schema(clazz);
+            // If another thread cached a schema first, hand out that one so
+            // every caller sees the same instance.
+            Schema existing = schemaMap.putIfAbsent(schemaKey, created);
+            schema = (existing != null) ? existing : created;
+        }
+
+        return schema;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/schema/SchemaManager.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/schema/SchemaManager.java b/s4-core/src/main/java/org/apache/s4/schema/SchemaManager.java
new file mode 100644
index 0000000..64bd425
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/schema/SchemaManager.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.schema;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Registry of schema strings by schema name. Schemas are either added
+ * directly or read from files configured through {@link #setSchemas}.
+ */
+public class SchemaManager {
+    protected Map<String, String> schemaStringMap = new ConcurrentHashMap<String, String>();
+    private Map<String, String> schemaFilenameMap;
+
+    /**
+     * Loads every schema file configured through {@link #setSchemas}; does
+     * nothing when no file map has been set.
+     */
+    public void init() {
+        if (schemaFilenameMap != null) {
+            for (Map.Entry<String, String> entry : schemaFilenameMap.entrySet()) {
+                addSchemaFromFile(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * @param schemaFilenameMap schema name to schema file name, consumed by
+     *        {@link #init()}
+     */
+    public void setSchemas(Map<String, String> schemaFilenameMap) {
+        this.schemaFilenameMap = schemaFilenameMap;
+    }
+
+    /**
+     * @param schemaName name the schema was registered under
+     * @return the schema text, or {@code null} when the name is unknown
+     */
+    public String getSchemaString(String schemaName) {
+        return schemaStringMap.get(schemaName);
+    }
+
+    /**
+     * Reads a schema file and registers its content under {@code schemaName}.
+     * Lines are concatenated without line separators. A missing file or a
+     * read failure is logged and leaves the registry unchanged.
+     *
+     * @param schemaName name to register the schema under
+     * @param schemaFilename path of the file holding the schema
+     */
+    public void addSchemaFromFile(String schemaName, String schemaFilename) {
+        FileReader fileReader = null;
+        BufferedReader lineReader = null;
+        try {
+            if (!new File(schemaFilename).exists()) {
+                Logger.getLogger("s4").error("Missing schema file:"
+                        + schemaFilename + " for type:" + schemaName);
+                return;
+            }
+
+            fileReader = new FileReader(schemaFilename);
+            lineReader = new BufferedReader(fileReader);
+            StringBuilder content = new StringBuilder();
+            for (String line = lineReader.readLine(); line != null; line = lineReader.readLine()) {
+                content.append(line);
+            }
+            schemaStringMap.put(schemaName, content.toString());
+        } catch (IOException ioe) {
+            Logger.getLogger("s4").error("Exception reading schema file "
+                                                 + schemaFilename,
+                                         ioe);
+        } finally {
+            // best-effort cleanup: failures while closing are ignored
+            if (lineReader != null) {
+                try {
+                    lineReader.close();
+                } catch (Exception e) {
+                }
+            }
+            if (fileReader != null) {
+                try {
+                    fileReader.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers (or replaces) a schema given directly as text.
+     *
+     * @param schemaName name to register the schema under
+     * @param schema schema text
+     */
+    public void addSchema(String schemaName, String schema) {
+        schemaStringMap.put(schemaName, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/serialize/KryoSerDeser.java b/s4-core/src/main/java/org/apache/s4/serialize/KryoSerDeser.java
new file mode 100644
index 0000000..4683901
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/serialize/KryoSerDeser.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.serialize;
+
+import java.nio.ByteBuffer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.ObjectBuffer;
+import com.esotericsoftware.kryo.serialize.SimpleSerializer;
+
+public class KryoSerDeser implements SerializerDeserializer {
+
+    private Kryo kryo = new Kryo();
+    
+    private int initialBufferSize = 2048;
+    private int maxBufferSize = 256*1024;
+
+    public void setInitialBufferSize(int initialBufferSize) {
+        this.initialBufferSize = initialBufferSize;
+    }
+
+    public void setMaxBufferSize(int maxBufferSize) {
+        this.maxBufferSize = maxBufferSize;
+    }
+
+    public Kryo getKryo() {
+		return kryo;
+	}
+
+	public KryoSerDeser() {
+        kryo.setRegistrationOptional(true);
+
+        // UUIDs don't have a no-arg constructor.
+        kryo.register(java.util.UUID.class,
+                      new SimpleSerializer<java.util.UUID>() {
+                          @Override
+                          public java.util.UUID read(ByteBuffer buf) {
+                              return new java.util.UUID(buf.getLong(),
+                                                        buf.getLong());
+                          }
+
+                          @Override
+                          public void write(ByteBuffer buf, java.util.UUID uuid) {
+                              buf.putLong(uuid.getMostSignificantBits());
+                              buf.putLong(uuid.getLeastSignificantBits());
+                          }
+
+                      });
+    }
+
+    @Override
+    public Object deserialize(byte[] rawMessage) {
+        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
+        return buffer.readClassAndObject(rawMessage);
+    }
+
+    @Override
+    public byte[] serialize(Object message) {
+        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
+        return buffer.writeClassAndObject(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/serialize/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/serialize/SerializerDeserializer.java b/s4-core/src/main/java/org/apache/s4/serialize/SerializerDeserializer.java
new file mode 100644
index 0000000..07427f3
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/serialize/SerializerDeserializer.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.serialize;
+
+/**
+ * Converts message objects to byte arrays and back.
+ */
+public interface SerializerDeserializer {
+    /**
+     * @param message object to convert
+     * @return the byte representation of {@code message}
+     */
+    byte[] serialize(Object message);
+
+    /**
+     * @param rawMessage bytes to convert
+     * @return the object represented by {@code rawMessage}
+     */
+    Object deserialize(byte[] rawMessage);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/test/TestPersisterEventClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/test/TestPersisterEventClock.java b/s4-core/src/main/java/org/apache/s4/test/TestPersisterEventClock.java
new file mode 100644
index 0000000..d57859a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/test/TestPersisterEventClock.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.test;
+
+import org.apache.s4.persist.ConMapPersister;
+import org.apache.s4.persist.HashMapPersister;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.util.clock.EventClock;
+
+/**
+ * Manual, console-driven exercise of {@link HashMapPersister} and
+ * {@link ConMapPersister} against an {@link EventClock} that a background
+ * thread advances in 400-unit steps.
+ */
+public class TestPersisterEventClock {
+
+    static EventClock s4Clock;
+    static Persister persister;
+
+    public static void main(String[] args) {
+        TestPersisterEventClock testPersisterClock = new TestPersisterEventClock();
+        s4Clock = new EventClock();
+        s4Clock.updateTime(69990000);
+
+        // background ticker that moves the event clock forward
+        new Thread(new TimeUpdaterThread(s4Clock)).start();
+
+        persister = new HashMapPersister(s4Clock);
+        testPersisterClock.testPersister(persister);
+        persister = new ConMapPersister(s4Clock);
+        testPersisterClock.testPersister(persister);
+    }
+
+    // Blocks until the shared clock has advanced by delta; any failure
+    // while waiting is ignored.
+    private static void waitFor(int delta) {
+        try {
+            s4Clock.waitForTime(s4Clock.getCurrentTime() + delta);
+        } catch (Exception e) {
+        }
+    }
+
+    /**
+     * Stores three keys (persist times 40, 48 and -1) and prints their
+     * values right away, after the clock advanced by 41000, and again after
+     * a further 10000, finishing with the result of cleanOutGarbage().
+     * Only HashMapPersister and ConMapPersister instances are exercised.
+     *
+     * @param persister persister to exercise
+     */
+    public void testPersister(Persister persister) {
+        if (persister instanceof HashMapPersister) {
+            HashMapPersister hp = (HashMapPersister) persister;
+            hp.init();
+
+            hp.set("mykey1", "Test1", 40);
+            hp.set("mykey2", "Test2", 48);
+            hp.set("mykey3", "Test2", -1);
+
+            waitFor(1);
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(41000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(10000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+            System.out.println("cleanUp: " + hp.cleanOutGarbage());
+        }
+
+        if (persister instanceof ConMapPersister) {
+            ConMapPersister cp = (ConMapPersister) persister;
+            cp.init();
+
+            cp.set("mykey1", "Test1", 40);
+            cp.set("mykey2", "Test2", 48);
+            cp.set("mykey3", "Test2", -1);
+
+            waitFor(1);
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(41000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(10000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+            System.out.println("cleanUp: " + cp.cleanOutGarbage());
+        }
+    }
+
+    /**
+     * Advances the given clock from 69996400 to 70099900 in steps of 400,
+     * sleeping 400 ms after each step.
+     */
+    static class TimeUpdaterThread implements Runnable {
+
+        EventClock s4Clock;
+
+        public TimeUpdaterThread(EventClock s4Clock) {
+            this.s4Clock = s4Clock;
+        }
+
+        @Override
+        public void run() {
+            // tick the clock
+            long currentTime = 69996400;
+            while (currentTime <= 70099900) {
+                System.out.println("Setting time to " + currentTime);
+                s4Clock.updateTime(currentTime);
+                try {
+                    Thread.sleep(400);
+                } catch (InterruptedException ie) {
+                    // an interrupted sleep just shortens this tick
+                }
+                currentTime += 400;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/test/TestPersisterWallClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/test/TestPersisterWallClock.java b/s4-core/src/main/java/org/apache/s4/test/TestPersisterWallClock.java
new file mode 100644
index 0000000..744eab7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/test/TestPersisterWallClock.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.test;
+
+import org.apache.s4.persist.ConMapPersister;
+import org.apache.s4.persist.HashMapPersister;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.util.clock.WallClock;
+
+/**
+ * Manual, console-driven exercise of {@link HashMapPersister} and
+ * {@link ConMapPersister} against a {@link WallClock}.
+ */
+public class TestPersisterWallClock {
+
+    static WallClock s4Clock;
+    static Persister persister;
+
+    public static void main(String[] args) {
+        TestPersisterWallClock testPersisterClock = new TestPersisterWallClock();
+        s4Clock = new WallClock();
+
+        persister = new HashMapPersister(s4Clock);
+        testPersisterClock.testPersister(persister);
+
+        persister = new ConMapPersister(s4Clock);
+        testPersisterClock.testPersister(persister);
+    }
+
+    // Blocks until the shared clock has advanced by delta; any failure
+    // while waiting is ignored.
+    private static void waitFor(int delta) {
+        try {
+            s4Clock.waitForTime(s4Clock.getCurrentTime() + delta);
+        } catch (Exception e) {
+        }
+    }
+
+    /**
+     * Stores three keys (persist times 40, 48 and -1) and prints their
+     * values right away, after the clock advanced by 41000, and again after
+     * a further 10000, finishing with the result of cleanOutGarbage().
+     * Only HashMapPersister and ConMapPersister instances are exercised.
+     *
+     * @param persister persister to exercise
+     */
+    public void testPersister(Persister persister) {
+        if (persister instanceof HashMapPersister) {
+            HashMapPersister hp = (HashMapPersister) persister;
+            hp.init();
+
+            hp.set("mykey1", "Test1", 40);
+            hp.set("mykey2", "Test2", 48);
+            hp.set("mykey3", "Test2", -1);
+
+            waitFor(1);
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(41000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(10000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + hp.get("mykey1"));
+            System.out.println("mykey2: " + hp.get("mykey2"));
+            System.out.println("mykey3: " + hp.get("mykey3"));
+            System.out.println("cleanUp: " + hp.cleanOutGarbage());
+        }
+
+        if (persister instanceof ConMapPersister) {
+            ConMapPersister cp = (ConMapPersister) persister;
+            cp.init();
+
+            cp.set("mykey1", "Test1", 40);
+            cp.set("mykey2", "Test2", 48);
+            cp.set("mykey3", "Test2", -1);
+
+            waitFor(1);
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(41000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+
+            System.out.println("Going to sleep...");
+            waitFor(10000);
+            System.out.println("Waking up");
+
+            System.out.println("mykey1: " + cp.get("mykey1"));
+            System.out.println("mykey2: " + cp.get("mykey2"));
+            System.out.println("mykey3: " + cp.get("mykey3"));
+            System.out.println("cleanUp: " + cp.cleanOutGarbage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ByteArrayIOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ByteArrayIOChannel.java b/s4-core/src/main/java/org/apache/s4/util/ByteArrayIOChannel.java
new file mode 100644
index 0000000..6aa2b39
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ByteArrayIOChannel.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.client.IOChannel;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+/**
+ * {@link IOChannel} that exchanges byte arrays over a socket. Each message is
+ * framed as a 4-byte big-endian length followed by that many payload bytes.
+ */
+public class ByteArrayIOChannel implements IOChannel {
+    /** Largest payload, in bytes, that {@link #recv()} accepts. */
+    private static final int MAX_MESSAGE_SIZE = 10 * 1024 * 1024;
+
+    private final InputStream in;
+    private final OutputStream out;
+
+    /**
+     * Binds the channel to the streams of the given socket.
+     *
+     * @param socket socket whose input and output streams are used
+     * @throws IOException if either stream cannot be obtained
+     */
+    public ByteArrayIOChannel(Socket socket) throws IOException {
+        this.in = socket.getInputStream();
+        this.out = socket.getOutputStream();
+    }
+
+    /**
+     * Fills the first {@code count} bytes of {@code target} from the input
+     * stream, looping over short reads. Both callers in this class pass a
+     * positive {@code count}.
+     *
+     * @throws IOException if the stream ends before {@code count} bytes arrive
+     */
+    private void readBytes(byte[] target, int count) throws IOException {
+        int filled = 0;
+        while (filled < count) {
+            int got = in.read(target, filled, count - filled);
+            if (got == -1) {
+                throw new IOException("reached end of stream after reading "
+                        + filled + " bytes. expected " + count + " bytes");
+            }
+            filled += got;
+        }
+    }
+
+    /**
+     * Reads one length-prefixed message.
+     *
+     * @return the payload, or {@code null} when the length prefix is zero
+     * @throws IOException on read failure, premature end of stream, or a
+     *         length that is negative or larger than 10 MiB
+     */
+    public byte[] recv() throws IOException {
+        byte[] header = new byte[4];
+        readBytes(header, 4);
+
+        // Big endian: 0A0B0C0D arrives as {0A, 0B, 0C, 0D}. A prefix with the
+        // top bit set decodes to a negative int and is rejected below.
+        int length = ((header[0] & 0xff) << 24) | ((header[1] & 0xff) << 16)
+                | ((header[2] & 0xff) << 8) | (header[3] & 0xff);
+        if (length == 0) {
+            return null;
+        }
+
+        // TODO: come up with a better solution than a hard size cap
+        if (length < 0 || length > MAX_MESSAGE_SIZE) {
+            throw new IOException("Bizarre size "  + length);
+        }
+
+        byte[] payload = new byte[length];
+        readBytes(payload, length);
+        return payload;
+    }
+
+    /**
+     * Writes the 4-byte big-endian length of {@code v}, then {@code v}
+     * itself, and flushes the output stream.
+     *
+     * @param v payload to send
+     * @throws IOException if writing or flushing fails
+     */
+    public void send(byte[] v) throws IOException {
+        int length = v.length;
+        byte[] header = { (byte) (length >> 24), (byte) (length >> 16),
+                (byte) (length >> 8), (byte) length };
+
+        out.write(header);
+        out.write(v);
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/Cloner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/Cloner.java b/s4-core/src/main/java/org/apache/s4/util/Cloner.java
new file mode 100644
index 0000000..1c21f18
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/Cloner.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+/**
+ * Single-method contract for objects that clone other objects.
+ */
+public interface Cloner {
+    /**
+     * Clones the given object.
+     *
+     * @param o the object to clone
+     * @return presumably a copy of {@code o}; the exact contract (null
+     *         handling, depth of the copy) is left to implementations
+     */
+    Object clone(Object o);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/ClonerGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/ClonerGenerator.java b/s4-core/src/main/java/org/apache/s4/util/ClonerGenerator.java
new file mode 100644
index 0000000..7c411b8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/ClonerGenerator.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Random;
+
+import org.apache.bcel.Constants;
+import org.apache.bcel.classfile.JavaClass;
+import org.apache.bcel.generic.BranchInstruction;
+import org.apache.bcel.generic.ClassGen;
+import org.apache.bcel.generic.ConstantPoolGen;
+import org.apache.bcel.generic.INSTANCEOF;
+import org.apache.bcel.generic.InstructionConstants;
+import org.apache.bcel.generic.InstructionFactory;
+import org.apache.bcel.generic.InstructionHandle;
+import org.apache.bcel.generic.InstructionList;
+import org.apache.bcel.generic.MethodGen;
+import org.apache.bcel.generic.ObjectType;
+import org.apache.bcel.generic.PUSH;
+import org.apache.bcel.generic.Type;
+
+/**
+ * Generates {@link Cloner} implementations at runtime: the bytecode of a small
+ * cloner class is assembled with BCEL and then defined through a dedicated
+ * class loader.
+ */
+public class ClonerGenerator {
+    /**
+     * Builds and loads a class that implements
+     * {@code org.apache.s4.util.Cloner} for {@code clazz}.
+     * <p>
+     * With {@code T} standing for {@code clazz}, the emitted
+     * {@code clone(Object arg0)} method corresponds to:
+     *
+     * <pre>
+     * Object target = null;
+     * if (arg0 instanceof T) target = (T) arg0;
+     * if (!(target instanceof Cloneable)) {
+     *     System.out.println("Not cloneable!");
+     *     return null;
+     * }
+     * return target.clone();   // emitted as INVOKEVIRTUAL T.clone()
+     * </pre>
+     *
+     * NOTE(review): because the call is emitted against {@code T.clone()},
+     * {@code T} presumably has to expose an accessible {@code clone()}
+     * method -- confirm against the classes this is used with.
+     *
+     * @param clazz class whose instances the generated cloner handles
+     * @return the class produced by
+     *         {@link ClonerClassLoader#loadClassFromBytes(String, byte[])}
+     */
+    public Class generate(Class clazz) {
+        String className = clazz.getName();
+        // Generated class name is "Cloner" followed by a number in [0, 3255].
+        // NOTE(review): names can repeat between calls; every call defines its
+        // class through a new ClonerClassLoader (see the end of this method).
+        Random rand = new Random(System.currentTimeMillis());
+        String clonerClassname = "Cloner" + (Math.abs(rand.nextInt() % 3256));
+
+        // public class <clonerClassname> extends Object implements Cloner
+        ClassGen cg = new ClassGen(clonerClassname,
+                                   "java.lang.Object",
+                                   clonerClassname + ".java",
+                                   Constants.ACC_PUBLIC | Constants.ACC_SUPER,
+                                   new String[] { "org.apache.s4.util.Cloner" });
+        ConstantPoolGen cp = cg.getConstantPool();
+        InstructionFactory instFactory = new InstructionFactory(cg, cp);
+
+        InstructionList il = new InstructionList();
+
+        // build constructor method for new class:
+        // a public no-arg constructor that only invokes Object.<init>()
+        MethodGen constructor = new MethodGen(Constants.ACC_PUBLIC,
+                                              Type.VOID,
+                                              Type.NO_ARGS,
+                                              new String[] {},
+                                              "<init>",
+                                              clonerClassname,
+                                              il,
+                                              cp);
+        // load "this" (local 0) and call the superclass constructor
+        il.append(InstructionFactory.createLoad(Type.OBJECT, 0));
+        il.append(instFactory.createInvoke("java.lang.Object",
+                                           "<init>",
+                                           Type.VOID,
+                                           Type.NO_ARGS,
+                                           Constants.INVOKESPECIAL));
+
+        il.append(InstructionFactory.createReturn(Type.VOID));
+        constructor.setMaxStack();
+        constructor.setMaxLocals();
+        cg.addMethod(constructor.getMethod());
+        il.dispose();
+
+        // build clone method: public Object clone(Object arg0)
+        // local 1 holds arg0, local 2 holds the cast target
+        il = new InstructionList();
+        MethodGen method = new MethodGen(Constants.ACC_PUBLIC,
+                                         Type.OBJECT,
+                                         new Type[] { Type.OBJECT },
+                                         new String[] { "arg0" },
+                                         "clone",
+                                         clonerClassname,
+                                         il,
+                                         cp);
+
+        // local2 = null
+        il.append(InstructionConstants.ACONST_NULL);
+        il.append(InstructionFactory.createStore(Type.OBJECT, 2));
+        // if (!(arg0 instanceof <className>)) skip the cast (branch to ih_14)
+        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
+        il.append(new INSTANCEOF(cp.addClass(new ObjectType(className))));
+        BranchInstruction ifeq_6 = InstructionFactory.createBranchInstruction(Constants.IFEQ,
+                                                                              null);
+        il.append(ifeq_6);
+        // local2 = (<className>) arg0
+        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
+        il.append(instFactory.createCheckCast(new ObjectType(className)));
+        il.append(InstructionFactory.createStore(Type.OBJECT, 2));
+        // if (local2 instanceof Cloneable) jump to the clone call (ih_31)
+        InstructionHandle ih_14 = il.append(InstructionFactory.createLoad(Type.OBJECT,
+                                                                          2));
+        il.append(new INSTANCEOF(cp.addClass(new ObjectType("java.lang.Cloneable"))));
+        BranchInstruction ifne_18 = InstructionFactory.createBranchInstruction(Constants.IFNE,
+                                                                               null);
+        il.append(ifne_18);
+        // not Cloneable (or not an instance of <className>, in which case
+        // local2 is still null): print a message and return null
+        il.append(instFactory.createFieldAccess("java.lang.System",
+                                                "out",
+                                                new ObjectType("java.io.PrintStream"),
+                                                Constants.GETSTATIC));
+        il.append(new PUSH(cp, "Not cloneable!"));
+        il.append(instFactory.createInvoke("java.io.PrintStream",
+                                           "println",
+                                           Type.VOID,
+                                           new Type[] { Type.STRING },
+                                           Constants.INVOKEVIRTUAL));
+        il.append(InstructionConstants.ACONST_NULL);
+        il.append(InstructionFactory.createReturn(Type.OBJECT));
+        // return local2.clone()
+        InstructionHandle ih_31 = il.append(InstructionFactory.createLoad(Type.OBJECT,
+                                                                          2));
+        il.append(instFactory.createInvoke(className,
+                                           "clone",
+                                           Type.OBJECT,
+                                           Type.NO_ARGS,
+                                           Constants.INVOKEVIRTUAL));
+        il.append(InstructionFactory.createReturn(Type.OBJECT));
+        // resolve the two forward branches now that their targets exist
+        ifeq_6.setTarget(ih_14);
+        ifne_18.setTarget(ih_31);
+        method.setMaxStack();
+        method.setMaxLocals();
+        cg.addMethod(method.getMethod());
+        il.dispose();
+
+        // turn the assembled class into bytes and define it in a new loader
+        JavaClass jc = cg.getJavaClass();
+        ClonerClassLoader cl = new ClonerClassLoader();
+
+        return cl.loadClassFromBytes(clonerClassname, jc.getBytes());
+    }
+
+    /**
+     * {@link URLClassLoader} with an empty URL list, used by
+     * {@link ClonerGenerator#generate(Class)} to turn generated bytecode into
+     * a {@code Class}.
+     */
+    public static class ClonerClassLoader extends URLClassLoader {
+        public ClonerClassLoader() {
+            super(new URL[] {});
+        }
+
+        /**
+         * Returns the class called {@code name}: the class found by
+         * {@code loadClass(name)} if there is one, otherwise a new class
+         * defined from {@code bytes}.
+         *
+         * @param name  binary name of the class
+         * @param bytes class file contents used when the class must be defined
+         * @return the loaded or newly defined class
+         */
+        public Class loadClassFromBytes(String name, byte[] bytes) {
+            try {
+                return this.loadClass(name);
+            } catch (ClassNotFoundException cnfe) {
+                // expected: no class with this name is loadable yet, so fall
+                // through and define it from the supplied bytes
+            }
+            return this.defineClass(name, bytes, 0, bytes.length);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/DoubleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/DoubleOutputFormatter.java b/s4-core/src/main/java/org/apache/s4/util/DoubleOutputFormatter.java
new file mode 100644
index 0000000..6b795a0
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/DoubleOutputFormatter.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import org.apache.s4.processor.OutputFormatter;
+
+import org.apache.log4j.Logger;
+
+/**
+ * {@link OutputFormatter} that turns numeric output values into doubles.
+ */
+public class DoubleOutputFormatter implements OutputFormatter {
+
+    /**
+     * Converts {@code outputValue} to a {@code Double}.
+     *
+     * @param outputValue value to convert; a {@code Double} is handed back
+     *                    unchanged
+     * @return the value as a {@code Double}, or {@code null} (after an error
+     *         is logged to the "s4" logger) when the conversion throws
+     */
+    @Override
+    public Object format(Object outputValue) {
+        if (!(outputValue instanceof Double)) {
+            try {
+                // A non-Number fails the cast and null fails the call; both
+                // exceptions end up in the handler below.
+                Number numeric = (Number) outputValue;
+                return numeric.doubleValue();
+            } catch (Exception e) {
+                Logger.getLogger("s4")
+                      .error("Exception converting value to double", e);
+                return null;
+            }
+        }
+        return outputValue;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/GsonUtil.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/GsonUtil.java b/s4-core/src/main/java/org/apache/s4/util/GsonUtil.java
new file mode 100644
index 0000000..5eacda8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/GsonUtil.java
@@ -0,0 +1,74 @@
+package org.apache.s4.util;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Holds a shared {@link Gson} instance that is rebuilt every time a type
+ * adapter is registered.
+ */
+public class GsonUtil {
+
+    /**
+     * Serializer registered for {@code Object.class}; it re-dispatches on the
+     * runtime class of the value. This is a workaround for a bug in Gson:
+     * 
+     * http://code.google.com/p/google-gson/issues/detail?id=279
+     * "Templated collections of collections do not serialize correctly"
+     */
+    private final static JsonSerializer<Object> objectSerializer = new JsonSerializer<Object>() {
+        public JsonElement serialize(Object src, Type typeOfSrc,
+                                     JsonSerializationContext context) {
+            Class<?> runtimeType = src.getClass();
+            if (runtimeType == Object.class) {
+                // a plain java.lang.Object becomes an empty JSON object
+                return new JsonObject();
+            }
+            return context.serialize(src, runtimeType);
+        }
+    };
+
+    /** Adapters registered so far, keyed by the type they handle. */
+    private static final Map<Type, Object> typeAdapters = new HashMap<Type, Object>();
+
+    /** Current instance; replaced as a whole by {@link #build()}. */
+    private static volatile Gson gson;
+
+    // build gson ASAP (must stay below the fields it reads)
+    static {
+        build();
+    }
+
+    /**
+     * Adds a type adapter and rebuilds the shared Gson instance.
+     * 
+     * @param type
+     *            type the adapter applies to
+     * @param typeAdapter
+     *            adapter object, passed on to
+     *            {@code GsonBuilder.registerTypeAdapter}
+     */
+    public static synchronized void registerTypeAdapter(Type type,
+                                                        Object typeAdapter) {
+        typeAdapters.put(type, typeAdapter);
+        build();
+    }
+
+    /**
+     * Get a Gson instance.
+     * 
+     * @return the most recently built Gson instance, with all adapters
+     *         registered up to that build
+     */
+    public static Gson get() {
+        return gson;
+    }
+
+    /**
+     * Creates a new Gson from the Object serializer plus every registered
+     * adapter and publishes it through the volatile {@code gson} field.
+     */
+    private static synchronized void build() {
+        GsonBuilder builder = new GsonBuilder().registerTypeAdapter(Object.class,
+                                                                    objectSerializer);
+
+        for (Map.Entry<Type, Object> registered : typeAdapters.entrySet()) {
+            builder.registerTypeAdapter(registered.getKey(), registered.getValue());
+        }
+
+        gson = builder.create();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/util/KeyUtil.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/KeyUtil.java b/s4-core/src/main/java/org/apache/s4/util/KeyUtil.java
new file mode 100644
index 0000000..082336e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/util/KeyUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.util;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utilities for processing key values.
+ * <p>
+ * from http://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/KeyUtil.java
+ */
+public class KeyUtil {
+
+    /**
+     * Encodes a single key as UTF-8.
+     * 
+     * @param k
+     *            the key
+     * @return the UTF-8 bytes of {@code k}
+     */
+    public static byte[] getKeyBytes(String k) {
+        byte[] encoded;
+        try {
+            encoded = k.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException unsupported) {
+            throw new RuntimeException(unsupported);
+        }
+        return encoded;
+    }
+
+    /**
+     * Encodes every key of a collection as UTF-8, in iteration order.
+     * 
+     * @param keys
+     *            a collection of keys
+     * @return a new collection holding the byte representation of each key
+     */
+    public static Collection<byte[]> getKeyBytes(Collection<String> keys) {
+        Collection<byte[]> encodedKeys = new ArrayList<byte[]>(keys.size());
+        for (String key : keys) {
+            byte[] encoded = getKeyBytes(key);
+            encodedKeys.add(encoded);
+        }
+        return encodedKeys;
+    }
+}


Mime
View raw message