incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [34/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/DoubleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/DoubleOutputFormatter.java b/s4-core/src/main/java/io/s4/util/DoubleOutputFormatter.java
deleted file mode 100644
index 2f35652..0000000
--- a/s4-core/src/main/java/io/s4/util/DoubleOutputFormatter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.processor.OutputFormatter;
-
-import org.apache.log4j.Logger;
-
-public class DoubleOutputFormatter implements OutputFormatter {
-    /**
-     * Converts an output value to a double.
-     * <p>
-     * A {@code Double} is returned as-is. Any other value is cast to
-     * {@code Number} and converted with {@code doubleValue()}. If that
-     * throws (for example because the value is null or is not a
-     * {@code Number}), the exception is logged at error level on the "s4"
-     * logger and null is returned.
-     *
-     * @param outputValue
-     *            the value to convert
-     * @return the value as a {@code Double}, or null if it could not be
-     *         converted
-     */
-    @Override
-    public Object format(Object outputValue) {
-        double doubleResult = 0.0;
-
-        if (outputValue instanceof Double) {
-            return outputValue;
-        }
-
-        try {
-            doubleResult = ((Number) outputValue).doubleValue(); // the cast or the
-                                                                 // call throws when the
-                                                                 // value is not convertible
-        } catch (Exception e) {
-            Logger.getLogger("s4")
-                  .error("Exception converting value to double", e);
-            return null;
-        }
-        return doubleResult;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/GsonUtil.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/GsonUtil.java b/s4-core/src/main/java/io/s4/util/GsonUtil.java
deleted file mode 100644
index 04b050e..0000000
--- a/s4-core/src/main/java/io/s4/util/GsonUtil.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package io.s4.util;
-
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-public class GsonUtil {
-    // Holds one shared Gson instance that is rebuilt each time a type adapter is registered.
-    /**
-     * This is a workaround for a bug in Gson:
-     * 
-     * http://code.google.com/p/google-gson/issues/detail?id=279
-     * "Templated collections of collections do not serialize correctly"
-     * 
-     * The serializer delegates to the context using the value's runtime class.
-     * A plain {@code java.lang.Object} instance is written as an empty JSON
-     * object instead of being delegated (presumably so that this serializer
-     * is not re-entered for the same type -- TODO confirm).
-     */
-    private final static JsonSerializer<Object> objectSerializer = new JsonSerializer<Object>() {
-        public JsonElement serialize(Object src, Type typeOfSrc,
-                                     JsonSerializationContext context) {
-
-            if (src.getClass() != Object.class) {
-                return context.serialize(src, src.getClass());
-            }
-
-            return new JsonObject();
-        }
-    };
-
-    private static HashMap<Type, Object> typeAdapters = new HashMap<Type, Object>(); // only touched inside the synchronized methods below
-
-    private volatile static Gson gson = null; // replaced wholesale by build(); read without locking in get()
-
-    // build gson ASAP
-    static {
-        build();
-    }
-
-    /**
-     * Add a type adapter to the chain and rebuild the shared Gson instance.
-     * Registering a second adapter for the same type replaces the first.
-     * Gson instances handed out earlier by {@link #get()} are not changed;
-     * callers must call {@link #get()} again to see the new adapter.
-     * 
-     * @param type
-     *            the type the adapter is registered for
-     * @param typeAdapter
-     *            the adapter object, passed unchanged to
-     *            {@code GsonBuilder.registerTypeAdapter}
-     */
-    public static synchronized void registerTypeAdapter(Type type,
-                                                        Object typeAdapter) {
-        typeAdapters.put(type, typeAdapter);
-        build();
-    }
-
-    /**
-     * Get a Gson instance.
-     * 
-     * @return Gson instance with all the adapters registered at the time of
-     *         the call.
-     */
-    public static Gson get() {
-        return gson;
-    }
-    /**
-     * Creates a new Gson from a fresh GsonBuilder: registers the Object
-     * workaround serializer first, then every adapter in typeAdapters, and
-     * stores the result in the gson field.
-     */
-    private static synchronized void build() {
-        GsonBuilder b = (new GsonBuilder()).registerTypeAdapter(Object.class,
-                                                                objectSerializer);
-
-        for (Map.Entry<Type, Object> e : typeAdapters.entrySet()) {
-            b.registerTypeAdapter(e.getKey(), e.getValue());
-        }
-
-        gson = b.create();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/KeyUtil.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/KeyUtil.java b/s4-core/src/main/java/io/s4/util/KeyUtil.java
deleted file mode 100644
index d40ef40..0000000
--- a/s4-core/src/main/java/io/s4/util/KeyUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Adapted from http://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/KeyUtil.java
- */
-/**
- * Utilities for converting string keys to their UTF-8 byte form.
- */
-public class KeyUtil {
-
-    /**
-     * Get the UTF-8 bytes for a key.
-     * 
-     * @param k
-     *            the key; a null key results in a NullPointerException
-     * @return the UTF-8 encoded bytes of the key
-     * @throws RuntimeException
-     *             wrapping the UnsupportedEncodingException if "UTF-8" is
-     *             not supported
-     */
-    public static byte[] getKeyBytes(String k) {
-        try {
-            return k.getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Get the keys in byte form for all of the string keys.
-     * 
-     * @param keys
-     *            a collection of keys
-     * @return a new list holding the byte representation of each key, in the
-     *         iteration order of {@code keys}
-     */
-    public static Collection<byte[]> getKeyBytes(Collection<String> keys) {
-        Collection<byte[]> rv = new ArrayList<byte[]>(keys.size());
-        for (String s : keys) {
-            rv.add(getKeyBytes(s));
-        }
-        return rv;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/LoadGenerator.java b/s4-core/src/main/java/io/s4/util/LoadGenerator.java
deleted file mode 100644
index b0893aa..0000000
--- a/s4-core/src/main/java/io/s4/util/LoadGenerator.java
+++ /dev/null
@@ -1,640 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.emitter.CommLayerEmitter;
-import io.s4.emitter.EventEmitter;
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.serialize.KryoSerDeser;
-import io.s4.serialize.SerializerDeserializer;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.lang.reflect.Array;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class LoadGenerator {
-    /**
-     * Command-line entry point: parses the options, builds a
-     * {@link CommLayerEmitter}, and replays the input file named by the first
-     * non-option argument through a {@code LoadGenerator}.
-     * <p>
-     * Options registered below: -r rate (default 250), -d display rate
-     * interval in seconds (default 20), -b start boundary (default 2), -x run
-     * time in seconds, -z cluster manager address, -a sender application
-     * name, -g listener application name, -o sleep overhead, -w warm-up.
-     * A command line that cannot be parsed, an unparsable numeric value, or
-     * a missing input file prints a message to stderr and exits with status 1.
-     * <p>
-     * NOTE(review): several parsed values (startBoundary, updateFrequency,
-     * endTime, sleepOverheadMicros, warmUp) are never passed on to the
-     * LoadGenerator in this method.
-     *
-     * @param args
-     *            command-line arguments; the first non-option argument is the
-     *            input file name
-     */
-    public static void main(String args[]) {
-        Options options = new Options();
-        boolean warmUp = false; // set from -w below but not read afterwards
-
-        options.addOption(OptionBuilder.withArgName("rate")
-                                       .hasArg()
-                                       .withDescription("Rate (events per second)")
-                                       .create("r"));
-
-        options.addOption(OptionBuilder.withArgName("display_rate")
-                                       .hasArg()
-                                       .withDescription("Display Rate at specified second boundary")
-                                       .create("d"));
-
-        options.addOption(OptionBuilder.withArgName("start_boundary")
-                                       .hasArg()
-                                       .withDescription("Start boundary in seconds")
-                                       .create("b"));
-
-        options.addOption(OptionBuilder.withArgName("run_for")
-                                       .hasArg()
-                                       .withDescription("Run for a specified number of seconds")
-                                       .create("x"));
-
-        options.addOption(OptionBuilder.withArgName("cluster_manager")
-                                       .hasArg()
-                                       .withDescription("Cluster manager")
-                                       .create("z"));
-
-        options.addOption(OptionBuilder.withArgName("sender_application_name")
-                                       .hasArg()
-                                       .withDescription("Sender application name")
-                                       .create("a"));
-
-        options.addOption(OptionBuilder.withArgName("listener_application_name")
-                                       .hasArg()
-                                       .withDescription("Listener application name")
-                                       .create("g"));
-
-        options.addOption(OptionBuilder.withArgName("sleep_overhead")
-                                       .hasArg()
-                                       .withDescription("Sleep overhead")
-                                       .create("o"));
-
-        options.addOption(new Option("w", "Warm-up"));
-
-        CommandLineParser parser = new GnuParser();
-
-        CommandLine line = null;
-        try {
-            // parse the command line arguments
-            line = parser.parse(options, args);
-        } catch (ParseException exp) {
-            // oops, something went wrong
-            System.err.println("Parsing failed.  Reason: " + exp.getMessage());
-            System.exit(1);
-        }
-
-        int expectedRate = 250;
-        if (line.hasOption("r")) {
-            try {
-                expectedRate = Integer.parseInt(line.getOptionValue("r"));
-            } catch (Exception e) {
-                System.err.println("Bad expected rate specified "
-                        + line.getOptionValue("r"));
-                System.exit(1);
-            }
-        }
-
-        int displayRateIntervalSeconds = 20;
-        if (line.hasOption("d")) {
-            try {
-                displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d"));
-            } catch (Exception e) {
-                System.err.println("Bad display rate value specified "
-                        + line.getOptionValue("d"));
-                System.exit(1);
-            }
-        }
-
-        int startBoundary = 2; // parsed and validated, but not used after this section
-        if (line.hasOption("b")) {
-            try {
-                startBoundary = Integer.parseInt(line.getOptionValue("b"));
-            } catch (Exception e) {
-                System.err.println("Bad start boundary value specified "
-                        + line.getOptionValue("b"));
-                System.exit(1);
-            }
-        }
-
-        int updateFrequency = 0;
-        if (line.hasOption("f")) { // NOTE(review): no "f" option is registered above, so this branch looks unreachable
-            try {
-                updateFrequency = Integer.parseInt(line.getOptionValue("f"));
-            } catch (Exception e) {
-                System.err.println("Bad query udpdate frequency specified "
-                        + line.getOptionValue("f"));
-                System.exit(1);
-            }
-            System.out.printf("Update frequency is %d\n", updateFrequency);
-        }
-
-        int runForTime = 0;
-        if (line.hasOption("x")) {
-            try {
-                runForTime = Integer.parseInt(line.getOptionValue("x"));
-            } catch (Exception e) {
-                System.err.println("Bad run for time specified "
-                        + line.getOptionValue("x"));
-                System.exit(1);
-            }
-            System.out.printf("Run for time is %d\n", runForTime);
-        }
-
-        String clusterManagerAddress = null;
-        if (line.hasOption("z")) {
-            clusterManagerAddress = line.getOptionValue("z");
-        }
-
-        String senderApplicationName = null;
-        if (line.hasOption("a")) {
-            senderApplicationName = line.getOptionValue("a");
-        }
-
-        String listenerApplicationName = null;
-        if (line.hasOption("a")) { // NOTE(review): tests "a" but reads "g" below; probably meant hasOption("g")
-            listenerApplicationName = line.getOptionValue("g");
-        }
-
-        if (listenerApplicationName == null) { // fall back to the sender application name
-            listenerApplicationName = senderApplicationName;
-        }
-
-        long sleepOverheadMicros = -1; // NOTE(review): parsed and printed, but never handed to the LoadGenerator below
-        if (line.hasOption("o")) {
-            try {
-                sleepOverheadMicros = Long.parseLong(line.getOptionValue("o"));
-            } catch (NumberFormatException e) {
-                System.err.println("Bad sleep overhead specified "
-                        + line.getOptionValue("o"));
-                System.exit(1);
-            }
-            System.out.printf("Specified sleep overhead is %d\n",
-                              sleepOverheadMicros);
-        }
-
-        if (line.hasOption("w")) {
-            warmUp = true;
-        }
-
-        List loArgs = line.getArgList(); // leftover (non-option) arguments
-        if (loArgs.size() < 1) {
-            System.err.println("No input file specified");
-            System.exit(1);
-        }
-
-        String inputFilename = (String) loArgs.get(0);
-
-        EventEmitter emitter = null; // assigned below but not read; clEmitter is what gets used
-
-        SerializerDeserializer serDeser = new KryoSerDeser();
-
-        CommLayerEmitter clEmitter = new CommLayerEmitter();
-        clEmitter.setAppName(senderApplicationName);
-        clEmitter.setListenerAppName(listenerApplicationName);
-        clEmitter.setClusterManagerAddress(clusterManagerAddress);
-        clEmitter.setSenderId(String.valueOf(System.currentTimeMillis() / 1000)); // sender id is the current time in seconds
-        clEmitter.setSerDeser(serDeser);
-        clEmitter.init();
-        emitter = clEmitter;
-
-        long endTime = 0; // NOTE(review): computed here but not read anywhere below
-        if (runForTime > 0) {
-            endTime = System.currentTimeMillis() + (runForTime * 1000); // runForTime * 1000 is evaluated in int arithmetic
-        }
-
-        LoadGenerator loadGenerator = new LoadGenerator();
-        loadGenerator.setInputFilename(inputFilename);
-        loadGenerator.setEventEmitter(clEmitter);
-        loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds);
-        loadGenerator.setExpectedRate(expectedRate);
-        loadGenerator.run();
-
-        System.exit(0);
-    }
-
-    private EventEmitter eventEmitter; // destination for generated events; set via setEventEmitter
-    private String inputFilename; // input path, or "-" for standard input (see run)
-    private int emitCount; // number of events emitted so far
-    private int displayRateInterval = 0; // seconds between "Rate is" reports
-
-    private int expectedRate = 200; // target rate in events per second
-    private int adjustedExpectedRate = 1; // set to expectedRate at the start of run()
-    private long sleepOverheadMicros = -1; // -1 until the constructor measures it or the setter overrides it
-    private static int PROCESS_TIME_LIST_MAX_SIZE = 15; // length of the processTimes ring buffer
-    private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE]; // most recent per-event processing times, in nanoseconds
-    private int processTimePointer = 0; // next slot of processTimes to overwrite
-    private Map<Integer, EventTypeInfo> eventTypeInfoMap = new HashMap<Integer, EventTypeInfo>(); // keyed by the "classIndex" value from the first input line
-    /** Returns the number of events emitted so far by {@code run()}. */
-    public int getEmitCount() {
-        return emitCount;
-    }
-    /** Sets the emitter that {@code run()} sends events to and asks for the node count. */
-    public void setEventEmitter(EventEmitter eventEmitter) {
-        this.eventEmitter = eventEmitter;
-    }
-    /** Sets the input file name; {@code run()} treats "-" as standard input. */
-    public void setInputFilename(String inputFilename) {
-        this.inputFilename = inputFilename;
-    }
-    /** Sets the interval, in seconds, at which {@code run()} prints the measured emit rate. */
-    public void setDisplayRateInterval(int displayRateInterval) {
-        this.displayRateInterval = displayRateInterval;
-    }
-    /** Overrides the sleep overhead (in microseconds) that the constructor measured. */
-    public void setSleepOverheadMicros(long sleepOverheadMicros) {
-        this.sleepOverheadMicros = sleepOverheadMicros;
-    }
-    /** Sets the target emit rate in events per second; {@code run()} copies it at start. */
-    public void setExpectedRate(int expectedRate) {
-        this.expectedRate = expectedRate;
-    }
-
-    private Random rand = new Random(System.currentTimeMillis()); // used by run() to pick a partition
-    /**
-     * Creates a generator and measures how much longer than requested a
-     * one-millisecond {@code Thread.sleep} takes, averaged over 50 sleeps and
-     * stored in microseconds in {@code sleepOverheadMicros}. The result is
-     * printed to standard output.
-     */
-    public LoadGenerator() {
-        if (sleepOverheadMicros == -1) { // the field initializer is -1, so this is always true at construction
-            // calculate sleep overhead
-            long totalSleepOverhead = 0;
-            for (int i = 0; i < 50; i++) {
-                long startTime = System.nanoTime();
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException ie) { // interruption is ignored; the sample is still counted
-                }
-                totalSleepOverhead += (System.nanoTime() - startTime)
-                        - (1 * 1000 * 1000); // subtract the requested 1 ms, in nanoseconds
-            }
-            sleepOverheadMicros = (totalSleepOverhead / 50) / 1000; // mean over 50 samples, nanoseconds to microseconds
-        }
-        System.out.println("Sleep overhead is " + sleepOverheadMicros);
-    }
-    /**
-     * Reads the input (standard input when the file name is "-") line by line
-     * and emits one event per data line at roughly the expected rate.
-     * <p>
-     * The first line is a JSON object describing the event types and is
-     * handed to {@code createEventTypeInfo}; if it yields no event types the
-     * method returns. Every following line is a JSON record whose "_index"
-     * field selects the event type. A line that cannot be turned into an
-     * event is reported on stderr and skipped; an "_index" with no matching
-     * event type ends the run.
-     * <p>
-     * Each event is emitted to a randomly chosen partition. After each emit
-     * the processing time is recorded, the pacing values are refreshed on the
-     * first event and on every 20th event, and the thread sleeps as directed
-     * by {@code rateInfo}.
-     *
-     * @throws RuntimeException
-     *             wrapping any exception raised outside the per-line
-     *             conversion, e.g. an I/O error or a malformed first line
-     */
-    public void run() {
-        // for now, no warm-up mechanism
-        adjustedExpectedRate = expectedRate;
-
-        long startTime = 0;
-        long intervalStart = 0;
-        int emitCountStart = 0;
-        long[] rateInfo = new long[2]; // [0] = sleep time in ms, [1] = events between sleeps
-        rateInfo[0] = 100; // start with a sleep time of 100
-
-        BufferedReader br = null;
-        Reader inputReader = null;
-        try {
-            if (inputFilename.equals("-")) {
-                inputReader = new InputStreamReader(System.in);
-            } else {
-                inputReader = new FileReader(inputFilename);
-            }
-            br = new BufferedReader(inputReader);
-            String inputLine = null;
-            boolean firstLine = true;
-            EventWrapper eventWrapper = null;
-            for (startTime = System.nanoTime(); (inputLine = br.readLine()) != null; startTime = System.nanoTime()) { // startTime is reset for every line
-                if (firstLine) {
-                    JSONObject jsonRecord = new JSONObject(inputLine);
-                    createEventTypeInfo(jsonRecord);
-                    System.out.println(eventTypeInfoMap);
-                    if (eventTypeInfoMap.size() == 0) {
-                        return;
-                    }
-                    firstLine = false;
-                    continue;
-                }
-
-                try {
-                    JSONObject jsonRecord = new JSONObject(inputLine);
-                    int classIndex = jsonRecord.getInt("_index");
-                    EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex);
-                    if (eventTypeInfo == null) {
-                        System.err.printf("Invalid _index value %d\n",
-                                          classIndex);
-                        return;
-                    }
-
-                    Object event = makeRecord(jsonRecord,
-                                              eventTypeInfo.getSchema());
-                    eventWrapper = new EventWrapper(eventTypeInfo.getStreamName(),
-                                                    event,
-                                                    new ArrayList<CompoundKeyInfo>());
-                    // System.out.println(eventWrapper.getStreamName() + ": " +
-                    // eventWrapper.getEvent());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    System.err.printf("Bad input data %s\n", inputLine);
-                    continue;
-                }
-
-                int partition = Math.abs(rand.nextInt()) // NOTE(review): Math.abs(Integer.MIN_VALUE) is negative, so partition can be negative
-                        % eventEmitter.getNodeCount();
-
-                eventEmitter.emit(partition, eventWrapper);
-                emitCount++;
-
-                // the rest of the stuff in this block is just to maintain the
-                // rate
-                processTimes[processTimePointer] = System.nanoTime()
-                        - startTime;
-                processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0
-                        : processTimePointer + 1; // wrap around: processTimes is used as a ring buffer
-                if (emitCount == 1 || emitCount % 20 == 0) {
-                    rateInfo = getRateInfo(rateInfo);
-                }
-
-                // if it's time, display the actual emit rate
-                if (intervalStart == 0) {
-                    intervalStart = System.currentTimeMillis();
-                } else {
-                    long interval = System.currentTimeMillis() - intervalStart;
-                    if (interval >= (displayRateInterval * 1000)) {
-                        double rate = (emitCount - emitCountStart)
-                                / (interval / 1000.0); // events per second over the last interval
-                        System.out.println("Rate is " + rate);
-                        intervalStart = System.currentTimeMillis();
-                        emitCountStart = emitCount;
-                    }
-                }
-
-                if (rateInfo[1] == 0 || emitCount % rateInfo[1] == 0) { // an events-between-sleeps value of 0 means sleep after every event
-                    try {
-                        Thread.sleep(rateInfo[0]);
-                    } catch (InterruptedException ie) {
-                    }
-                }
-            }
-            System.out.printf("Emitted %d events\n", emitCount);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                br.close(); // br is still null if opening the input failed; the resulting exception is swallowed below
-            } catch (Exception e) {
-            }
-            try {
-                inputReader.close(); // same: may be null, exception swallowed
-            } catch (Exception e) {
-            }
-        }
-    }
-    /**
-     * Fills {@code eventTypeInfoMap} from the class-info JSON object.
-     * <p>
-     * Each key of {@code classInfo} is a class name; its value is a JSON
-     * object with an integer "classIndex" and a string "streamName". The
-     * class is loaded with {@code Class.forName}, wrapped in a {@link Schema}
-     * and stored under its class index.
-     * <p>
-     * A JSONException (stack trace printed) or ClassNotFoundException
-     * (message printed to stderr) stops the iteration; entries added before
-     * the failure stay in the map.
-     *
-     * @param classInfo
-     *            JSON object mapping class names to their type information
-     */
-    @SuppressWarnings("unchecked")
-    public void createEventTypeInfo(JSONObject classInfo) {
-        String className = ""; // declared outside the try so the catch block can report it
-        try {
-            for (Iterator it = classInfo.keys(); it.hasNext();) {
-                className = (String) it.next();
-                JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className);
-                int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex");
-                String streamName = jsonEventTypeInfo.getString("streamName");
-
-                Class clazz = Class.forName(className);
-                Schema schema = new Schema(clazz);
-                eventTypeInfoMap.put(classIndex, new EventTypeInfo(schema,
-                                                                   streamName));
-            }
-        } catch (JSONException je) {
-            je.printStackTrace();
-        } catch (ClassNotFoundException cnfe) {
-            System.err.println("Count not locate class " + className);
-        }
-    }
-    /**
-     * Builds an event object from a JSON record.
-     * <p>
-     * A new instance of the schema's type is created, then for every key of
-     * the record that names a schema property the property's setter is
-     * invoked with the converted value. Keys not in the schema and JSON null
-     * values are skipped.
-     *
-     * @param jsonRecord
-     *            the JSON record to convert
-     * @param schema
-     *            schema of the event class to instantiate
-     * @return the populated event object
-     * @throws RuntimeException
-     *             wrapping any exception from instantiation, JSON access or
-     *             the setter call
-     */
-    @SuppressWarnings("unchecked")
-	public static Object makeRecord(JSONObject jsonRecord, Schema schema) {
-        Object event = null;
-        try {
-            event = schema.getType().newInstance();
-
-            for (Iterator it = jsonRecord.keys(); it.hasNext();) {
-                String propertyName = (String) it.next();
-
-                Property property = schema.getProperties().get(propertyName);
-
-                if (property == null) {
-                    continue; // not in schema, just continue
-                }
-
-                Method setterMethod = property.getSetterMethod();
-                Object value = jsonRecord.get(propertyName);
-                if (value.equals(JSONObject.NULL)) {
-                    continue;
-                }
-
-                setterMethod.invoke(event, makeSettableValue(property, value)); // makeSettableValue may return null (e.g. type mismatch); it is passed on as-is
-
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return event;
-    }
-    /**
-     * Converts a JSON value into an object suitable for the property's
-     * setter.
-     * <p>
-     * The branches are tried in this order: array property, list property,
-     * primitive property, String property, number property, and finally a
-     * nested JSONObject (converted with {@code makeRecord}). A value whose
-     * JSON type does not fit the property yields null, in most branches
-     * after a message on stderr.
-     *
-     * @param property
-     *            schema property the value is destined for
-     * @param value
-     *            value read from the JSON record
-     * @return the converted value, or null if it could not be converted
-     */
-    @SuppressWarnings("unchecked")
-	public static Object makeSettableValue(Property property, Object value) {
-        String propertyName = property.getName();
-        Class propertyType = property.getType();
-
-        if (propertyType.isArray()) {
-            if (!(value instanceof JSONArray)) {
-                System.err.println("Type mismatch for field " + propertyName);
-                return null;
-            }
-            System.out.println("Is array!"); // NOTE(review): looks like leftover debug output
-            return makeArray(property, (JSONArray) value);
-        } else if (property.isList()) {
-            if (!(value instanceof JSONArray)) {
-                System.err.println("Type mismatch for field " + propertyName);
-                return null;
-            }
-            return makeList(property, (JSONArray) value);
-        } else if (propertyType.isPrimitive()) {
-            if (!(value instanceof Number || value instanceof Boolean)) {
-                System.err.println("Type mismatch for field " + propertyName
-                        + "; expected number or boolean, found "
-                        + value.getClass());
-                return null;
-            }
-            return value; // hmm... does this work? (the boxed JSON value is returned without converting it to the property's primitive type)
-        } else if (propertyType.equals(String.class)) {
-            if (!(value instanceof String)) {
-                System.err.println("Type mismatch for field " + propertyName
-                        + "; expected String, found " + value.getClass());
-                return null;
-            }
-            return value;
-        } else if (property.isNumber()) {
-            if (!(value instanceof Integer || value instanceof Long
-                    || value instanceof Float || value instanceof Double
-                    || value instanceof BigDecimal || value instanceof BigInteger)) {
-                return null; // unlike the other branches, no message is printed here
-            }
-
-            Number adjustedValue = (Number) value;
-            if (propertyType.equals(Long.class) && !(value instanceof Long)) {
-                adjustedValue = new Long(((Number) value).longValue());
-            } else if (propertyType.equals(Integer.class)
-                    && !(value instanceof Integer)) {
-                adjustedValue = new Integer(((Number) value).intValue());
-            } else if (propertyType.equals(Double.class)
-                    && !(value instanceof Double)) {
-                adjustedValue = new Double(((Number) value).doubleValue());
-            } else if (propertyType.equals(Float.class)
-                    && !(value instanceof Float)) {
-                adjustedValue = new Float(((Number) value).floatValue());
-            } else if (propertyType.equals(BigDecimal.class)) {
-                adjustedValue = new BigDecimal(((Number) value).longValue()); // NOTE(review): goes through longValue(), so any fractional part is dropped
-            } else if (propertyType.equals(BigInteger.class)) {
-                adjustedValue = BigInteger.valueOf(((Number) value).longValue());
-            }
-            return adjustedValue;
-        } else if (value instanceof JSONObject) {
-            return makeRecord((JSONObject) value, property.getSchema());
-        }
-
-        return null;
-    }
-    /**
-     * Converts a JSON array into a list, converting each element with
-     * {@code makeSettableValue} against the property's component property.
-     * Elements that cannot be converted are added as null.
-     *
-     * @param property
-     *            the list-typed property
-     * @param jsonArray
-     *            the JSON array to convert
-     * @return a new {@code ArrayList} with one entry per array element
-     * @throws RuntimeException
-     *             wrapping a JSONException from reading the array
-     */
-	public static Object makeList(Property property, JSONArray jsonArray) {
-        Property componentProperty = property.getComponentProperty();
-
-        int size = jsonArray.length();
-
-        List<Object> list = new ArrayList<Object>(size);
-
-        try {
-            for (int i = 0; i < size; i++) {
-                Object value = jsonArray.get(i);
-                list.add(makeSettableValue(componentProperty, value));
-            }
-        } catch (JSONException je) {
-            throw new RuntimeException(je);
-        }
-
-        return list;
-    }
-    /**
-     * Converts a JSON array into a Java array whose component type is the
-     * type of the property's component property. Each element is converted
-     * with {@code makeSettableValue} and stored reflectively.
-     *
-     * @param property
-     *            the array-typed property
-     * @param jsonArray
-     *            the JSON array to convert
-     * @return a new array with the same length as {@code jsonArray}
-     * @throws RuntimeException
-     *             wrapping a JSONException from reading the array
-     */
-    @SuppressWarnings("unchecked")
-	public static Object makeArray(Property property, JSONArray jsonArray) {
-        Property componentProperty = property.getComponentProperty();
-        Class clazz = componentProperty.getType();
-
-        int size = jsonArray.length();
-
-        Object array = Array.newInstance(clazz, size);
-
-        try {
-            for (int i = 0; i < size; i++) {
-                Object value = jsonArray.get(i);
-                Object adjustedValue = makeSettableValue(componentProperty,
-                                                         value);
-                Array.set(array, i, adjustedValue); // adjustedValue is null when the element could not be converted
-            }
-        } catch (JSONException je) {
-            throw new RuntimeException(je);
-        }
-        return array;
-    }
-    /**
-     * Recomputes the pacing values from the recorded processing times.
-     * <p>
-     * On return {@code rateInfo[0]} is the sleep duration in milliseconds and
-     * {@code rateInfo[1]} is the number of events between sleeps, which is
-     * how {@code run()} consumes them. The average processing time is padded
-     * by 30%, and the time left over in each second is divided among the
-     * expected number of events. When the per-event sleep would be under one
-     * millisecond, a 1 ms sleep is taken once every several events instead.
-     *
-     * @param rateInfo
-     *            two-element array that is overwritten in place
-     * @return the same array that was passed in
-     */
-    private long[] getRateInfo(long[] rateInfo) {
-        long totalTimeNanos = 0;
-        int entryCount = 0;
-        for (int i = 0; i < processTimes.length; i++) {
-            if (processTimes[i] == Long.MIN_VALUE) { // NOTE(review): nothing in this class stores Long.MIN_VALUE; unused slots hold 0 and are averaged in
-                break;
-            }
-            entryCount++;
-            totalTimeNanos += processTimes[i];
-        }
-        long averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0);
-        // fudge the time for additional overhead
-        averageTimeMicros += (long) (averageTimeMicros * 0.30);
-
-        if (emitCount % 5000 == 0) { // body is commented out, so this currently does nothing
-            // System.out.println("Average time in micros is " +
-            // averageTimeMicros);
-        }
-
-        long sleepTimeMicros = 0;
-        long millis = 0;
-
-        long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros; // processing time needed per second at the target rate
-        long leftOver = 1000000 - timeToMeetRateMicros; // microseconds of each second left for sleeping
-        if (leftOver <= 0) {
-            sleepTimeMicros = 0;
-        } else {
-            sleepTimeMicros = (leftOver / adjustedExpectedRate)
-                    - sleepOverheadMicros;
-        }
-
-        // how many events can be processed in the nanos time?
-        int eventsBeforeSleep = 1;
-        if (sleepTimeMicros < 1000) { // less than 1 ms per event: sleep 1 ms every few events instead
-            sleepTimeMicros = 1000 + sleepOverheadMicros;
-            millis = 1;
-            double numNapsDouble = ((double) leftOver / sleepTimeMicros);
-            int numNaps = (int) Math.ceil(numNapsDouble);
-            if (numNaps > 0) {
-                eventsBeforeSleep = adjustedExpectedRate / numNaps; // integer division; 0 when numNaps exceeds the rate, which run() treats as "sleep every event"
-            }
-
-            if (leftOver <= 0) { // no spare time: zero-length sleep once every 1000 events
-                millis = 0;
-                eventsBeforeSleep = 1000;
-            }
-        } else {
-            millis = sleepTimeMicros / 1000;
-        }
-
-        rateInfo[0] = millis;
-        rateInfo[1] = eventsBeforeSleep;
-        return rateInfo;
-    }
-    /** Pairs an event class's {@link Schema} with the stream name used when wrapping its events. */
-	public static class EventTypeInfo {
-        private Schema schema; // schema built from the event class
-        private String streamName; // "streamName" value from the class-info line
-
-        public EventTypeInfo(Schema schema, String streamName) {
-            this.schema = schema;
-            this.streamName = streamName;
-        }
-
-        public Schema getSchema() {
-            return schema;
-        }
-
-        public String getStreamName() {
-            return streamName;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/MethodInvoker.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/MethodInvoker.java b/s4-core/src/main/java/io/s4/util/MethodInvoker.java
deleted file mode 100644
index cd4e90d..0000000
--- a/s4-core/src/main/java/io/s4/util/MethodInvoker.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Utility class to invoke a method on an arbitrary object, if such a method is
- * defined.
- */
-public class MethodInvoker {
-
-    /**
-     * Find and invoke a getter on an object. A getter for parameter N is a
-     * public method whose name equals "get" + N, ignoring case, and which takes
-     * zero arguments. If no such method is found, an exception is thrown.
-     * 
-     * @param obj
-     *            object on which getter id to be invoked
-     * @param name
-     *            parameter name
-     * @return value returned by the getter method, if such a method is found.
-     *         Null if the object is null.
-     * @throws Exception
-     *             if no suitable getter is found, or if the getter method
-     *             throws an exception. The latter is wrapped in an
-     *             {@link InvocationTargetException}
-     */
-    public static Object invokeGetter(Object obj, String name) throws Exception {
-        if (obj != null) {
-            Method getter = findGetter(obj.getClass(), name);
-
-            if (getter != null) {
-                return getter.invoke(obj);
-
-            } else {
-                throw new NoGetterException(obj.getClass(), name);
-            }
-
-        } else {
-            throw new Exception("Null Target");
-        }
-    }
-
-    private static ConcurrentHashMap<Class<?>, HashMap<String, Method>> gettersMap = new ConcurrentHashMap<Class<?>, HashMap<String, Method>>();
-
-    private static Method findGetter(Class<?> clazz, String name) {
-        HashMap<String, Method> getters = gettersMap.get(clazz);
-
-        if (getters == null) {
-            HashMap<String, Method> newGetters = reflectGetters(clazz);
-
-            getters = gettersMap.putIfAbsent(clazz, newGetters);
-
-            if (getters == null)
-                getters = newGetters;
-        }
-
-        return getters.get(name.toLowerCase());
-    }
-
-    private static HashMap<String, Method> reflectGetters(Class<?> clazz) {
-        HashMap<String, Method> map = new HashMap<String, Method>();
-
-        for (Method m : clazz.getMethods()) {
-            // the method we are interested in should be named get* and take no
-            // arguments.
-            String n = m.getName();
-
-            if (m.getParameterTypes().length == 0 && n.startsWith("get")) {
-                String name = n.substring(3).toLowerCase();
-
-                map.put(name, m);
-            }
-        }
-
-        return map;
-    }
-
-    private static Class<?>[] getTypes(Object[] args) {
-        Class<?>[] aT = new Class<?>[args.length];
-        for (int i = 0; i < args.length; ++i) {
-            aT[i] = args[i].getClass();
-        }
-
-        return aT;
-    }
-
-    public static class NoGetterException extends Exception {
-        public NoGetterException(Class<?> clazz, String name) {
-            super("No Getter for attribute " + clazz.getName() + "." + name);
-        }
-    }
-
-    public static class NoMethodException extends Exception {
-        public NoMethodException(Class<?> clazz, String name, Object[] args) {
-            super("No method found " + clazz.getName() + "." + name + "("
-                    + MethodInvoker.getTypes(args) + ")");
-        }
-    }
-
-    public static class NullTargetException extends Exception {
-        public NullTargetException() {
-            super();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/MetricsName.java b/s4-core/src/main/java/io/s4/util/MetricsName.java
deleted file mode 100644
index 1b4b3d6..0000000
--- a/s4-core/src/main/java/io/s4/util/MetricsName.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-public enum MetricsName {
-    // metrics event name
-    S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
-            "S4::S4CoreMetrics"),
-
-    // metrics name
-    low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
-            "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
-            "lll_bad"), // exception can't be caught
-    generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
-            "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
-            "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
-            "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
-            "pec_err"), // exception can't be caught
-    pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
-            "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
-            "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
-            "pe_j_ct"), pe_error_count("pe_err");
-
-    private final String eventShortName;
-
-    private MetricsName(String eventShortName) {
-        this.eventShortName = eventShortName;
-    }
-
-    public String toString() {
-        return eventShortName;
-    }
-
-    public static void main(String[] args) {
-        System.out.println(generic_listener_msg_in_ct.toString());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/MiscConstants.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/MiscConstants.java b/s4-core/src/main/java/io/s4/util/MiscConstants.java
deleted file mode 100644
index ed2321e..0000000
--- a/s4-core/src/main/java/io/s4/util/MiscConstants.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-public class MiscConstants {
-    public final static String EVENT_WRAPPER_SCHEMA_NAME = "EventWrapper";
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/NumberUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/NumberUtils.java b/s4-core/src/main/java/io/s4/util/NumberUtils.java
deleted file mode 100644
index a49efb9..0000000
--- a/s4-core/src/main/java/io/s4/util/NumberUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import java.math.BigInteger;
-
-public class NumberUtils {
-
-    private final static BigInteger B64 = BigInteger.ZERO.setBit(64);
-
-    public static BigInteger getLongAsUnsignedBigInteger(long number) {
-        if (number >= 0)
-            return BigInteger.valueOf(number);
-        return BigInteger.valueOf(number).add(B64);
-    }
-
-    public static String getUnsignedBigIntegerAsHex(BigInteger bi) {
-        String hexString = bi.toString(16);
-        if (hexString.length() < 16) {
-            String zeroes = "000000000000000";
-            hexString = zeroes.substring(0, 16 - hexString.length())
-                    + hexString;
-        }
-        return hexString.toUpperCase();
-    }
-
-    public static void main(String args[]) {
-        BigInteger bi = getLongAsUnsignedBigInteger(0x00f1200000004561L);
-        System.out.println(getUnsignedBigIntegerAsHex(bi));
-
-        bi = getLongAsUnsignedBigInteger(0x80f12000dd004561L);
-        System.out.println(getUnsignedBigIntegerAsHex(bi));
-
-        bi = getLongAsUnsignedBigInteger(0x1161L);
-        System.out.println(getUnsignedBigIntegerAsHex(bi));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/PreprodLogger.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/PreprodLogger.java b/s4-core/src/main/java/io/s4/util/PreprodLogger.java
deleted file mode 100644
index 4d075b0..0000000
--- a/s4-core/src/main/java/io/s4/util/PreprodLogger.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.text.DecimalFormat;
-
-import org.apache.log4j.Logger;
-
-public class PreprodLogger {
-
-    String filenamePrefix;
-    File file;
-    FileOutputStream fos;
-
-    private DecimalFormat formatter = new DecimalFormat("0000");
-
-    public void setFilenamePrefix(String filenamePrefix) {
-        this.filenamePrefix = filenamePrefix;
-    }
-
-    public PreprodLogger() {
-    }
-
-    public void openNewFile() {
-        for (int count = 1; true; count++) {
-            String countString = formatter.format(count);
-            String filename = filenamePrefix + "." + countString + ".txt";
-            file = new File(filename);
-            if (!file.exists()) {
-                break;
-            }
-        }
-
-        try {
-            fos = new FileOutputStream(file);
-        } catch (IOException ioe) {
-            Logger.getLogger("s4")
-                  .error("Some sort of exception opening event logging file",
-                         ioe);
-            return;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/ReverseDoubleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/ReverseDoubleOutputFormatter.java b/s4-core/src/main/java/io/s4/util/ReverseDoubleOutputFormatter.java
deleted file mode 100644
index 548117c..0000000
--- a/s4-core/src/main/java/io/s4/util/ReverseDoubleOutputFormatter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.processor.OutputFormatter;
-
-public class ReverseDoubleOutputFormatter implements OutputFormatter {
-
-    @Override
-    public Object format(Object outputValue) {
-        Double doubleObject = (Double) outputValue;
-        return String.valueOf(doubleObject);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/ReverseIntegerOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/ReverseIntegerOutputFormatter.java b/s4-core/src/main/java/io/s4/util/ReverseIntegerOutputFormatter.java
deleted file mode 100644
index 5e9b990..0000000
--- a/s4-core/src/main/java/io/s4/util/ReverseIntegerOutputFormatter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.processor.OutputFormatter;
-
-public class ReverseIntegerOutputFormatter implements OutputFormatter {
-
-    @Override
-    public Object format(Object outputValue) {
-        Integer integerObject = (Integer) outputValue;
-        return String.valueOf(integerObject);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/S4Util.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/S4Util.java b/s4-core/src/main/java/io/s4/util/S4Util.java
deleted file mode 100644
index 64ea8f8..0000000
--- a/s4-core/src/main/java/io/s4/util/S4Util.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-public class S4Util {
-    public static long getPID() {
-        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean()
-                                                                   .getName();
-        return Long.parseLong(processName.split("@")[0]);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/SlotUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/SlotUtils.java b/s4-core/src/main/java/io/s4/util/SlotUtils.java
deleted file mode 100644
index d030f77..0000000
--- a/s4-core/src/main/java/io/s4/util/SlotUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-
-public class SlotUtils {
-
-    private int slotSize; // slot size in seconds
-
-    public SlotUtils(int slotSize) {
-        this.slotSize = slotSize;
-    }
-
-    public void setSize(int slotSize) {
-        this.slotSize = slotSize;
-    }
-
-    public Long getSlotAtTime(long time) {
-        return slotSize * (long) Math.floor(time / slotSize);
-    }
-
-    public Long getCurrentSlot() {
-        long currTimeStamp = System.currentTimeMillis() / 1000; // convert to
-        // seconds
-        Long slotTimeStamp = getSlotAtTime(currTimeStamp);
-        return slotTimeStamp;
-    }
-
-    public Long getSlot(int index, long currTimeStamp) {
-        Long slotTimeStamp = getSlotAtTime(currTimeStamp + index * slotSize);
-        return slotTimeStamp;
-    }
-
-    public boolean isOutsideWindow(Long slot, int windowSize, long time) {
-        boolean outside = false;
-        long windowBoundary = getSlotAtTime(time) - windowSize;
-        if (slot.longValue() < windowBoundary) {
-            outside = true;
-        }
-        return outside;
-    }
-
-    public static void main(String[] args) {
-        SlotUtils s = new SlotUtils(300);
-        System.out.printf("%d\n", s.getCurrentSlot());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/ToStringOutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/ToStringOutputFormatter.java b/s4-core/src/main/java/io/s4/util/ToStringOutputFormatter.java
deleted file mode 100644
index 908fc43..0000000
--- a/s4-core/src/main/java/io/s4/util/ToStringOutputFormatter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import io.s4.processor.OutputFormatter;
-
-public class ToStringOutputFormatter implements OutputFormatter {
-
-    @Override
-    public Object format(Object outputValue) {
-        return String.valueOf(outputValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/Watcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/Watcher.java b/s4-core/src/main/java/io/s4/util/Watcher.java
deleted file mode 100644
index c106fc3..0000000
--- a/s4-core/src/main/java/io/s4/util/Watcher.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.util;
-
-import static io.s4.util.MetricsName.S4_CORE_METRICS;
-import static io.s4.util.MetricsName.s4_core_exit_ct;
-import static io.s4.util.MetricsName.s4_core_free_mem;
-import io.s4.logger.Monitor;
-import io.s4.persist.Persister;
-import io.s4.processor.AsynchronousEventProcessor;
-
-import java.io.File;
-import java.text.MessageFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-
-public class Watcher implements Runnable {
-    Runtime rt = Runtime.getRuntime();
-    AsynchronousEventProcessor peContainer;
-    Persister persister;
-    Persister localPersister;
-    String configFilename;
-    long configFileTime = -1;
-    Monitor monitor;
-
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-    }
-
-    private long minimumMemory = 200 * 1024 * 1024;
-
-    SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
-
-    public void setMinimumMemory(long minimumMemory) {
-        this.minimumMemory = minimumMemory;
-    }
-
-    public void setPeContainer(AsynchronousEventProcessor peContainer) {
-        this.peContainer = peContainer;
-    }
-
-    public void setPersister(Persister persister) {
-        this.persister = persister;
-    }
-
-    public void setLocalPersister(Persister localPersister) {
-        this.localPersister = localPersister;
-    }
-
-    public void setConfigFilename(String configFilename) {
-        this.configFilename = configFilename;
-    }
-
-    public Watcher() {
-
-    }
-
-    public void init() {
-        Thread t = new Thread(this);
-        t.start();
-    }
-
-    public void run() {
-        try {
-            while (true) {
-                String stringTime = dateFormatter.format(new Date());
-
-                String template1 = "{0,number,#######0} waiting processing";
-                Logger.getLogger("s4")
-                      .info(MessageFormat.format(template1,
-                                                 peContainer.getQueueSize()));
-
-                String template2 = "Total: {0,number,#######0}, max {1,number,#######0}, free {2,number,#######0}";
-                Logger.getLogger("s4")
-                      .info(MessageFormat.format(template2,
-                                                 rt.totalMemory(),
-                                                 rt.maxMemory(),
-                                                 rt.freeMemory()));
-                memoryCheck();
-                configCheck();
-                try {
-                    Thread.sleep(15000);
-                } catch (Exception e) {
-                }
-            }
-        } catch (Exception e) {
-            Logger.getLogger("s4")
-                  .warn("Some sort of exception in Watcher thread", e);
-            try {
-                Thread.sleep(30000);
-            } catch (Exception ie) {
-            }
-        }
-    }
-
-    private ArrayList<byte[]> memoryHog = new ArrayList<byte[]>();
-
-    private void memoryCheck() {
-        long total = rt.totalMemory();
-        long max = rt.maxMemory();
-        long free = rt.freeMemory();
-        long actualFree = (max - total) + free;
-
-        try {
-            if (monitor != null) {
-                monitor.set(s4_core_free_mem.toString(),
-                            (int) (actualFree / 1024 / 1024.0),
-                            S4_CORE_METRICS.toString());
-            }
-
-            if (actualFree < minimumMemory) {
-                Logger.getLogger("s4").error("Too little memory remaining: "
-                        + actualFree + ". Exiting so process can be restarted");
-                if (monitor != null) {
-                    monitor.set(s4_core_exit_ct.toString(),
-                                1,
-                                S4_CORE_METRICS.toString());
-                }
-                System.exit(3);
-            }
-        } catch (Exception e) {
-            Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
-        }
-
-        // TODO: Comment this out!!
-        // memoryHog.add(new byte[10*1024*1024]);
-    }
-
-    private void configCheck() {
-        if (configFilename == null) {
-            return;
-        }
-
-        File file = new File(configFilename);
-        if (!file.exists()) {
-            return;
-        }
-        long lastModified = file.lastModified();
-        if (configFileTime == -1) {
-            configFileTime = lastModified;
-            return;
-        }
-
-        if (lastModified > configFileTime) {
-            Logger.getLogger("s4").info("Config file has changed. Exiting!!");
-            try {
-                if (monitor != null) {
-                    monitor.set(s4_core_exit_ct.toString(),
-                                1,
-                                S4_CORE_METRICS.toString());
-                }
-            } catch (Exception e) {
-                Logger.getLogger("s4").error("metrics name doesn't exist: ", e);
-            }
-            System.exit(4);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/Clock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/Clock.java b/s4-core/src/main/java/io/s4/util/clock/Clock.java
deleted file mode 100644
index 3d0969b..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/Clock.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-public interface Clock {
-
-    public long waitForTime(long targetTime);
-
-    public long getCurrentTime();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/ClockStreamsLoader.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/ClockStreamsLoader.java b/s4-core/src/main/java/io/s4/util/clock/ClockStreamsLoader.java
deleted file mode 100644
index 90f73ef..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/ClockStreamsLoader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-
-import java.util.HashMap;
-
-import org.apache.log4j.Logger;
-
-public class ClockStreamsLoader {
-
-    private static Logger logger = Logger.getLogger(ClockStreamsLoader.class);
-    Clock s4Clock;
-    HashMap<String, String> streamFieldMap;
-
-    public void setStreamFieldMap(HashMap<String, String> streamFieldMap) {
-        this.streamFieldMap = streamFieldMap;
-    }
-
-    public void setS4Clock(Clock s4Clock) {
-        this.s4Clock = s4Clock;
-    }
-
-    public void addStreams() {
-        if (s4Clock instanceof EventClock) {
-            EventClock eventClock = (EventClock) s4Clock;
-            for (String streamName : streamFieldMap.keySet()) {
-                eventClock.addEventClockStream(streamName,
-                        streamFieldMap.get(streamName));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/DrivenClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/DrivenClock.java b/s4-core/src/main/java/io/s4/util/clock/DrivenClock.java
deleted file mode 100644
index f55c7ec..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/DrivenClock.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-public class DrivenClock implements Clock {
-    // Clock that only advances when updateTime() is called; waitForTime() blocks callers until it does.
-    private volatile long currentTime;
-    private NavigableMap<Long, List<TimerRequest>> timerRequests = new TreeMap<Long, List<TimerRequest>>(); // target time -> waiting requests; also used as the lock
-    /** Moves the clock to newCurrentTime (smaller values are ignored) and wakes requests whose target time is reached. */
-    public void updateTime(long newCurrentTime) {
-        if (newCurrentTime < currentTime) {
-            return;
-        }
-        List<TimerRequest> relevantRequests = null;
-        synchronized (timerRequests) {
-            currentTime = newCurrentTime;
-            while (true) {
-                // Pop entries from the low end of the map while their target time is
-                // <= the new current time.
-                // NOTE(review): every pass overwrites relevantRequests, so only the last list
-                // popped is woken below; lists popped earlier never get wakeUp() called.
-                Entry<Long, List<TimerRequest>> entry = timerRequests
-                        .firstEntry();
-                if (entry == null || entry.getKey() > newCurrentTime) {
-                    break;
-                }
-                relevantRequests = timerRequests.remove(entry.getKey());
-            }
-            if (relevantRequests != null) {
-                for (TimerRequest timerRequest : relevantRequests) {
-                    timerRequest.wakeUp(newCurrentTime);
-                }
-            }
-        }
-    }
-    /** Returns currentTime at once when targetTime is already reached; otherwise queues a TimerRequest and blocks on it. */
-    public long waitForTime(long targetTime) {
-        TimerRequest timerRequest = null;
-        synchronized (timerRequests) {
-            if (targetTime <= currentTime) {
-                return currentTime;
-            }
-            timerRequest = new TimerRequest(targetTime);
-            List<TimerRequest> requestsForTargetTime = timerRequests.get(targetTime);
-            if (requestsForTargetTime == null) {
-                requestsForTargetTime = new ArrayList<TimerRequest>();
-                timerRequests.put(targetTime, requestsForTargetTime);
-            }
-            requestsForTargetTime.add(timerRequest);
-        }
-        return timerRequest.waitForTargetTime(); // the blocking wait happens after the synchronized block has been left
-    }
-
-    /** Equivalent to getCurrentTime(true). */
-    public long getCurrentTime() {
-        return getCurrentTime(true);
-    }
-    /** Returns currentTime; when it is still 0 and waitOnInitialization is true, first blocks in waitForTime(1). */
-    public long getCurrentTime(boolean waitOnInitialization) {
-        if (currentTime == 0 && waitOnInitialization) {
-            // the clock is still at 0: wait (via waitForTime(1)) for updateTime() to reach at least 1
-            this.waitForTime(1);
-        }
-        return currentTime;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/EventClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/EventClock.java b/s4-core/src/main/java/io/s4/util/clock/EventClock.java
deleted file mode 100644
index 2dbdfed..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/EventClock.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-import io.s4.collector.EventWrapper;
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.schema.SchemaContainer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class EventClock extends DrivenClock {
-    // DrivenClock whose time is taken from a long-typed field of events arriving on registered streams.
-    private static Logger logger = Logger.getLogger(EventClock.class);
-
-    Map<String, String> eventClockStreamsMap = new HashMap<String, String>(); // stream name -> name of the timestamp field
-    SchemaContainer schemaContainer = new SchemaContainer();
-    /** Advances the clock from the event's registered field; ignored when the stream is unregistered or the field is missing or not long/Long. */
-    public void update(EventWrapper eventWrapper) {
-        long eventTime = -1;
-        String streamName = eventWrapper.getStreamName();
-        String fieldName = eventClockStreamsMap.get(streamName);
-        if (fieldName != null) {
-            Object event = eventWrapper.getEvent();
-            Schema schema = schemaContainer.getSchema(event.getClass());
-            Property property = schema.getProperties().get(fieldName);
-            if (property != null
-                    && (property.getType().equals(Long.TYPE) || property
-                            .getType().equals(Long.class))) {
-                try {
-                    eventTime = (Long) property.getGetterMethod().invoke(event); // reflective getter call
-                    updateTime(eventTime);
-                } catch (Exception e) {
-                    throw new RuntimeException(e); // any failure in the getter or in updateTime() is rethrown unchecked
-                }
-            }
-        } 
-    }
-    /** Maps streamName to fieldName; a differing existing mapping is logged as an error and then overwritten. */
-    public void addEventClockStream(String streamName, String fieldName) {
-        String fieldNameInStream = eventClockStreamsMap.get(streamName);
-        if (fieldNameInStream != null) {
-            if (!fieldNameInStream.equals(fieldName)) {
-                // Conflicting definition: log both names and let the new one win.
-                // (Throwing a runtime exception here instead would make this easier to debug.)
-                logger.error("Stream " + streamName
-                        + " already has a timestamp field defined "
-                        + eventClockStreamsMap.get(streamName));
-                logger.error("Stream " + streamName
-                        + " is updating the timestamp field to " + fieldName);
-                eventClockStreamsMap.put(streamName, fieldName);
-            }
-        } else {
-            eventClockStreamsMap.put(streamName, fieldName);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/TimerRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/TimerRequest.java b/s4-core/src/main/java/io/s4/util/clock/TimerRequest.java
deleted file mode 100644
index fad181e..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/TimerRequest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TimerRequest {
-    private long targetTime;
-    private BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<Long>(); // carries the wake-up time from wakeUp() to the waiter
-    /** Creates a request for the given target time; the value is only stored and exposed through getTargetTime(). */
-    public TimerRequest(long targetTime) {
-        this.targetTime = targetTime;
-    }
-
-    public long getTargetTime() {
-        return targetTime;
-    }
-    /** Releases a thread blocked in waitForTargetTime() by handing it currentTime. */
-    public void wakeUp(long currentTime) {
-        blockingQueue.add(currentTime);
-    }
-    /** Blocks until wakeUp() supplies a time and returns it; returns -1 if the wait is interrupted. */
-    public long waitForTargetTime() {
-        try {
-            return blockingQueue.take();
-        } catch (InterruptedException ie) {
-            return -1; // NOTE(review): the thread's interrupt status is not restored here
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/util/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/util/clock/WallClock.java b/s4-core/src/main/java/io/s4/util/clock/WallClock.java
deleted file mode 100644
index b707157..0000000
--- a/s4-core/src/main/java/io/s4/util/clock/WallClock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-
-package io.s4.util.clock;
-
-
-public class WallClock implements Clock {
-    // Clock backed by the system time.
-    @Override
-    public long waitForTime(long targetTime) {
-        long interval = (targetTime - getCurrentTime());
-        if (interval > 0) {
-            try {
-                Thread.sleep(interval);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt(); // restore the flag; the method then returns early
-            }
-        }
-        return getCurrentTime(); // may be earlier than targetTime if the sleep was interrupted
-    }
-
-    @Override
-    public long getCurrentTime() {
-        // current system time in milliseconds
-        return System.currentTimeMillis();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/MainApp.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/MainApp.java b/s4-core/src/main/java/org/apache/s4/MainApp.java
new file mode 100644
index 0000000..a88e7a5
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/MainApp.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package io.s4;
+
+import org.apache.s4.ft.SafeKeeper;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.processor.PEContainer;
+import org.apache.s4.util.S4Util;
+import org.apache.s4.util.Watcher;
+import org.apache.s4.util.clock.Clock;
+import org.apache.s4.util.clock.EventClock;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+
+public class MainApp { // NOTE(review): this file lives in org/apache/s4/ but the package statement still reads io.s4
+    // Default home directories; overridden by the -c, -a and -e options respectively.
+    private static String coreHome = "../s4-core";
+    private static String appsHome = "../s4-apps";
+    private static String extsHome = "../s4-exts";
+    /** Parses the options, loads the core/extension/application Spring contexts and adds every AbstractPE bean to the PEContainer. */
+    public static void main(String args[]) throws Exception {
+        Options options = new Options();
+
+        options.addOption(OptionBuilder.withArgName("corehome")
+                                       .hasArg()
+                                       .withDescription("core home")
+                                       .create("c"));
+
+        options.addOption(OptionBuilder.withArgName("appshome")
+                                       .hasArg()
+                                       .withDescription("applications home")
+                                       .create("a"));
+
+        options.addOption(OptionBuilder.withArgName("s4clock")
+                                       .hasArg()
+                                       .withDescription("s4 clock")
+                                       .create("d"));
+
+        options.addOption(OptionBuilder.withArgName("seedtime")
+                                       .hasArg()
+                                       .withDescription("event clock initialization time")
+                                       .create("s"));        
+        
+        options.addOption(OptionBuilder.withArgName("extshome")
+                                       .hasArg()
+                                       .withDescription("extensions home")
+                                       .create("e"));
+
+        options.addOption(OptionBuilder.withArgName("instanceid")
+                                       .hasArg()
+                                       .withDescription("instance id")
+                                       .create("i"));
+
+        options.addOption(OptionBuilder.withArgName("configtype")
+                                       .hasArg()
+                                       .withDescription("configuration type")
+                                       .create("t"));
+
+        CommandLineParser parser = new GnuParser();
+        CommandLine commandLine = null;
+        String clockType = "wall"; // default; used below to pick the "<clockType>-clock.xml" config file
+
+        try {
+            commandLine = parser.parse(options, args);
+        } catch (ParseException pe) {
+            System.err.println(pe.getLocalizedMessage());
+            System.exit(1);
+        }
+
+        int instanceId = -1; // -1 means "not given"; the process id is used instead further down
+        if (commandLine.hasOption("i")) {
+            String instanceIdStr = commandLine.getOptionValue("i");
+            try {
+                instanceId = Integer.parseInt(instanceIdStr);
+            } catch (NumberFormatException nfe) {
+                System.err.println("Bad instance id: %s" + instanceIdStr); // NOTE(review): println does no formatting, so "%s" is printed literally
+                System.exit(1);
+            }
+        }
+
+        if (commandLine.hasOption("c")) {
+            coreHome = commandLine.getOptionValue("c");
+        }
+
+        if (commandLine.hasOption("a")) {
+            appsHome = commandLine.getOptionValue("a");
+        }
+        
+        if (commandLine.hasOption("d")) {
+            clockType = commandLine.getOptionValue("d");
+        }
+
+        if (commandLine.hasOption("e")) {
+            extsHome = commandLine.getOptionValue("e"); // NOTE(review): unlike coreHome/appsHome, this directory is never validated below
+        }
+
+        String configType = "typical";
+        if (commandLine.hasOption("t")) {
+            configType = commandLine.getOptionValue("t");
+        }
+        
+        long seedTime = 0;
+        if (commandLine.hasOption("s")) {
+            seedTime = Long.parseLong(commandLine.getOptionValue("s")); // NOTE(review): a non-numeric value throws NumberFormatException out of main
+        }
+
+        File coreHomeFile = new File(coreHome);
+        if (!coreHomeFile.isDirectory()) {
+            System.err.println("Bad core home: " + coreHome);
+            System.exit(1);
+        }
+
+        File appsHomeFile = new File(appsHome);
+        if (!appsHomeFile.isDirectory()) {
+            System.err.println("Bad applications home: " + appsHome);
+            System.exit(1);
+        }
+
+        if (instanceId > -1) {
+            System.setProperty("instanceId", "" + instanceId);
+        } else {
+            System.setProperty("instanceId", "" + S4Util.getPID());
+        }
+
+        List loArgs = commandLine.getArgList();
+
+        if (loArgs.size() < 1) { // check disabled: the body is commented out, so loArgs has no effect
+            // System.err.println("No bean configuration file specified");
+            // System.exit(1);
+        }
+
+        // String s4ConfigXml = (String) loArgs.get(0);
+        // System.out.println("s4ConfigXml is " + s4ConfigXml);
+
+        ClassPathResource propResource = new ClassPathResource("s4-core.properties");
+        Properties prop = new Properties();
+        if (propResource.exists()) {
+            prop.load(propResource.getInputStream()); // NOTE(review): the input stream is never closed
+        } else {
+            System.err.println("Unable to find s4-core.properties. It must be available in classpath");
+            System.exit(1);
+        }
+
+        ApplicationContext coreContext = null;
+        String configBase = coreHome + File.separatorChar + "conf"
+                + File.separatorChar + configType;
+        String configPath = "";
+        List<String> coreConfigUrls = new ArrayList<String>(); 
+        File configFile = null;
+
+        // load clock configuration
+        configPath = configBase + File.separatorChar + clockType + "-clock.xml"; // NOTE(review): existence is not checked, unlike s4-core-conf.xml
+        coreConfigUrls.add(configPath);
+
+        // load core config xml
+        configPath = configBase + File.separatorChar + "s4-core-conf.xml";
+        configFile = new File(configPath);
+        if (!configFile.exists()) {
+            System.err.printf("S4 core config file %s does not exist\n",
+                    configPath);
+            System.exit(1);
+        }
+		
+        coreConfigUrls.add(configPath);
+        String[] coreConfigFiles = new String[coreConfigUrls.size()];
+        coreConfigUrls.toArray(coreConfigFiles);
+
+        String[] coreConfigFileUrls = new String[coreConfigFiles.length];
+        for (int i = 0; i < coreConfigFiles.length; i++) {
+            coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
+        }
+
+        coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext); // coreContext is still null here, so null is passed as the parent
+        ApplicationContext context = coreContext;        
+        
+        Clock clock = (Clock) context.getBean("clock");
+        if (clock instanceof EventClock && seedTime > 0) {
+            EventClock s4EventClock = (EventClock)clock;
+            s4EventClock.updateTime(seedTime);
+            System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime()); // NOTE(review): "Intializing" is a typo in the printed text
+        }
+        
+        PEContainer peContainer = (PEContainer) context.getBean("peContainer");
+
+        Watcher w = (Watcher) context.getBean("watcher");
+        w.setConfigFilename(configPath); // configPath holds the s4-core-conf.xml path at this point
+
+        
+        // load extension modules
+        String[] configFileNames = getModuleConfigFiles(extsHome, prop);
+        if (configFileNames.length > 0) {
+            String[] configFileUrls = new String[configFileNames.length];
+            for (int i = 0; i < configFileNames.length; i++) {
+                configFileUrls[i] = "file:" + configFileNames[i];
+            }
+            context = new FileSystemXmlApplicationContext(configFileUrls,
+                                                          context);
+        }
+
+        // load application modules
+        configFileNames = getModuleConfigFiles(appsHome, prop);
+        if (configFileNames.length > 0) {
+            String[] configFileUrls = new String[configFileNames.length];
+            for (int i = 0; i < configFileNames.length; i++) {
+                configFileUrls[i] = "file:" + configFileNames[i];
+            }
+            context = new FileSystemXmlApplicationContext(configFileUrls,
+                                                          context);
+            // hand every bean of type AbstractPE to the PE container, after giving it
+            // the clock, the optional safe keeper and an id
+            String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
+            for (String processingElementBeanName : processingElementBeanNames) {
+                AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
+                bean.setClock(clock);
+                try {
+                    bean.setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+                } catch (NoSuchBeanDefinitionException ignored) {
+                    // no safe keeper = no checkpointing / recovery
+                }
+                // if the application did not specify an id, use the Spring bean name
+                if (bean.getId() == null) {
+                    bean.setId(processingElementBeanName);
+                }
+                System.out.println("Adding processing element with bean name "
+                        + processingElementBeanName + ", id "
+                        + ((AbstractPE) bean).getId());
+                peContainer.addProcessor((AbstractPE) bean);
+            }
+        }  
+    }
+
+    /** Collects moduleBase/DIR/DIR-conf.xml for every sub-directory DIR of moduleBase; also prints the result to stdout.
+     * @param moduleBase directory whose immediate sub-directories are scanned
+     * @param prop not used by this method
+     * @return absolute paths of the config files found (sub-directories without one are reported on stderr)
+     */
+    private static String[] getModuleConfigFiles(String moduleBase, Properties prop) {
+        List<String> configFileList = new ArrayList<String>();
+        File moduleBaseFile = new File(moduleBase);
+
+        // scan the module directories; NOTE(review): listFiles() returns null when moduleBase is not a directory, which would NPE in the loop
+        File[] moduleDirs = moduleBaseFile.listFiles();
+        for (File moduleDir : moduleDirs) {
+            if (moduleDir.isDirectory()) {
+                String confFileName = moduleDir.getAbsolutePath() + "/"
+                        + moduleDir.getName() + "-conf.xml";
+                File appsConfFile = new File(confFileName);
+                if (appsConfFile.exists()) {
+                    configFileList.add(appsConfFile.getAbsolutePath());
+                } else {
+                    System.err.println("Invalid application: " + moduleDir);
+                }
+            }
+        }
+        String[] ret = new String[configFileList.size()];
+        configFileList.toArray(ret);
+        System.out.println(Arrays.toString(ret));
+        return ret;
+    }    
+}


Mime
View raw message