incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [33/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/OverloadDispatcherGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/OverloadDispatcherGenerator.java b/s4-core/src/main/java/io/s4/processor/OverloadDispatcherGenerator.java
deleted file mode 100644
index 33c9b54..0000000
--- a/s4-core/src/main/java/io/s4/processor/OverloadDispatcherGenerator.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import java.io.FileOutputStream;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.bcel.Constants;
-import org.apache.bcel.classfile.JavaClass;
-import org.apache.bcel.generic.BranchInstruction;
-import org.apache.bcel.generic.ClassGen;
-import org.apache.bcel.generic.ConstantPoolGen;
-import org.apache.bcel.generic.INSTANCEOF;
-import org.apache.bcel.generic.InstructionFactory;
-import org.apache.bcel.generic.InstructionHandle;
-import org.apache.bcel.generic.InstructionList;
-import org.apache.bcel.generic.MethodGen;
-import org.apache.bcel.generic.ObjectType;
-import org.apache.bcel.generic.Type;
-
-/**
- * Builds, with BCEL, a dispatcher class that forwards an event to the matching
- * <code>processEvent</code> overload of a target class.
- * <p>
- * The constructor records the first parameter type of every public
- * <code>void processEvent(...)</code> method of the target class.
- * {@link #generate()} then emits a class implementing
- * <code>io.s4.processor.OverloadDispatcher</code> (or
- * <code>io.s4.processor.OverloadDispatcherSlot</code> when <code>forSlot</code>
- * is set). Its <code>dispatch</code> method tests the event with a chain of
- * <code>instanceof</code> checks, subclasses before their superclasses, and
- * invokes the first overload that matches.
- */
-public class OverloadDispatcherGenerator {
-    private List<Hierarchy> hierarchies = new ArrayList<Hierarchy>();
-    private Class<?> targetClass;
-    private boolean forSlot = false;
-    private ObjectType abstractWindowingPEType = new ObjectType("io.s4.processor.AbstractWindowingPE");
-    private String classDumpFile;
-
-    /**
-     * Sets a file to which the generated class bytes are written for
-     * debugging.
-     * 
-     * @param classDumpFile
-     *            path of the dump file, or <code>null</code> to disable dumping
-     */
-    public void setClassDumpFile(String classDumpFile) {
-        this.classDumpFile = classDumpFile;
-    }
-
-    /**
-     * Creates a generator for the two-argument <code>dispatch(pe, event)</code>
-     * form.
-     * 
-     * @param targetClass
-     *            class whose <code>processEvent</code> overloads are dispatched to
-     */
-    public OverloadDispatcherGenerator(Class<?> targetClass) {
-        this(targetClass, false);
-    }
-
-    /**
-     * Creates a generator and collects the event types handled by
-     * <code>targetClass</code>.
-     * 
-     * @param targetClass
-     *            class whose <code>processEvent</code> overloads are dispatched to
-     * @param forSlot
-     *            when <code>true</code>, the generated class implements
-     *            <code>io.s4.processor.OverloadDispatcherSlot</code> and calls
-     *            <code>processEvent(event, slotTime, pe)</code>
-     */
-    public OverloadDispatcherGenerator(Class<?> targetClass, boolean forSlot) {
-        this.targetClass = targetClass;
-        this.forSlot = forSlot;
-
-        for (Method method : targetClass.getMethods()) {
-            // a zero-argument processEvent() has no event type to dispatch on
-            if (method.getName().equals("processEvent")
-                    && method.getReturnType().equals(Void.TYPE)
-                    && method.getParameterTypes().length > 0) {
-                this.addHierarchy(method.getParameterTypes()[0]);
-            }
-        }
-        Collections.sort(hierarchies);
-    }
-
-    /**
-     * Registers an additional event type. The new entry is appended as is;
-     * the list is not re-sorted here.
-     * 
-     * @param clazz
-     *            event type to add
-     */
-    public void addHierarchy(Class<?> clazz) {
-        hierarchies.add(new Hierarchy(clazz));
-    }
-
-    /**
-     * Generates and loads the dispatcher class.
-     * <p>
-     * If no event types were collected, the generated <code>dispatch</code>
-     * method only casts its first argument and returns.
-     * 
-     * @return the freshly defined dispatcher class
-     */
-    public Class<Object> generate() {
-        Random rand = new Random(System.currentTimeMillis());
-        String dispatcherClassName = "OverloadDispatcher"
-                + (Math.abs(rand.nextInt() % 3256));
-
-        String interfaceName = "io.s4.processor.OverloadDispatcher";
-        if (forSlot) {
-            interfaceName = "io.s4.processor.OverloadDispatcherSlot";
-        }
-
-        ClassGen cg = new ClassGen(dispatcherClassName,
-                                   "java.lang.Object",
-                                   dispatcherClassName + ".java",
-                                   Constants.ACC_PUBLIC | Constants.ACC_SUPER,
-                                   new String[] { interfaceName });
-        ConstantPoolGen cp = cg.getConstantPool();
-        InstructionFactory instFactory = new InstructionFactory(cg, cp);
-
-        InstructionList il = new InstructionList();
-
-        // build constructor method for new class
-        MethodGen constructor = new MethodGen(Constants.ACC_PUBLIC,
-                                              Type.VOID,
-                                              Type.NO_ARGS,
-                                              new String[] {},
-                                              "<init>",
-                                              dispatcherClassName,
-                                              il,
-                                              cp);
-        il.append(InstructionFactory.createLoad(Type.OBJECT, 0));
-        il.append(instFactory.createInvoke("java.lang.Object",
-                                           "<init>",
-                                           Type.VOID,
-                                           Type.NO_ARGS,
-                                           Constants.INVOKESPECIAL));
-
-        il.append(InstructionFactory.createReturn(Type.VOID));
-        constructor.setMaxStack();
-        constructor.setMaxLocals();
-        cg.addMethod(constructor.getMethod());
-        il.dispose();
-
-        // build dispatch method
-        il = new InstructionList();
-
-        Type[] dispatchArgumentTypes = null;
-        String[] dispatchArgumentNames = null;
-        // first local variable slot after "this" and the arguments:
-        // this(0), pe(1), event(2) -> 3
-        int postArgumentVariableSlot = 3;
-        if (forSlot) {
-            dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
-                    ObjectType.OBJECT, ObjectType.LONG, abstractWindowingPEType };
-            dispatchArgumentNames = new String[] { "slot", "event", "slotTime",
-                    "pe" };
-            // this(0), slot(1), event(2), slotTime(3-4, a long takes two
-            // slots), pe(5) -> 6
-            postArgumentVariableSlot = 6;
-        } else {
-            dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
-                    ObjectType.OBJECT };
-            dispatchArgumentNames = new String[] { "pe", "event" };
-
-        }
-
-        MethodGen method = new MethodGen(Constants.ACC_PUBLIC,
-                                         Type.VOID,
-                                         dispatchArgumentTypes,
-                                         dispatchArgumentNames,
-                                         "dispatch",
-                                         dispatcherClassName,
-                                         il,
-                                         cp);
-
-        List<InstructionHandle> targetInstructions = new ArrayList<InstructionHandle>();
-        List<BranchInstruction> branchInstructions = new ArrayList<BranchInstruction>();
-        List<BranchInstruction> gotoInstructions = new ArrayList<BranchInstruction>();
-
-        ObjectType peType = new ObjectType(targetClass.getName());
-
-        // cast the first argument to the target class once and keep it in a
-        // local variable for the invocations below
-        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
-        il.append(instFactory.createCheckCast(peType));
-        il.append(InstructionFactory.createStore(peType,
-                                                 postArgumentVariableSlot));
-
-        for (int i = 0; i < hierarchies.size(); i++) {
-            Hierarchy hierarchy = hierarchies.get(i);
-
-            ObjectType hierarchyTop = new ObjectType(hierarchy.getTop()
-                                                              .getName());
-
-            InstructionHandle ih = il.append(InstructionFactory.createLoad(Type.OBJECT,
-                                                                           2));
-            // the start of every check but the first is the "no match" target
-            // of the check before it
-            if (i > 0) {
-                targetInstructions.add(ih);
-            }
-
-            il.append(new INSTANCEOF(cp.addClass(hierarchyTop)));
-            BranchInstruction bi = InstructionFactory.createBranchInstruction(Constants.IFEQ,
-                                                                              null);
-            il.append(bi);
-            branchInstructions.add(bi);
-
-            il.append(InstructionFactory.createLoad(peType,
-                                                    postArgumentVariableSlot));
-            il.append(InstructionFactory.createLoad(hierarchyTop, 2));
-            il.append(instFactory.createCheckCast(hierarchyTop));
-            if (forSlot) {
-                il.append(InstructionFactory.createLoad(ObjectType.LONG, 3));
-                il.append(InstructionFactory.createLoad(abstractWindowingPEType,
-                                                        5));
-            }
-
-            Type[] argumentTypes = null;
-            if (forSlot) {
-                argumentTypes = new Type[] { hierarchyTop, ObjectType.LONG,
-                        abstractWindowingPEType };
-            } else {
-                argumentTypes = new Type[] { hierarchyTop };
-            }
-            il.append(instFactory.createInvoke(targetClass.getName(),
-                                               "processEvent",
-                                               Type.VOID,
-                                               argumentTypes,
-                                               Constants.INVOKEVIRTUAL));
-
-            // no branch needed for last check
-            if (i < (hierarchies.size() - 1)) {
-                bi = InstructionFactory.createBranchInstruction(Constants.GOTO,
-                                                                null);
-                il.append(bi);
-                gotoInstructions.add(bi);
-            }
-        }
-
-        InstructionHandle returnInstruction = il.append(InstructionFactory.createReturn(Type.VOID));
-
-        for (int i = 0; i < targetInstructions.size(); i++) {
-            branchInstructions.get(i).setTarget(targetInstructions.get(i));
-        }
-
-        // the last check falls through to the return; with no checks at all
-        // there is nothing to wire up
-        if (!branchInstructions.isEmpty()) {
-            branchInstructions.get(branchInstructions.size() - 1)
-                              .setTarget(returnInstruction);
-        }
-
-        for (BranchInstruction gotoInstruction : gotoInstructions) {
-            gotoInstruction.setTarget(returnInstruction);
-        }
-
-        method.setMaxStack();
-        method.setMaxLocals();
-        cg.addMethod(method.getMethod());
-        il.dispose();
-
-        JavaClass jc = cg.getJavaClass();
-        // serialize once; the same bytes are dumped and defined
-        byte[] classBytes = jc.getBytes();
-        OverloadDispatcherClassLoader cl = new OverloadDispatcherClassLoader();
-
-        // debug
-        if (classDumpFile != null) {
-            dumpClassBytes(classBytes);
-        }
-
-        return cl.loadClassFromBytes(dispatcherClassName, classBytes);
-
-    }
-
-    /**
-     * Writes the generated class to {@link #classDumpFile}. This is a
-     * best-effort debugging aid: a failure is reported on standard error and
-     * does not prevent the class from being loaded.
-     */
-    private void dumpClassBytes(byte[] classBytes) {
-        FileOutputStream fos = null;
-        try {
-            fos = new FileOutputStream(classDumpFile);
-            fos.write(classBytes);
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (fos != null) {
-                try {
-                    fos.close();
-                } catch (Exception ignored) {
-                    // nothing useful can be done if closing the dump fails
-                }
-            }
-        }
-    }
-
-    /**
-     * The superclass chain of a class, starting with the class itself.
-     * <p>
-     * The natural ordering places longer chains first, so a class always
-     * sorts before each of its superclasses. Chains of equal length are
-     * ordered interfaces first, then by the name of the top class. Two
-     * distinct classes with the same name and chain length (for example from
-     * different class loaders) compare as 0 although they are not equal.
-     */
-    public static class Hierarchy implements Comparable<Hierarchy> {
-        private List<Class<?>> classes = new ArrayList<Class<?>>();
-
-        /**
-         * @param clazz
-         *            class whose superclass chain is recorded; <code>null</code>
-         *            yields an empty chain
-         */
-        public Hierarchy(Class<?> clazz) {
-            for (Class<?> currentClass = clazz; currentClass != null; currentClass = currentClass.getSuperclass()) {
-                classes.add(currentClass);
-            }
-        }
-
-        /**
-         * @return the class this hierarchy was built from, or
-         *         <code>null</code> if the chain is empty
-         */
-        public Class<?> getTop() {
-            if (classes.size() < 1) {
-                return null;
-            }
-            return classes.get(0);
-        }
-
-        /**
-         * @return the recorded chain, most specific class first (the
-         *         internal list, not a copy)
-         */
-        public List<Class<?>> getClasses() {
-            return classes;
-        }
-
-        /**
-         * Two hierarchies are equal when their chains hold the same classes
-         * in the same order.
-         */
-        @Override
-        public boolean equals(Object other) {
-            if (this == other) {
-                return true;
-            }
-            if (!(other instanceof Hierarchy)) {
-                return false;
-            }
-            return classes.equals(((Hierarchy) other).classes);
-        }
-
-        @Override
-        public int hashCode() {
-            return classes.hashCode();
-        }
-
-        public int compareTo(Hierarchy other) {
-            if (this.equals(other)) {
-                return 0;
-            }
-
-            // deeper chain first: a subclass always has a longer chain than
-            // any of its superclasses
-            int thisDepth = classes.size();
-            int otherDepth = other.classes.size();
-            if (thisDepth != otherDepth) {
-                return (thisDepth > otherDepth) ? -1 : 1;
-            }
-
-            // equal, non-zero depth from here on (two empty chains are equal)
-            Class<?> thisTop = getTop();
-            Class<?> otherTop = other.getTop();
-            if (thisTop.isInterface() != otherTop.isInterface()) {
-                return thisTop.isInterface() ? -1 : 1;
-            }
-            return thisTop.getName().compareTo(otherTop.getName());
-        }
-    }
-
-    /**
-     * Class loader used to define a generated dispatcher class from its
-     * bytes.
-     */
-    public static class OverloadDispatcherClassLoader extends URLClassLoader {
-        public OverloadDispatcherClassLoader() {
-            super(new URL[] {});
-        }
-
-        /**
-         * Returns the class called <code>name</code> if this loader can
-         * already resolve it, otherwise defines it from <code>bytes</code>.
-         * 
-         * @param name
-         *            binary name of the class
-         * @param bytes
-         *            class file contents
-         * @return the loaded or newly defined class
-         */
-        public Class loadClassFromBytes(String name, byte[] bytes) {
-            try {
-                return this.loadClass(name);
-            } catch (ClassNotFoundException cnfe) {
-                // expected
-            }
-            return this.defineClass(name, bytes, 0, bytes.length);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/OverloadDispatcherSlot.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/OverloadDispatcherSlot.java b/s4-core/src/main/java/io/s4/processor/OverloadDispatcherSlot.java
deleted file mode 100644
index c96518c..0000000
--- a/s4-core/src/main/java/io/s4/processor/OverloadDispatcherSlot.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-/**
- * Contract of the dispatcher classes that {@link OverloadDispatcherGenerator}
- * emits when it is constructed with <code>forSlot</code> set to
- * <code>true</code>.
- */
-public interface OverloadDispatcherSlot {
-    /**
-     * Routes an event to a <code>processEvent</code> overload of the slot.
-     * The generated implementation casts <code>slot</code> to the target
-     * class and calls <code>processEvent(event, slotTime, pe)</code> on it,
-     * choosing the overload by the runtime type of <code>event</code>.
-     * 
-     * @param slot
-     *            object that receives the <code>processEvent</code> call
-     * @param event
-     *            event to dispatch
-     * @param slotTime
-     *            passed through unchanged as the second argument
-     * @param pe
-     *            passed through unchanged as the third argument
-     */
-    void dispatch(Object slot, Object event, long slotTime, AbstractWindowingPE pe);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/PEContainer.java b/s4-core/src/main/java/io/s4/processor/PEContainer.java
deleted file mode 100644
index 6e994e8..0000000
--- a/s4-core/src/main/java/io/s4/processor/PEContainer.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import static io.s4.util.MetricsName.S4_APP_METRICS;
-import static io.s4.util.MetricsName.S4_CORE_METRICS;
-import static io.s4.util.MetricsName.pecontainer_ev_dq_ct;
-import static io.s4.util.MetricsName.pecontainer_ev_err_ct;
-import static io.s4.util.MetricsName.pecontainer_ev_nq_ct;
-import static io.s4.util.MetricsName.pecontainer_ev_process_ct;
-import static io.s4.util.MetricsName.pecontainer_exec_elapse_time;
-import static io.s4.util.MetricsName.pecontainer_msg_drop_ct;
-import static io.s4.util.MetricsName.pecontainer_pe_ct;
-import static io.s4.util.MetricsName.pecontainer_qsz;
-import static io.s4.util.MetricsName.pecontainer_qsz_w;
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.ft.CheckpointingEvent;
-import io.s4.ft.SafeKeeper;
-import io.s4.logger.Monitor;
-import io.s4.util.clock.Clock;
-import io.s4.util.clock.EventClock;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-/**
- * Holds the configured PE prototypes, queues incoming events and routes each
- * queued event to the processing elements whose advice matches it.
- * <p>
- * {@link #init()} starts two threads: one that executes {@link #run()} and
- * drains the work queue, and a {@link Watcher} that periodically publishes PE
- * counts and queue size.
- */
-public class PEContainer implements Runnable, AsynchronousEventProcessor {
-    private static final Logger logger = Logger.getLogger(PEContainer.class);
-    BlockingQueue<EventWrapper> workQueue;
-    private List<PrototypeWrapper> prototypeWrappers = new ArrayList<PrototypeWrapper>();
-    private Monitor monitor;
-    private Clock clock;
-    private int maxQueueSize = 1000;
-    private boolean trackByKey;
-    private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
-    private SafeKeeper safeKeeper;
-
-    private ControlEventProcessor controlEventProcessor = null;
-
-    /**
-     * @param maxQueueSize
-     *            capacity of the work queue created by {@link #init()}
-     */
-    public void setMaxQueueSize(int maxQueueSize) {
-        this.maxQueueSize = maxQueueSize;
-    }
-
-    /**
-     * @param monitor
-     *            metrics sink; when <code>null</code> no metrics are recorded
-     */
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-    }
-
-    public void setClock(Clock s4Clock) {
-        this.clock = s4Clock;
-    }
-
-    public Clock getClock() {
-        return clock;
-    }
-
-    /**
-     * @param trackByKey
-     *            when <code>true</code>, events are counted per stream name
-     *            and compound key and the counts are logged by the watcher
-     */
-    public void setTrackByKey(boolean trackByKey) {
-        this.trackByKey = trackByKey;
-    }
-
-    public void setSafeKeeper(SafeKeeper sk) {
-        this.safeKeeper = sk;
-    }
-
-    /**
-     * Wraps a PE prototype and registers it together with its advice list.
-     * Entry <code>i</code> of {@link #adviceLists} always belongs to entry
-     * <code>i</code> of the prototype wrappers.
-     * 
-     * @param processor
-     *            PE prototype to add
-     */
-    public void addProcessor(AbstractPE processor) {
-        logger.info("adding pe: " + processor);
-        PrototypeWrapper pw = new PrototypeWrapper(processor, clock);
-        prototypeWrappers.add(pw);
-        adviceLists.add(pw.advise());
-    }
-
-    /**
-     * Adds every processor of the array, in order, via
-     * {@link #addProcessor(AbstractPE)}.
-     */
-    public void setProcessors(AbstractPE[] processors) {
-        for (int i = 0; i < processors.length; i++) {
-            addProcessor(processors[i]);
-        }
-    }
-
-    /**
-     * @param cep
-     *            handler for control events; when <code>null</code> control
-     *            events are skipped
-     */
-    public void setControlEventProcessor(ControlEventProcessor cep) {
-        this.controlEventProcessor = cep;
-    }
-
-    public PEContainer() {
-
-    }
-
-    List<List<EventAdvice>> adviceLists = new ArrayList<List<EventAdvice>>();
-
-    /**
-     * Creates the bounded work queue and starts the processing and watcher
-     * threads.
-     */
-    public void init() {
-        workQueue = new LinkedBlockingQueue<EventWrapper>(maxQueueSize);
-        // Advice lists are already registered by addProcessor(). Appending
-        // them again here would push the advice of any processor added after
-        // init() to an index that no longer matches its prototype wrapper.
-        Thread t = new Thread(this, "PEContainer");
-        t.start();
-        t = new Thread(new Watcher());
-        t.start();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * io.s4.processor.AsynchronousEventProcessor#queueWork(io.s4.collector.
-     * EventWrapper)
-     */
-    @Override
-    public void queueWork(EventWrapper eventWrapper) {
-        boolean isAddSucceed = false;
-
-        try {
-            // offer() does not block: when the queue is full the event is
-            // dropped and counted as such
-            isAddSucceed = workQueue.offer(eventWrapper);
-            if (monitor != null) {
-                if (isAddSucceed) {
-                    monitor.increment(pecontainer_ev_nq_ct.toString(),
-                                      1,
-                                      S4_CORE_METRICS.toString());
-                } else {
-                    monitor.increment(pecontainer_msg_drop_ct.toString(),
-                                      1,
-                                      S4_CORE_METRICS.toString());
-                }
-                monitor.set(pecontainer_qsz.toString(),
-                            getQueueSize(),
-                            S4_CORE_METRICS.toString());
-            }
-        } catch (Exception e) {
-            logger.error("metrics name doesn't exist", e);
-        }
-    }
-
-    // This will always be called by a different thread than the one executing
-    // run()
-    /*
-     * (non-Javadoc)
-     * 
-     * @see io.s4.processor.AsynchronousEventProcessor#getQueueSize()
-     */
-    @Override
-    public int getQueueSize() {
-        return workQueue.size();
-    }
-
-    /**
-     * An event is a control event if its stream name begins with the character
-     * '#'.
-     * 
-     * Control events are handled specially.
-     * 
-     * @param e
-     *            the event wrapper to test
-     * @return true if and only if e is a control message.
-     */
-    private boolean testControlEvent(EventWrapper e) {
-        String streamName = e.getStreamName();
-
-        return streamName.length() > 0 && streamName.charAt(0) == '#';
-    }
-
-    /**
-     * Processing loop: takes events from the work queue until the thread is
-     * interrupted. Any other exception raised while handling one event is
-     * logged and the loop continues with the next event.
-     */
-    public void run() {
-        long startTime, endTime;
-        while (true) {
-            EventWrapper eventWrapper = null;
-            try {
-                eventWrapper = workQueue.take();
-                if (clock instanceof EventClock) {
-                    EventClock eventClock = (EventClock) clock;
-                    eventClock.update(eventWrapper);
-                    // To what time to update the clock
-                }
-                if (trackByKey) {
-                    boolean foundOne = false;
-                    for (CompoundKeyInfo compoundKeyInfo : eventWrapper.getCompoundKeys()) {
-                        foundOne = true;
-                        updateCount(eventWrapper.getStreamName() + " "
-                                + compoundKeyInfo.getCompoundKey());
-                    }
-
-                    // events without any compound key are counted under "*"
-                    if (!foundOne) {
-                        updateCount(eventWrapper.getStreamName() + " *");
-                    }
-                }
-
-                startTime = System.currentTimeMillis();
-                if (logger.isDebugEnabled()) {
-                    logger.debug("STEP 5 (PEContainer): workQueue.take - "
-                            + eventWrapper.toString());
-                }
-                if (monitor != null) {
-                    monitor.increment(pecontainer_ev_dq_ct.toString(),
-                                      1,
-                                      S4_CORE_METRICS.toString());
-                }
-
-                if (eventWrapper.getStreamName().endsWith("_checkpointing")
-                        || eventWrapper.getStreamName().endsWith("_recovery")) {
-                    // in that case, we don't need to iterate over all prototypes and advises: 
-                    // the target PE is specified in the event
-                    handleCheckpointingOrRecovery(eventWrapper);
-                } else {
-
-                    boolean ctrlEvent = testControlEvent(eventWrapper);
-
-                    // execute the PEs interested in this event
-                    for (int i = 0; i < prototypeWrappers.size(); i++) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("STEP 6 (PEContainer): prototypeWrappers("
-                                    + i
-                                    + ") - "
-                                    + prototypeWrappers.get(i).toString()
-                                    + " - " + eventWrapper.getStreamName());
-                        }
-
-                        // first check if this is a control message and handle
-                        // it if so.
-                        if (ctrlEvent) {
-                            if (controlEventProcessor != null) {
-                                controlEventProcessor.process(eventWrapper,
-                                        prototypeWrappers.get(i));
-                            }
-
-                            continue;
-                        }
-
-                        // otherwise, continue processing event.
-                        List<EventAdvice> adviceList = adviceLists.get(i);
-                        for (EventAdvice eventAdvice : adviceList) {
-                            if (eventAdvice.getEventName().equals("*")
-                                    || eventAdvice.getEventName().equals(
-                                            eventWrapper.getStreamName())) {
-                                // event name matches
-                            } else {
-                                continue;
-                            }
-
-                            // a "*" key is served by the PE instance
-                            // registered under "*", with no key info
-                            if (eventAdvice.getKey().equals("*")) {
-                                invokePE(prototypeWrappers.get(i).getPE("*"),
-                                        eventWrapper, null);
-                                continue;
-                            }
-                            
-                            for (CompoundKeyInfo compoundKeyInfo : eventWrapper
-                                    .getCompoundKeys()) {
-                                if (eventAdvice.getKey().equals(
-                                        compoundKeyInfo.getCompoundKey())) {
-                                    invokePE(
-                                            prototypeWrappers
-                                                    .get(i)
-                                                    .getPE(compoundKeyInfo
-                                                            .getCompoundValue()),
-                                            eventWrapper, compoundKeyInfo);
-                                }
-                            }
-                        }
-                    }
-                }
-
-                endTime = System.currentTimeMillis();
-                if (monitor != null) {
-                    // TODO: need to be changed for more accurate calc
-                    monitor.increment(pecontainer_exec_elapse_time.toString(),
-                                      (int) (endTime - startTime),
-                                      S4_CORE_METRICS.toString());
-                }
-            } catch (InterruptedException ie) {
-                Logger.getLogger("s4").warn("PEContainer is interrupted", ie);
-                // restore the interrupt status for whoever owns this thread
-                Thread.currentThread().interrupt();
-                return;
-            } catch (Exception e) {
-                Logger.getLogger("s4")
-                      .error("Exception choosing processing element to run", e);
-            }
-        }
-    }
-
-    /**
-     * Delivers a checkpointing or recovery event to the single PE it names.
-     * Events that are not {@link CheckpointingEvent}s are logged and ignored.
-     * 
-     * @param eventWrapper
-     *            event taken from a "_checkpointing" or "_recovery" stream
-     */
-    private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) {
-        CheckpointingEvent checkpointingEvent = null;
-        try {
-            checkpointingEvent = (CheckpointingEvent) eventWrapper.getEvent();
-        } catch (ClassCastException e) {
-            logger.error("Checkpointing stream ["
-                    + eventWrapper.getStreamName()
-                    + "] can only handle checkpointing events. Received event is not a checkpointing event; it will be ignored.");
-            return;
-        }
-        // 1. the target prototype is the one whose id equals the prototype id
-        // carried by the event
-        // 2. PE id is given by the event
-        for (int i = 0; i < prototypeWrappers.size(); i++) {
-            if (checkpointingEvent.getSafeKeeperId().getPrototypeId()
-                    .equals(prototypeWrappers.get(i).getId())) {
-                
-                // check that PE is subscribed to checkpointing stream
-                List<EventAdvice> advices = adviceLists.get(i);
-                for (EventAdvice eventAdvice : advices) {
-                    if (eventAdvice.getEventName().equals(eventWrapper.getStreamName())){
-                        invokePE(
-                                prototypeWrappers.get(i).getPE(
-                                        checkpointingEvent.getSafeKeeperId().getKey()),
-                                        eventWrapper, null);
-                        break;
-                    }
-                }
-            }
-        }
-        
-
-    }
-
-    /**
-     * Executes one PE on one event and records count, error and timing
-     * metrics. Exceptions thrown by the PE are logged, not propagated.
-     * 
-     * @param pe
-     *            processing element to run
-     * @param eventWrapper
-     *            event to hand to the PE
-     * @param compoundKeyInfo
-     *            key information that selected the PE, or <code>null</code>
-     */
-    private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
-                          CompoundKeyInfo compoundKeyInfo) {
-        try {
-            long startTime = System.currentTimeMillis();
-            pe.execute(eventWrapper.getStreamName(),
-                       compoundKeyInfo,
-                       eventWrapper.getEvent());
-            long endTime = System.currentTimeMillis();
-            if (monitor != null) {
-                monitor.increment(pecontainer_ev_process_ct.toString(),
-                                  1,
-                                  S4_CORE_METRICS.toString());
-                monitor.increment(pecontainer_ev_process_ct.toString(),
-                                  1,
-                                  S4_APP_METRICS.toString(),
-                                  "at",
-                                  pe.getId());
-                monitor.increment(pecontainer_exec_elapse_time.toString(),
-                                  (int) (endTime - startTime),
-                                  S4_APP_METRICS.toString(),
-                                  "at",
-                                  eventWrapper.getStreamName());
-            }
-        } catch (Exception e) {
-            if (monitor != null) {
-                monitor.increment(pecontainer_ev_err_ct.toString(),
-                                  1,
-                                  S4_CORE_METRICS.toString());
-                monitor.increment(pecontainer_ev_err_ct.toString(),
-                                  1,
-                                  S4_APP_METRICS.toString(),
-                                  "at",
-                                  pe.getId());
-            }
-            Logger.getLogger("s4")
-                  .error("Exception running processing element", e);
-        }
-
-    }
-
-    /**
-     * Increments the counter stored under <code>key</code>, starting at 1
-     * for a key not seen before.
-     */
-    private void updateCount(String key) {
-        Integer countObj = countByEventType.get(key);
-        if (countObj == null) {
-            countObj = 0;
-        }
-        countObj++;
-        countByEventType.put(key, countObj);
-    }
-
-    /**
-     * Every ten seconds publishes the PE count per prototype, the total PE
-     * count and the queue size, and logs the per-key event counts when
-     * tracking by key is enabled. Stops once its thread is interrupted.
-     */
-    class Watcher implements Runnable {
-        public void run() {
-            while (!Thread.interrupted()) {
-                try {
-                    int peCount = 0;
-                    for (PrototypeWrapper pw : prototypeWrappers) {
-                        peCount += pw.getPECount();
-                        if (monitor != null) {
-                            monitor.set(pecontainer_pe_ct.toString(),
-                                        pw.getPECount(),
-                                        S4_APP_METRICS.toString(),
-                                        "at",
-                                        pw.getId());
-                        }
-                    }
-
-                    Logger.getLogger("s4").info("PE count " + peCount);
-                    if (monitor != null) {
-                        monitor.set(pecontainer_pe_ct.toString(),
-                                    peCount,
-                                    S4_CORE_METRICS.toString());
-                        monitor.set(pecontainer_qsz_w.toString(),
-                                    getQueueSize(),
-                                    S4_CORE_METRICS.toString());
-                    }
-
-                    if (trackByKey) {
-                        // Iterating a synchronizedMap view requires holding
-                        // the map's lock. Copy under the lock, then log from
-                        // the copy so the processing thread is not blocked
-                        // while logging.
-                        Map<String, Integer> snapshot;
-                        synchronized (countByEventType) {
-                            snapshot = new HashMap<String, Integer>(countByEventType);
-                        }
-                        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
-                            Integer countObj = entry.getValue();
-                            if (countObj != null) {
-                                Logger.getLogger("s4").info("Count by " + entry.getKey()
-                                        + ": " + countObj);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    Logger.getLogger("s4")
-                          .error("Exception running PEContainer watcher", e);
-                } finally {
-                    try {
-                        Thread.sleep(10000);
-                    } catch (InterruptedException ie) {
-                        // keep the interrupt visible to the loop condition
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/PrintEventPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/PrintEventPE.java b/s4-core/src/main/java/io/s4/processor/PrintEventPE.java
deleted file mode 100644
index 546f53e..0000000
--- a/s4-core/src/main/java/io/s4/processor/PrintEventPE.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import org.apache.log4j.Logger;
-
-public class PrintEventPE extends AbstractPE {
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-    public void processEvent(Object event) {
-        Logger.getLogger("s4").info(event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java b/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
deleted file mode 100644
index 93a2d16..0000000
--- a/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.ft.SafeKeeper;
-import io.s4.persist.ConMapPersister;
-import io.s4.persist.Persister;
-import io.s4.util.clock.Clock;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-public class PrototypeWrapper {
-
-    private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
-    private AbstractPE prototype;
-    Persister lookupTable;
-	SafeKeeper safeKeeper;
-
-    public String getId() {
-        return prototype.getId();
-    }
-
-    public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) {
-        this.prototype = prototype;
-        lookupTable = new ConMapPersister(s4Clock);
-        // TODO lookup table with PEIds
-        System.out.println("Using ConMapPersister ..");
-        // this bit of reflection is not a performance issue because it is only
-        // invoked at configuration time
-        try {
-            ((ConMapPersister) lookupTable).setSelfClean(true);
-            ((ConMapPersister) lookupTable).init();
-            // set the persister in prototype
-            prototype.setLookupTable(lookupTable);
-            prototype.setPrototypeWrapper(this);
-        } catch (Exception e) {
-            // this is not expected
-            Logger.getLogger("s4")
-                  .error("Exception invoking setLookupTable on prototype", e);
-        }
-    }
-
-
-    /**
-     * Find PE corresponding to keyValue. If no such PE exists, then a new one
-     * is created by cloning the prototype and this is returned. As a
-     * side-effect, the last update time for the PE in the lookup table is
-     * modified.
-     * 
-     * @param keyValue
-     *            key value
-     * @return PE corresponding to keyValue.
-     */
-    public AbstractPE getPE(String keyValue) {
-        AbstractPE pe = null;
-        try {
-            pe = (AbstractPE) lookupTable.get(keyValue);
-            if (pe == null) {
-                pe = (AbstractPE) prototype.clone();
-                pe.initInstance();
-            }
-            // update the last update time on the entry
-            lookupTable.set(keyValue, pe, prototype.getTtl());
-
-        } catch (Exception e) {
-            logger.error("exception when looking up pe for key:" + keyValue, e);
-        }
-
-        return pe;
-    }
-
-    /**
-     * Find PE corresponding to keyValue. If no such PE exists, then null is
-     * returned. Note: the last update time is not modified in the lookup table.
-     * 
-     * @param keyValue
-     *            key value
-     * @return PE corresponding to keyValue, if such a PE exists. Null
-     *         otherwise.
-     */
-    public AbstractPE lookupPE(String keyValue) {
-        AbstractPE pe = null;
-
-        try {
-            pe = (AbstractPE) lookupTable.get(keyValue);
-
-        } catch (Exception e) {
-            logger.error("exception when looking up pe for key:" + keyValue, e);
-        }
-
-        return pe;
-    }
-    
-    public void expire(String keyValue) {
-        try {
-           lookupTable.set(keyValue, null, 0);
-        } catch (Exception e) {
-            logger.error("exception when removing pe for key:" + keyValue, e);
-        }
-   
-    }
-
-    public int getPECount() {
-        return lookupTable.keySet().size();
-    }
-
-    public List<EventAdvice> advise() {
-        return prototype.advise();
-    }
-
-	public void setSafeKeeper(SafeKeeper safeKeeper) {
-		this.safeKeeper = safeKeeper;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/ReroutePE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/ReroutePE.java b/s4-core/src/main/java/io/s4/processor/ReroutePE.java
deleted file mode 100644
index 64b51ba..0000000
--- a/s4-core/src/main/java/io/s4/processor/ReroutePE.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.dispatcher.transformer.Transformer;
-import io.s4.util.Cloner;
-import io.s4.util.ClonerGenerator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ReroutePE extends AbstractPE {
-    private static Logger logger = Logger.getLogger(ReroutePE.class);
-    private EventDispatcher dispatcher;
-    private Transformer[] transformers = new Transformer[0];
-    // private List<EventAdvice> keys;
-    private String outputStreamName;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public void setTransformers(Transformer[] transformers) {
-        this.transformers = transformers;
-    }
-
-    // public void setEventNames(String[] eventNames) {
-    // keys = new ArrayList<EventAdvice>();
-    // for (String eventName : eventNames) {
-    // keys.add(new EventAdvice(eventName, "*"));
-    // }
-    // }
-
-    // @Override
-    // public List<EventAdvice> advise() {
-    // return keys;
-    // }
-
-    public void setOutputStreamName(String outputStreamName) {
-        this.outputStreamName = outputStreamName;
-    }
-
-    private Map<String, Cloner> clonerMap = new HashMap<String, Cloner>();
-
-    public void processEvent(Object event) {
-        Object newEvent = event;
-        if (transformers != null && transformers.length > 0) {
-            Cloner cloner = clonerMap.get(event.getClass().getName());
-            if (cloner == null) {
-                ClonerGenerator cg = new ClonerGenerator();
-                // generate byte code that knows how to call the clone method on
-                // this event
-                Class clonerClass = cg.generate(event.getClass());
-                try {
-                    cloner = (Cloner) clonerClass.newInstance();
-                    clonerMap.put(event.getClass().getName(), cloner);
-                } catch (InstantiationException ie) {
-                    Logger.getLogger(this.getClass()).error(ie);
-                    throw new RuntimeException(ie);
-                } catch (IllegalAccessException ias) {
-                    Logger.getLogger(this.getClass()).error(ias);
-                    throw new RuntimeException(ias);
-                }
-            }
-            newEvent = cloner.clone(event);
-        }
-
-        for (Transformer transformer : transformers) {
-            newEvent = transformer.transform(newEvent);
-            if (newEvent == null) {
-                return;
-            }
-        }
-
-        dispatcher.dispatchEvent(outputStreamName, newEvent);
-    }
-
-    public int getTtl() {
-        return -1; // never die
-    }
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java b/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java
deleted file mode 100644
index 5aea05c..0000000
--- a/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.persist.Persister;
-
-import java.util.List;
-
-public class SimpleCountingPE extends AbstractPE {
-    private boolean clearOnOutput;
-    private OutputFormatter outputFormatter;
-    private Persister persister;
-    private int persistTime;
-    private String keyPrefix = "s4:counter";
-    private boolean dirty = false;
-
-    public void setClearOnOutput(boolean clearOnOutput) {
-        this.clearOnOutput = clearOnOutput;
-    }
-
-    public void setOutputFormatter(OutputFormatter outputFormatter) {
-        this.outputFormatter = outputFormatter;
-    }
-
-    public void setPersister(Persister persister) {
-        this.persister = persister;
-    }
-
-    public void setPersistTime(int persistTime) {
-        this.persistTime = persistTime;
-    }
-
-    public void setKeyPrefix(String keyPrefix) {
-        this.keyPrefix = keyPrefix;
-    }
-
-    private long counter = 0;
-
-    @Override
-    public void output() {
-        Object outputValue = null;
-        synchronized (this) {
-            if (!dirty) {
-                return;
-            }
-            outputValue = new Long(counter);
-            if (clearOnOutput) {
-                counter = 0;
-            }
-            dirty = false;
-        }
-
-        List<Object> simpleKeyValues = this.getKeyValue();
-        StringBuffer keyBuffer = new StringBuffer(keyPrefix);
-        for (Object simpleKeyValue : simpleKeyValues) {
-            keyBuffer.append(":");
-            keyBuffer.append(String.valueOf(simpleKeyValue));
-        }
-
-        if (outputFormatter != null) {
-            outputValue = outputFormatter.format(outputValue);
-        }
-
-        try {
-            persister.set(keyBuffer.toString(), outputValue, persistTime);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-
-    }
-
-    public void processEvent(Object event) {
-        synchronized (this) {
-            counter++;
-            dirty = true;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/schema/Schema.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/schema/Schema.java b/s4-core/src/main/java/io/s4/schema/Schema.java
deleted file mode 100644
index 31582db..0000000
--- a/s4-core/src/main/java/io/s4/schema/Schema.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.schema;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class Schema {
-    private Map<String, Property> properties = new HashMap<String, Property>();
-    @SuppressWarnings("unchecked")
-    private Class type;
-
-    public Map<String, Property> getProperties() {
-        return Collections.unmodifiableMap(properties);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Class getType() {
-        return type;
-    }
-
-    @SuppressWarnings("unchecked")
-    public Schema(Class clazz) {
-        type = clazz;
-        for (Method method : clazz.getMethods()) {
-            if (!method.getName().startsWith("set")
-		|| !method.getReturnType().equals(Void.TYPE)
-		|| method.getParameterTypes().length != 1) {
-                continue;
-            }
-
-            String getterMethodName = method.getName().replaceFirst("set",
-                                                                    "get");
-
-            String propertyName = "";
-            if (method.getName().length() > 4) {
-                propertyName = method.getName().substring(4);
-            }
-            propertyName = method.getName().substring(3, 4).toLowerCase()
-                    + propertyName;
-
-            Type parameterType = method.getGenericParameterTypes()[0];
-
-            Method getterMethod = null;
-            try {
-                getterMethod = clazz.getMethod(getterMethodName, new Class[] {});
-                /*
-                 * if (!getterMethod.getReturnType().equals(parameterType)) {
-                 * getterMethod = null; }
-                 */
-            } catch (NoSuchMethodException nsme) {
-                // this is an acceptable possibility, ignore
-                // nsme.printStackTrace();
-            }
-
-            Property property = Property.getProperty(propertyName,
-                                                     getterMethod,
-                                                     method,
-                                                     parameterType);
-            if (property != null) {
-                properties.put(propertyName, property);
-            }
-        }
-    }
-
-    public String toString() {
-        return this.toString("");
-    }
-
-    public String toString(String indent) {
-        StringBuffer sb = new StringBuffer();
-        sb.append(indent).append("{\n");
-        for (Property property : properties.values()) {
-            sb.append(property.toString(indent + "   "));
-        }
-
-        sb.append(indent).append("}\n");
-        return sb.toString();
-    }
-
-    static public class Property {
-        @SuppressWarnings("unchecked")
-        private static List<Class> nonBeanClasses = new ArrayList<Class>();
-        static {
-            nonBeanClasses.add(String.class);
-            nonBeanClasses.add(Number.class);
-        }
-        @SuppressWarnings("unchecked")
-        private Class type;
-        private Property componentProperty;
-        private Schema schema;
-        private String name;
-        private Method getterMethod;
-        private Method setterMethod;
-        private boolean isList;
-        private boolean isNumber;
-
-        @SuppressWarnings("unchecked")
-        public Class getType() {
-            return type;
-        }
-
-        public Schema getSchema() {
-            return schema;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public Method getGetterMethod() {
-            return getterMethod;
-        }
-
-        public Method getSetterMethod() {
-            return setterMethod;
-        }
-
-        public boolean isList() {
-            return isList;
-        }
-
-        public boolean isNumber() {
-            return isNumber;
-        }
-
-        public Property getComponentProperty() {
-            return componentProperty;
-        }
-
-        public static Property getProperty(String propertyName, Method getterMethod, Method setterMethod, Type parameterType) {
-            if (parameterType instanceof Class) {
-                Class ptClass = (Class) parameterType;
-
-                if (ptClass.isArray()) {
-                    Class componentType = ptClass.getComponentType();
-                    return new Property(propertyName,
-                                        getterMethod,
-                                        setterMethod,
-                                        ptClass,
-                                        getProperty("component",
-                                                    null,
-                                                    null,
-                                                    componentType));
-                } else if (ptClass.isPrimitive()) {
-                    return new Property(propertyName,
-                                        getterMethod,
-                                        setterMethod,
-                                        ptClass);
-                }
-
-                List<Class> hierarchy = getHieararchy(ptClass);
-
-                for (Class nonBeanClass : nonBeanClasses) {
-                    if (hierarchy.contains(nonBeanClass)) {
-                        return new Property(propertyName,
-                                            getterMethod,
-                                            setterMethod,
-                                            ptClass);
-                    }
-                }
-
-                // if here, then must have no-arg constructor and some bean-like
-                // properties
-                boolean noArgConstructorFound = false;
-                for (Constructor constructor : ptClass.getConstructors()) {
-                    if (constructor.getGenericParameterTypes().length == 0) {
-                        noArgConstructorFound = true;
-                        break;
-                    }
-                }
-
-                if (!noArgConstructorFound) {
-                    return null;
-                }
-
-                Schema schema = new Schema(ptClass);
-                if (schema.getProperties().size() == 0) {
-                    return null;
-                }
-
-                return new Property(propertyName,
-                                    getterMethod,
-                                    setterMethod,
-                                    ptClass,
-                                    schema);
-            } else if (parameterType instanceof ParameterizedType) {
-                ParameterizedType pt = (ParameterizedType) parameterType;
-                Class rawType = (Class) pt.getRawType();
-
-                Class[] interfaces = rawType.getInterfaces();
-                if (!rawType.equals(List.class)
-                        && !Arrays.asList(interfaces).contains(List.class)) {
-                    return null;
-                }
-
-                Type[] parameterInstantiations = pt.getActualTypeArguments();
-                if (parameterInstantiations.length != 1) {
-                    return null;
-                }
-
-                Type pIType = parameterInstantiations[0];
-
-                return new Property(propertyName,
-                                    getterMethod,
-                                    setterMethod,
-                                    rawType,
-                                    getProperty("component", null, null, pIType));
-            }
-
-            return null;
-        }
-
-        private static List<Class> getHieararchy(Class clazz) {
-            List<Class> hierarchy = new ArrayList<Class>();
-
-            for (Class level = clazz; level != null; level = level.getSuperclass()) {
-                hierarchy.add(level);
-            }
-            return hierarchy;
-        }
-
-        private Property(String name, Method getterMethod, Method setterMethod,
-                Class type) {
-            this.name = name;
-            this.getterMethod = getterMethod;
-            this.setterMethod = setterMethod;
-            this.type = type;
-
-            Class[] interfaces = type.getInterfaces();
-            if (type.equals(List.class)
-                    || Arrays.asList(interfaces).contains(List.class)) {
-                isList = true;
-            } else if (getHieararchy(type).contains(Number.class)) {
-                isNumber = true;
-            }
-        }
-
-        private Property(String name, Method getterMethod, Method setterMethod,
-                Class type, Schema schema) {
-            this(name, getterMethod, setterMethod, type);
-            this.schema = schema;
-        }
-
-        private Property(String name, Method getterMethod, Method setterMethod,
-                Class type, Property componentProperty) {
-            this(name, getterMethod, setterMethod, type);
-            this.componentProperty = componentProperty;
-        }
-
-        public String toString() {
-            return toString("");
-        }
-
-        public String toString(String indent) {
-            StringBuffer sb = new StringBuffer();
-            sb.append(indent)
-              .append(type.getName())
-              .append(" ")
-              .append(name)
-              .append("\n");
-            if (schema != null) {
-                sb.append(schema.toString(indent));
-            }
-            if (componentProperty != null) {
-                sb.append(componentProperty.toString(indent));
-            }
-
-            return sb.toString();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/schema/SchemaContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/schema/SchemaContainer.java b/s4-core/src/main/java/io/s4/schema/SchemaContainer.java
deleted file mode 100644
index 105e189..0000000
--- a/s4-core/src/main/java/io/s4/schema/SchemaContainer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.schema;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SchemaContainer {
-    private Map<String, Schema> schemaMap = new ConcurrentHashMap<String, Schema>();
-
-    public Schema getSchema(Class<?> clazz) {
-        String schemaKey = clazz.getClassLoader().toString() + ":"
-                + clazz.getName();
-
-        Schema schema = schemaMap.get(schemaKey);
-        if (schema == null) {
-            schema = new Schema(clazz);
-            schemaMap.put(schemaKey, schema);
-        }
-
-        return schema;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/schema/SchemaManager.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/schema/SchemaManager.java b/s4-core/src/main/java/io/s4/schema/SchemaManager.java
deleted file mode 100644
index da0ad2e..0000000
--- a/s4-core/src/main/java/io/s4/schema/SchemaManager.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.schema;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-public class SchemaManager {
-    protected Map<String, String> schemaStringMap = new ConcurrentHashMap<String, String>();
-    private Map<String, String> schemaFilenameMap;
-
-    public void init() {
-        if (schemaFilenameMap == null) {
-            return;
-        }
-
-        for (String schemaName : schemaFilenameMap.keySet()) {
-            String schemaFilename = schemaFilenameMap.get(schemaName);
-            addSchemaFromFile(schemaName, schemaFilename);
-        }
-    }
-
-    public void setSchemas(Map<String, String> schemaFilenameMap) {
-        this.schemaFilenameMap = schemaFilenameMap;
-    }
-
-    public String getSchemaString(String schemaName) {
-        return schemaStringMap.get(schemaName);
-    }
-
-    public void addSchemaFromFile(String schemaName, String schemaFilename) {
-        BufferedReader br = null;
-        FileReader fr = null;
-        try {
-            File schemaFile = new File(schemaFilename);
-            if (schemaFile.exists()) {
-                fr = new FileReader(schemaFilename);
-                br = new BufferedReader(fr);
-                StringBuffer schemaBuffer = new StringBuffer();
-                String line = null;
-                while ((line = br.readLine()) != null) {
-                    schemaBuffer.append(line);
-                }
-                schemaStringMap.put(schemaName, schemaBuffer.toString());
-            } else {
-                Logger.getLogger("s4").error("Missing schema file:"
-                        + schemaFilename + " for type:" + schemaName);
-            }
-        } catch (IOException ioe) {
-            Logger.getLogger("s4").error("Exception reading schema file "
-                                                 + schemaFilename,
-                                         ioe);
-        } finally {
-            if (br != null) {
-                try {
-                    br.close();
-                } catch (Exception e) {
-                }
-            }
-            if (fr != null) {
-                try {
-                    fr.close();
-                } catch (Exception e) {
-                }
-
-            }
-        }
-    }
-
-    public void addSchema(String schemaName, String schema) {
-        schemaStringMap.put(schemaName, schema);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/serialize/KryoSerDeser.java b/s4-core/src/main/java/io/s4/serialize/KryoSerDeser.java
deleted file mode 100644
index 8586cab..0000000
--- a/s4-core/src/main/java/io/s4/serialize/KryoSerDeser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.serialize;
-
-import java.nio.ByteBuffer;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.ObjectBuffer;
-import com.esotericsoftware.kryo.serialize.SimpleSerializer;
-
-public class KryoSerDeser implements SerializerDeserializer {
-
-    private Kryo kryo = new Kryo();
-    
-    private int initialBufferSize = 2048;
-    private int maxBufferSize = 256*1024;
-
-    public void setInitialBufferSize(int initialBufferSize) {
-        this.initialBufferSize = initialBufferSize;
-    }
-
-    public void setMaxBufferSize(int maxBufferSize) {
-        this.maxBufferSize = maxBufferSize;
-    }
-
-    public Kryo getKryo() {
-		return kryo;
-	}
-
-	public KryoSerDeser() {
-        kryo.setRegistrationOptional(true);
-
-        // UUIDs don't have a no-arg constructor.
-        kryo.register(java.util.UUID.class,
-                      new SimpleSerializer<java.util.UUID>() {
-                          @Override
-                          public java.util.UUID read(ByteBuffer buf) {
-                              return new java.util.UUID(buf.getLong(),
-                                                        buf.getLong());
-                          }
-
-                          @Override
-                          public void write(ByteBuffer buf, java.util.UUID uuid) {
-                              buf.putLong(uuid.getMostSignificantBits());
-                              buf.putLong(uuid.getLeastSignificantBits());
-                          }
-
-                      });
-    }
-
-    @Override
-    public Object deserialize(byte[] rawMessage) {
-        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
-        return buffer.readClassAndObject(rawMessage);
-    }
-
-    @Override
-    public byte[] serialize(Object message) {
-        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
-        return buffer.writeClassAndObject(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/serialize/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/serialize/SerializerDeserializer.java b/s4-core/src/main/java/io/s4/serialize/SerializerDeserializer.java
deleted file mode 100644
index 3b0e322..0000000
--- a/s4-core/src/main/java/io/s4/serialize/SerializerDeserializer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.serialize;
-
-public interface SerializerDeserializer { // contract for converting message objects to and from byte arrays
-    public byte[] serialize(Object message); // encodes the given message object as a byte array
-
-    public Object deserialize(byte[] rawMessage); // decodes a byte array back into an object
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/test/TestPersisterEventClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/test/TestPersisterEventClock.java b/s4-core/src/main/java/io/s4/test/TestPersisterEventClock.java
deleted file mode 100644
index bc45ffc..0000000
--- a/s4-core/src/main/java/io/s4/test/TestPersisterEventClock.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.test;
-
-import io.s4.persist.ConMapPersister;
-import io.s4.persist.HashMapPersister;
-import io.s4.persist.Persister;
-import io.s4.util.clock.EventClock;
-
-public class TestPersisterEventClock { // manual, console-driven exercise of HashMapPersister and ConMapPersister against an EventClock
-
-    static EventClock s4Clock;
-    static Persister persister;
-
-    public static void main(String[] args) { // runs the same scenario against both persister implementations, one after the other
-        TestPersisterEventClock testPersisterClock = new TestPersisterEventClock();
-        s4Clock = new EventClock();
-        s4Clock.updateTime(69990000); // starting time; the updater thread below begins ticking at 69996400
-        TimeUpdaterThread timeUpdater = new TimeUpdaterThread(s4Clock);
-        Thread timeUpdaterThread = new Thread(timeUpdater);
-        timeUpdaterThread.start(); // the clock is advanced in the background while the scenario runs on this thread
-        persister = new HashMapPersister(s4Clock);
-        testPersisterClock.testPersister(persister);
-        persister = new ConMapPersister(s4Clock);
-        testPersisterClock.testPersister(persister);
-    }
-
-    public void testPersister(Persister persister) { // stores three keys, then prints them after three successive clock waits; only the two known implementations are handled
-        HashMapPersister hp = null;
-        ConMapPersister cp = null;
-        if (persister instanceof HashMapPersister) {
-            hp = (HashMapPersister) persister;
-            hp.init();
-
-            hp.set("mykey1", "Test1", 40); // NOTE(review): third argument is presumably a time-to-live (-1 = no expiry) - confirm against Persister
-            hp.set("mykey2", "Test2", 48);
-            hp.set("mykey3", "Test2", -1);
-
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 1); // presumably blocks until the clock reaches the given time - confirm
-            } catch (Exception e) { // any exception from waitForTime is silently ignored
-            }
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 41000); // second checkpoint: 41000 clock units later
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 10000); // third checkpoint: a further 10000 clock units later
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-            System.out.println("cleanUp: " + hp.cleanOutGarbage()); // prints whatever cleanOutGarbage() returns
-
-        }
-        if (persister instanceof ConMapPersister) { // same scenario as above, repeated for ConMapPersister
-            cp = (ConMapPersister) persister;
-            cp.init();
-
-            cp.set("mykey1", "Test1", 40);
-            cp.set("mykey2", "Test2", 48);
-            cp.set("mykey3", "Test2", -1);
-
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 1);
-            } catch (Exception e) { // silently ignored
-            }
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 41000);
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 10000);
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-            System.out.println("cleanUp: " + cp.cleanOutGarbage());
-        }
-    }
-    
-    static class TimeUpdaterThread implements Runnable { // Runnable that pushes new time values into the EventClock it is given
-
-        EventClock s4Clock;
-
-        public TimeUpdaterThread(EventClock s4Clock) {
-            this.s4Clock = s4Clock;
-        }
-
-        @Override
-        public void run() {
-            // Steps the clock from 69996400 to 70099900 in increments of 400, one step per 400 ms of real time.
-
-            // tick the clock
-            for (long currentTime = 69996400; currentTime <= 70099900; currentTime += 400) {
-                System.out.println("Setting time to " + currentTime);
-                s4Clock.updateTime(currentTime);
-                try {
-                    Thread.sleep(400);
-                } catch (InterruptedException ie) { // interruption is ignored; the loop simply continues with the next tick
-                }
-            }
-        }
-
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/test/TestPersisterWallClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/test/TestPersisterWallClock.java b/s4-core/src/main/java/io/s4/test/TestPersisterWallClock.java
deleted file mode 100644
index ce97d57..0000000
--- a/s4-core/src/main/java/io/s4/test/TestPersisterWallClock.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.test;
-
-import io.s4.persist.ConMapPersister;
-import io.s4.persist.HashMapPersister;
-import io.s4.persist.Persister;
-import io.s4.util.clock.WallClock;
-
-public class TestPersisterWallClock { // manual, console-driven exercise of HashMapPersister and ConMapPersister against a WallClock
-
-    static WallClock s4Clock;
-    static Persister persister;
-
-    public static void main(String[] args) { // runs the same scenario against both persister implementations, one after the other
-        TestPersisterWallClock testPersisterClock = new TestPersisterWallClock();
-        s4Clock = new WallClock();
-        persister = new HashMapPersister(s4Clock);
-        testPersisterClock.testPersister(persister);
-        persister = new ConMapPersister(s4Clock);
-        testPersisterClock.testPersister(persister);
-    }
-
-    public void testPersister(Persister persister) { // stores three keys, then prints them after three successive clock waits; only the two known implementations are handled
-        HashMapPersister hp = null;
-        ConMapPersister cp = null;
-        if (persister instanceof HashMapPersister) {
-            hp = (HashMapPersister) persister;
-            hp.init();
-
-            hp.set("mykey1", "Test1", 40); // NOTE(review): third argument is presumably a time-to-live (-1 = no expiry) - confirm against Persister
-            hp.set("mykey2", "Test2", 48);
-            hp.set("mykey3", "Test2", -1);
-
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 1); // presumably blocks until the clock reaches the given time - confirm
-            } catch (Exception e) { // any exception from waitForTime is silently ignored
-            }
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 41000); // second checkpoint: 41000 clock units later
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 10000); // third checkpoint: a further 10000 clock units later
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + hp.get("mykey1"));
-            System.out.println("mykey2: " + hp.get("mykey2"));
-            System.out.println("mykey3: " + hp.get("mykey3"));
-            System.out.println("cleanUp: " + hp.cleanOutGarbage()); // prints whatever cleanOutGarbage() returns
-
-        }
-        if (persister instanceof ConMapPersister) { // same scenario as above, repeated for ConMapPersister
-            cp = (ConMapPersister) persister;
-            cp.init();
-
-            cp.set("mykey1", "Test1", 40);
-            cp.set("mykey2", "Test2", 48);
-            cp.set("mykey3", "Test2", -1);
-
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 1);
-            } catch (Exception e) { // silently ignored
-            }
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 41000);
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-
-            System.out.println("Going to sleep...");
-            try {
-                s4Clock.waitForTime(s4Clock.getCurrentTime() + 10000);
-            } catch (Exception e) { // silently ignored
-            }
-            System.out.println("Waking up");
-
-            System.out.println("mykey1: " + cp.get("mykey1"));
-            System.out.println("mykey2: " + cp.get("mykey2"));
-            System.out.println("mykey3: " + cp.get("mykey3"));
-            System.out.println("cleanUp: " + cp.cleanOutGarbage());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/ByteArrayIOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/ByteArrayIOChannel.java b/s4-core/src/main/java/io/s4/util/ByteArrayIOChannel.java
deleted file mode 100644
index f7861b4..0000000
--- a/s4-core/src/main/java/io/s4/util/ByteArrayIOChannel.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.client.IOChannel;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-
-public class ByteArrayIOChannel implements IOChannel { // exchanges byte arrays over a socket, each preceded by a 4-byte big-endian length
-    private InputStream in;
-    private OutputStream out;
-
-    public ByteArrayIOChannel(Socket socket) throws IOException { // uses the socket's streams directly; no buffering is added here
-        in = socket.getInputStream();
-        out = socket.getOutputStream();
-    }
-
-    private void readBytes(byte[] s, int n) throws IOException { // fills s[0..n) from the input stream; throws IOException if the stream ends first
-        int r = 0; // bytes read so far
-
-        do {
-            // keep reading bytes till the required "n" are read
-            int p = in.read(s, r, (n - r)); // may return fewer bytes than requested, hence the loop
-
-            if (p == -1) {
-                throw new IOException("reached end of stream after reading "
-                        + r + " bytes. expected " + n + " bytes");
-            }
-
-            r += p;
-
-        } while (r < n);
-    }
-
-    public byte[] recv() throws IOException { // reads one message; returns null when the length header is 0
-        // first read size of byte array.
-        // unsigned int, big endian: 0A0B0C0D -> {0A, 0B, 0C, 0D}
-        byte[] s = { 0, 0, 0, 0 };
-        readBytes(s, 4);
-
-        // assemble the four header bytes into an int; a header with the top bit set yields a negative size
-        int size = (int) ( // NOTE: type cast not necessary for int
-        (0xff & s[0]) << 24 | (0xff & s[1]) << 16 | (0xff & s[2]) << 8 | (0xff & s[3]) << 0);
-
-        if (size == 0)
-            return null;
-        
-        // ignore ridiculous sizes
-        // TODO: come up with a better solution than this
-        if (size < 0 || size > (10*1024*1024)) { // rejects negative sizes and anything above 10 MiB
-            throw new IOException("Bizarre size "  + size);
-        }
-
-        byte[] v = new byte[size];
-
-        // read the message
-        readBytes(v, size);
-
-        return v;
-    }
-
-    public void send(byte[] v) throws IOException { // writes the 4-byte length, then the payload, then flushes; an empty array is received as null by recv()
-        byte[] s = { 0, 0, 0, 0 };
-        int size = v.length;
-
-        s[3] = (byte) (size & 0xff); // least significant byte goes last (big endian)
-        size >>= 8;
-        s[2] = (byte) (size & 0xff);
-        size >>= 8;
-        s[1] = (byte) (size & 0xff);
-        size >>= 8;
-        s[0] = (byte) (size & 0xff);
-
-        out.write(s);
-        out.write(v);
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/Cloner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/Cloner.java b/s4-core/src/main/java/io/s4/util/Cloner.java
deleted file mode 100644
index fd7cf59..0000000
--- a/s4-core/src/main/java/io/s4/util/Cloner.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-public interface Cloner { // single-method contract; ClonerGenerator emits classes that implement it
-    public Object clone(Object o); // takes an object and returns an object; the implementation decides how the result is produced
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/ClonerGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/ClonerGenerator.java b/s4-core/src/main/java/io/s4/util/ClonerGenerator.java
deleted file mode 100644
index fd8d245..0000000
--- a/s4-core/src/main/java/io/s4/util/ClonerGenerator.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Random;
-
-import org.apache.bcel.Constants;
-import org.apache.bcel.classfile.JavaClass;
-import org.apache.bcel.generic.BranchInstruction;
-import org.apache.bcel.generic.ClassGen;
-import org.apache.bcel.generic.ConstantPoolGen;
-import org.apache.bcel.generic.INSTANCEOF;
-import org.apache.bcel.generic.InstructionConstants;
-import org.apache.bcel.generic.InstructionFactory;
-import org.apache.bcel.generic.InstructionHandle;
-import org.apache.bcel.generic.InstructionList;
-import org.apache.bcel.generic.MethodGen;
-import org.apache.bcel.generic.ObjectType;
-import org.apache.bcel.generic.PUSH;
-import org.apache.bcel.generic.Type;
-
-public class ClonerGenerator { // uses BCEL to emit, at runtime, a Cloner implementation specialised for one class
-    public Class generate(Class clazz) { // returns a freshly defined class implementing io.s4.util.Cloner whose clone(Object) calls clazz's own clone()
-        String className = clazz.getName();
-        Random rand = new Random(System.currentTimeMillis());
-        String clonerClassname = "Cloner" + (Math.abs(rand.nextInt() % 3256)); // NOTE(review): only 3256 possible names; each call defines the class in its own new class loader (see below)
-
-        ClassGen cg = new ClassGen(clonerClassname,
-                                   "java.lang.Object",
-                                   clonerClassname + ".java",
-                                   Constants.ACC_PUBLIC | Constants.ACC_SUPER,
-                                   new String[] { "io.s4.util.Cloner" });
-        ConstantPoolGen cp = cg.getConstantPool();
-        InstructionFactory instFactory = new InstructionFactory(cg, cp);
-
-        InstructionList il = new InstructionList();
-
-        // build constructor method for new class: a public no-arg constructor that only calls Object.<init>
-        MethodGen constructor = new MethodGen(Constants.ACC_PUBLIC,
-                                              Type.VOID,
-                                              Type.NO_ARGS,
-                                              new String[] {},
-                                              "<init>",
-                                              clonerClassname,
-                                              il,
-                                              cp);
-        il.append(InstructionFactory.createLoad(Type.OBJECT, 0)); // local 0 is "this"
-        il.append(instFactory.createInvoke("java.lang.Object",
-                                           "<init>",
-                                           Type.VOID,
-                                           Type.NO_ARGS,
-                                           Constants.INVOKESPECIAL));
-
-        il.append(InstructionFactory.createReturn(Type.VOID));
-        constructor.setMaxStack();
-        constructor.setMaxLocals();
-        cg.addMethod(constructor.getMethod());
-        il.dispose();
-
-        // build clone method: public Object clone(Object arg0); local 1 is arg0, local 2 is a scratch reference
-        il = new InstructionList();
-        MethodGen method = new MethodGen(Constants.ACC_PUBLIC,
-                                         Type.OBJECT,
-                                         new Type[] { Type.OBJECT },
-                                         new String[] { "arg0" },
-                                         "clone",
-                                         clonerClassname,
-                                         il,
-                                         cp);
-
-        il.append(InstructionConstants.ACONST_NULL);
-        il.append(InstructionFactory.createStore(Type.OBJECT, 2)); // local 2 = null
-        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
-        il.append(new INSTANCEOF(cp.addClass(new ObjectType(className)))); // arg0 instanceof <className> ?
-        BranchInstruction ifeq_6 = InstructionFactory.createBranchInstruction(Constants.IFEQ,
-                                                                              null);
-        il.append(ifeq_6); // if not an instance, skip the cast so local 2 stays null
-        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
-        il.append(instFactory.createCheckCast(new ObjectType(className)));
-        il.append(InstructionFactory.createStore(Type.OBJECT, 2)); // local 2 = (<className>) arg0
-        InstructionHandle ih_14 = il.append(InstructionFactory.createLoad(Type.OBJECT,
-                                                                          2));
-        il.append(new INSTANCEOF(cp.addClass(new ObjectType("java.lang.Cloneable")))); // local 2 instanceof Cloneable ? (false when local 2 is null)
-        BranchInstruction ifne_18 = InstructionFactory.createBranchInstruction(Constants.IFNE,
-                                                                               null);
-        il.append(ifne_18); // if Cloneable, jump to the clone() call below
-        il.append(instFactory.createFieldAccess("java.lang.System",
-                                                "out",
-                                                new ObjectType("java.io.PrintStream"),
-                                                Constants.GETSTATIC));
-        il.append(new PUSH(cp, "Not cloneable!"));
-        il.append(instFactory.createInvoke("java.io.PrintStream",
-                                           "println",
-                                           Type.VOID,
-                                           new Type[] { Type.STRING },
-                                           Constants.INVOKEVIRTUAL));
-        il.append(InstructionConstants.ACONST_NULL);
-        il.append(InstructionFactory.createReturn(Type.OBJECT)); // not cloneable (or wrong type): print the message and return null
-        InstructionHandle ih_31 = il.append(InstructionFactory.createLoad(Type.OBJECT,
-                                                                          2));
-        il.append(instFactory.createInvoke(className,
-                                           "clone",
-                                           Type.OBJECT,
-                                           Type.NO_ARGS,
-                                           Constants.INVOKEVIRTUAL)); // NOTE(review): invokes <className>.clone() from the generated class, so this presumably requires an accessible clone() on the target - confirm
-        il.append(InstructionFactory.createReturn(Type.OBJECT));
-        ifeq_6.setTarget(ih_14);
-        ifne_18.setTarget(ih_31);
-        method.setMaxStack();
-        method.setMaxLocals();
-        cg.addMethod(method.getMethod());
-        il.dispose();
-
-        JavaClass jc = cg.getJavaClass();
-        ClonerClassLoader cl = new ClonerClassLoader(); // a new loader is created for every generated class
-
-        return cl.loadClassFromBytes(clonerClassname, jc.getBytes());
-    }
-
-    public static class ClonerClassLoader extends URLClassLoader { // loader with an empty URL list, used only to define generated classes
-        public ClonerClassLoader() {
-            super(new URL[] {});
-        }
-
-        public Class loadClassFromBytes(String name, byte[] bytes) { // returns an already loadable class of that name if one exists, otherwise defines it from bytes
-            try {
-                return this.loadClass(name);
-            } catch (ClassNotFoundException cnfe) {
-                // expected: the name is not loadable yet, so fall through and define it
-            }
-            return this.defineClass(name, bytes, 0, bytes.length);
-        }
-    }
-}


Mime
View raw message