incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [31/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/transformer/Transformer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/transformer/Transformer.java b/s4-core/src/main/java/io/s4/dispatcher/transformer/Transformer.java
deleted file mode 100644
index 3f851c5..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/transformer/Transformer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.transformer;
-
-
-public interface Transformer {
-    public Object transform(Object event);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java b/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
deleted file mode 100644
index 8318b78..0000000
--- a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.emitter;
-
-import static io.s4.util.MetricsName.S4_CORE_METRICS;
-import static io.s4.util.MetricsName.S4_EVENT_METRICS;
-import static io.s4.util.MetricsName.low_level_emitter_msg_out_ct;
-import static io.s4.util.MetricsName.low_level_emitter_out_err_ct;
-import static io.s4.util.MetricsName.low_level_emitter_qsz;
-import io.s4.collector.EventWrapper;
-import io.s4.comm.core.SenderProcess;
-import io.s4.comm.core.Serializer;
-import io.s4.listener.CommLayerListener;
-import io.s4.logger.Monitor;
-import io.s4.message.Request;
-import io.s4.serialize.SerializerDeserializer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.log4j.Logger;
-
-public class CommLayerEmitter implements EventEmitter, Runnable {
-    private static Logger logger = Logger.getLogger(CommLayerEmitter.class);
-
-    // config for emitter is coupled with listener, if there is a non-null
-    // listener. This prevents 2 tasks from being checked out of the cluster
-    // config for the same node: one for the listener and another for the
-    // emitter.
-    private CommLayerListener listener;
-
-    private SenderProcess sender;
-    private int nodeCount;
-    private BlockingQueue<MessageHolder> messageQueue = new LinkedBlockingDeque<MessageHolder>();
-    private String senderId;
-    private String clusterManagerAddress;
-    private String appName;
-    private String listenerAppName = null;
-    private Monitor monitor;
-    private SerializerDeserializer serDeser;
-
-    public void setSerDeser(SerializerDeserializer serDeser) {
-        this.serDeser = serDeser;
-    }
-
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-    }
-
-    public void setSenderId(String senderId) {
-        this.senderId = senderId;
-    }
-
-    public void setClusterManagerAddress(String clusterManagerAddress) {
-        this.clusterManagerAddress = clusterManagerAddress;
-    }
-
-    public void setAppName(String appName) {
-        this.appName = appName;
-    }
-
-    public void setListenerAppName(String listenerAppName) {
-        this.listenerAppName = listenerAppName;
-    }
-
-    public void setListener(CommLayerListener listener) {
-        this.listener = listener;
-    }
-    
-    public CommLayerListener getListener() {
-        return this.listener;
-    }
-
-    public void init() {
-
-        Thread t = new Thread(this, "CommLayerEmitter");
-        t.start();
-    }
-
-    public void queueMessage(MessageHolder messageHolder) {
-        messageQueue.add(messageHolder);
-        try {
-            if (monitor != null) {
-                monitor.set(low_level_emitter_qsz.toString(),
-                            messageQueue.size(),
-                            S4_CORE_METRICS.toString());
-            }
-        } catch (Exception e) {
-            logger.error("Exception in monitor metrics on thread "
-                    + Thread.currentThread().getId(), e);
-        }
-    }
-
-    @Override
-    public void emit(int partitionId, EventWrapper eventWrapper) {
-
-        // Special handling required for Requests
-        if (eventWrapper.getEvent() instanceof Request) {
-            decorateRequest((Request) eventWrapper.getEvent());
-        }
-
-        try {
-        	MessageHolder mh = new MessageHolder(partitionId, eventWrapper);            
-            queueMessage(mh);
-        } catch (RuntimeException rte) {
-            if (monitor != null) {
-                monitor.increment(low_level_emitter_out_err_ct.toString(),
-                                  1,
-                                  S4_EVENT_METRICS.toString(),
-                                  "et",
-                                  eventWrapper.getStreamName());
-            }
-            Logger.getLogger("s4").error("Error serializing or emitting event "
-                                                 + eventWrapper.getEvent(),
-                                         rte);
-            throw rte;
-        }
-    }
-
-    // Add partition id of sender
-    private void decorateRequest(Request r) {
-        Request.RInfo rinfo = r.getRInfo();
-
-        if (rinfo != null && listener != null)
-            rinfo.setPartition(listener.getId());
-    }
-
-    @Override
-    public int getNodeCount() {
-        if (listener == null) {
-            return 1;
-        }
-        return nodeCount;
-    }
-
-    @Override
-    public void run() {
-        if (listener == null) {
-            if (listenerAppName == null) {
-                listenerAppName = appName;
-            }
-            sender = new SenderProcess(clusterManagerAddress,
-                                       appName,
-                                       listenerAppName);
-            Map<String, String> map = new HashMap<String, String>();
-            map.put("SenderId", "" + senderId);
-            sender.setSerializer(new PassThroughSerializer());
-            sender.acquireTaskAndCreateSender(map);
-        } else {
-            Object listenerConfig = null;
-            try {
-                listenerConfig = listener.getListenerConfig();
-                if (listenerConfig == null) {
-                    logger.info("CommLayerEmitter going to wait for listener to acquire task");
-                    synchronized (listener) {
-                        listenerConfig = listener.getListenerConfig();
-                        if (listenerConfig == null) {
-                            listener.wait();
-                            listenerConfig = listener.getListenerConfig();
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                logger.info("Exception in CommLayerEmitter.run()", e);
-            }
-            logger.info("Creating sender process with " + listenerConfig);
-
-            String destinationAppName = (listenerAppName != null
-                    ? listenerAppName : listener.getAppName());
-
-            sender = new SenderProcess(listener.getClusterManagerAddress(),
-                                       listener.getAppName(),
-                                       destinationAppName);
-
-            sender.setSerializer(new PassThroughSerializer());
-            sender.createSenderFromConfig(listenerConfig);
-            nodeCount = sender.getNumOfPartitions();
-        }
-        boolean isSent = false;
-        while (!Thread.interrupted()) {
-            isSent = false;
-            try {
-                MessageHolder mh = messageQueue.take();
-                byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
-                if (listener == null) {
-                    isSent = sender.send(rawMessage);
-                } else {
-                    isSent = sender.sendToPartition(mh.getPartitionId(),
-                                                    rawMessage);
-                }
-
-                if (isSent) {
-                    if (monitor != null) {
-                        monitor.increment(low_level_emitter_msg_out_ct.toString(),
-                                          1,
-                                          S4_CORE_METRICS.toString());
-                    }
-                } else {
-                    if (monitor != null) {
-                        monitor.increment(low_level_emitter_out_err_ct.toString(),
-                                          1,
-                                          S4_CORE_METRICS.toString());
-                    }
-                    logger.warn("commlayer emit failed ...");
-                }
-            } catch (InterruptedException ie) {
-                if (monitor != null) {
-                    monitor.increment(low_level_emitter_out_err_ct.toString(),
-                                      1,
-                                      S4_CORE_METRICS.toString());
-                }
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                Logger.getLogger("s4").error("Error emitting message", e);
-            }
-        }
-    }
-
-    public class PassThroughSerializer implements Serializer {
-        public byte[] serialize(Object input) {
-            if (input instanceof byte[]) {
-                return (byte[]) input;
-            } else {
-                return null;
-            }
-        }
-    }
-
-    class MessageHolder {
-        private int partitionId;
-        private EventWrapper eventWrapper;
-        
-        MessageHolder(int partitionId, EventWrapper eventWrapper) {
-            this.partitionId = partitionId;
-            this.eventWrapper = eventWrapper;
-        }
-
-        int getPartitionId() {
-            return partitionId;
-        }
-
-        EventWrapper getEventWrapper() {
-            return eventWrapper;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/emitter/EventEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/emitter/EventEmitter.java b/s4-core/src/main/java/io/s4/emitter/EventEmitter.java
deleted file mode 100644
index 461672c..0000000
--- a/s4-core/src/main/java/io/s4/emitter/EventEmitter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.emitter;
-
-import io.s4.collector.EventWrapper;
-
-public interface EventEmitter {
-    public void emit(int partitionId, EventWrapper eventWrapper);
-
-    public int getNodeCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/CheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/CheckpointingEvent.java b/s4-core/src/main/java/io/s4/ft/CheckpointingEvent.java
deleted file mode 100644
index 299a7dc..0000000
--- a/s4-core/src/main/java/io/s4/ft/CheckpointingEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-/**
- * 
- * <p>
- * This class defines a checkpointing event (either a request for checkpoint or for recovery).
- * </p>
- * <p>
- * Checkpointing events are queued in the PE event queue and processed according the the PE processor scheduler (FIFO).
- * </p>
- */
-public abstract class CheckpointingEvent {
-
-    private SafeKeeperId safeKeeperId;
-
-    /**
-     * This is a requirement of the serialization framework
-     */
-    public CheckpointingEvent() {
-    }
-
-    /**
-     * Constructor identifying the PE subject to checkpointing or recovery
-     * @param safeKeeperId safeKeeperId
-     */
-    public CheckpointingEvent(SafeKeeperId safeKeeperId) {
-        this.safeKeeperId = safeKeeperId;
-    }
-
-    public SafeKeeperId getSafeKeeperId() {
-        return safeKeeperId;
-    }
-
-    public void setSafeKeeperId(SafeKeeperId id) {
-        this.safeKeeperId = id;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java b/s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
deleted file mode 100644
index 52fd4a5..0000000
--- a/s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.log4j.Logger;
-
-/**
- * <p>
- * Implementation of a file system backend storage to persist checkpoints.
- * </p>
- * <p>
- * The file system may be the default local file system when running on a single
- * machine, but should be a distributed file system such as NFS when running on
- * a cluster.
- * </p>
- * <p>
- * Checkpoints are stored in individual files (1 file = 1 safeKeeperId) in
- * directories according to the following structure:
- * <code>(storageRootpath)/prototypeId/safeKeeperId</code>
- * </p>
- * 
- */
-public class DefaultFileSystemStateStorage implements StateStorage {
-
-    private static org.apache.log4j.Logger logger = Logger.getLogger(DefaultFileSystemStateStorage.class);
-    private String storageRootPath;
-
-    public DefaultFileSystemStateStorage() {
-    }
-
-    /**
-     * <p>
-     * Called by the dependency injection framework, after construction.
-     * <p/>
-     */
-    public void init() {
-        checkStorageDir();
-    }
-
-    @Override
-    public byte[] fetchState(SafeKeeperId key) {
-        File file = safeKeeperID2File(key, storageRootPath);
-        if (file != null && file.exists()) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Fetching " + file.getAbsolutePath() + "for : " + key);
-            }
-            // TODO use commons-io or guava
-            FileInputStream in = null;
-            try {
-                in = new FileInputStream(file);
-
-                long length = file.length();
-
-                /*
-                 * Arrays can only be created using int types, so ensure that
-                 * the file size is not too big before we downcast to create the
-                 * array.
-                 */
-                if (length > Integer.MAX_VALUE) {
-                    throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
-                }
-
-                byte[] buffer = new byte[(int) length];
-                int offSet = 0;
-                int numRead = 0;
-
-                while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
-                    offSet += numRead;
-                }
-
-                if (offSet < buffer.length) {
-                    throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
-                            + buffer.length + " bytes read");
-                }
-
-                in.close();
-                return buffer;
-            } catch (FileNotFoundException e1) {
-                logger.error(e1.getMessage(), e1);
-            } catch (IOException e2) {
-                logger.error(e2.getMessage(), e2);
-            } finally {
-                if (in != null) {
-                    try {
-                        in.close();
-                    } catch (Exception e) {
-                        logger.warn(e.getMessage(), e);
-                    }
-                }
-            }
-        }
-        return null;
-
-    }
-
-    @Override
-    public Set<SafeKeeperId> fetchStoredKeys() {
-        Set<SafeKeeperId> keys = new HashSet<SafeKeeperId>();
-        File rootDir = new File(storageRootPath);
-        File[] dirs = rootDir.listFiles(new FileFilter() {
-            @Override
-            public boolean accept(File file) {
-                return file.isDirectory();
-            }
-        });
-        for (File dir : dirs) {
-            File[] files = dir.listFiles(new FileFilter() {
-                @Override
-                public boolean accept(File file) {
-                    return (file.isFile());
-                }
-            });
-            for (File file : files) {
-                keys.add(file2SafeKeeperID(file));
-            }
-        }
-        return keys;
-    }
-
-    // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
-    private static File safeKeeperID2File(SafeKeeperId key, String storageRootPath) {
-
-        return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
-                + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
-    }
-
-    private static SafeKeeperId file2SafeKeeperID(File file) {
-        SafeKeeperId id = null;
-        id = new SafeKeeperId(new String(Base64.decodeBase64(file.getName())));
-        return id;
-    }
-
-    public String getStorageRootPath() {
-        return storageRootPath;
-    }
-
-    public void setStorageRootPath(String storageRootPath) {
-        this.storageRootPath = storageRootPath;
-        File rootPathFile = new File(storageRootPath);
-        if (!rootPathFile.exists()) {
-            if (!rootPathFile.mkdirs()) {
-                logger.error("could not create root storage directory : " + storageRootPath);
-            }
-
-        }
-    }
-
-    public void checkStorageDir() {
-        if (storageRootPath == null) {
-
-            File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
-                    + "storage");
-            storageRootPath = defaultStorageDir.getAbsolutePath();
-            if (logger.isInfoEnabled()) {
-                logger.info("Unspecified storage dir; using default dir: " + defaultStorageDir.getAbsolutePath());
-            }
-            if (!defaultStorageDir.exists()) {
-                if (!(defaultStorageDir.mkdirs())) {
-                    logger.error("Storage directory not specified, and cannot create default storage directory : "
-                            + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
-                }
-            }
-        }
-    }
-
-    @Override
-    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
-        File f = safeKeeperID2File(key, storageRootPath);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
-        }
-        if (!f.exists()) {
-            if (!f.getParentFile().exists()) {
-                // parent file has prototype id
-                if (!f.getParentFile().mkdir()) {
-                    callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
-                            "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
-                                    + f.getParentFile().getAbsolutePath());
-                    return ;
-                }
-            }
-            // TODO handle IO exception
-            try {
-                f.createNewFile();
-            } catch (IOException e) {
-                callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
-                        key.toString() + " : " + e.getMessage());
-                return ;
-            }
-        } else {
-            if (!f.delete()) {
-                callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
-                        "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath() + "]");
-                return ;
-            }
-        }
-        FileOutputStream fos = null;
-        try {
-            fos = new FileOutputStream(f);
-            fos.write(state);
-            callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
-        } catch (FileNotFoundException e) {
-            callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
-                    key.toString() + " : " + e.getMessage());
-        } catch (IOException e) {
-            callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
-                    key.toString() + " : " + e.getMessage());
-        } finally {
-            try {
-                if (fos != null) {
-                    fos.close();
-                }
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/InitiateCheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/InitiateCheckpointingEvent.java b/s4-core/src/main/java/io/s4/ft/InitiateCheckpointingEvent.java
deleted file mode 100644
index 13270d9..0000000
--- a/s4-core/src/main/java/io/s4/ft/InitiateCheckpointingEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-/**
- * 
- * Event that triggers a checkpoint.
- *
- */
-public class InitiateCheckpointingEvent extends CheckpointingEvent {
-
-    public InitiateCheckpointingEvent() {
-        // as required by default kryo serializer
-    }
-
-    public InitiateCheckpointingEvent(SafeKeeperId safeKeeperId) {
-        super(safeKeeperId);
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java b/s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java
deleted file mode 100644
index b7d5128..0000000
--- a/s4-core/src/main/java/io/s4/ft/LoggingStorageCallbackFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import io.s4.ft.SafeKeeper.StorageResultCode;
-
-import org.apache.log4j.Logger;
-
-/**
- * A factory for creating storage callbacks that simply log callback results
- * 
- * 
- */
-public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
-
-    @Override
-    public StorageCallback createStorageCallback() {
-        return new StorageCallbackLogger();
-    }
-
-    /**
-     * A basic storage callback that simply logs results from storage operations
-     * 
-     */
-    static class StorageCallbackLogger implements StorageCallback {
-
-        private static Logger logger = Logger.getLogger("s4-ft");
-
-        @Override
-        public void storageOperationResult(StorageResultCode code, Object message) {
-            if (StorageResultCode.SUCCESS == code) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
-                }
-            } else {
-                logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/RecoveryEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/RecoveryEvent.java b/s4-core/src/main/java/io/s4/ft/RecoveryEvent.java
deleted file mode 100644
index 4481ca3..0000000
--- a/s4-core/src/main/java/io/s4/ft/RecoveryEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-/**
- * 
- * Event that triggers the recovery of a checkpoint. 
- *
- */
-public class RecoveryEvent extends CheckpointingEvent {
-
-    public RecoveryEvent() {
-        // as required by default kryo serializer
-    }
-
-    public RecoveryEvent(SafeKeeperId safeKeeperId) {
-        super(safeKeeperId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/RedisStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/RedisStateStorage.java b/s4-core/src/main/java/io/s4/ft/RedisStateStorage.java
deleted file mode 100644
index bbdf272..0000000
--- a/s4-core/src/main/java/io/s4/ft/RedisStateStorage.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import io.s4.ft.SafeKeeper.StorageResultCode;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
-
-/**
- * <p>
- * This class implements a storage backend based on Redis. Redis is a key-value
- * store.
- * </p>
- * <p>
- * See {@link http://redis.io/} for more information.
- * </p>
- * <p>
- * Redis must be running as an external service. References to this external
- * services must be injected during the initialization of the S4 platform.
- * </p>
- * 
- * 
- */
-public class RedisStateStorage implements StateStorage {
-
-    static Logger logger = Logger.getLogger("s4-ft");
-    private JedisPool jedisPool;
-    private String redisHost;
-    private int redisPort;
-
-    public void clear() {
-        Jedis jedis = jedisPool.getResource();
-        try {
-            jedis.flushAll();
-        } finally {
-            jedisPool.returnResource(jedis);
-        }
-    }
-
-    public void init() {
-        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
-        // TODO optional parameterization
-        jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort);
-    }
-
-    @Override
-    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
-        Jedis jedis = jedisPool.getResource();
-        String statusCode = "UNKNOWN";
-        try {
-            statusCode = jedis.set(key.getStringRepresentation().getBytes(), state);
-        } finally {
-            jedisPool.returnResource(jedis);
-        }
-        if ("OK".equals(statusCode)) {
-            callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
-        } else {
-            callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
-        }
-    }
-
-    @Override
-    public byte[] fetchState(SafeKeeperId key) {
-        Jedis jedis = jedisPool.getResource();
-        try {
-            return jedis.get(key.getStringRepresentation().getBytes());
-        } finally {
-            jedisPool.returnResource(jedis);
-        }
-    }
-
-    @Override
-    public Set<SafeKeeperId> fetchStoredKeys() {
-        Jedis jedis = jedisPool.getResource();
-        try {
-            Set<String> keys = jedis.keys("*");
-            Set<SafeKeeperId> result = new HashSet<SafeKeeperId>(keys.size());
-            for (String s : keys)
-                result.add(new SafeKeeperId(s));
-            return result;
-        } finally {
-            jedisPool.returnResource(jedis);
-        }
-
-    }
-    
-    public String getRedisHost() {
-        return redisHost;
-    }
-
-    public void setRedisHost(String redisHost) {
-        this.redisHost = redisHost;
-    }
-
-    public int getRedisPort() {
-        return redisPort;
-    }
-
-    public void setRedisPort(int redisPort) {
-        this.redisPort = redisPort;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/SafeKeeper.java b/s4-core/src/main/java/io/s4/ft/SafeKeeper.java
deleted file mode 100644
index fb8c605..0000000
--- a/s4-core/src/main/java/io/s4/ft/SafeKeeper.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import io.s4.dispatcher.Dispatcher;
-import io.s4.dispatcher.partitioner.Hasher;
-import io.s4.emitter.CommLayerEmitter;
-import io.s4.processor.AbstractPE;
-import io.s4.serialize.SerializerDeserializer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-/**
- * 
- * <p>
- * This class is responsible for coordinating interactions between the S4 event
- * processor and the checkpoint storage backend. In particular, it schedules
- * asynchronous save tasks to be executed on the backend.
- * </p>
- * 
- * 
- * 
- */
-public class SafeKeeper {
-
-    public enum StorageResultCode {
-        SUCCESS, FAILURE
-    }
-
-    private static Logger logger = Logger.getLogger("s4-ft");
-    private StateStorage stateStorage;
-    private Dispatcher loopbackDispatcher;
-    private SerializerDeserializer serializer;
-    private Hasher hasher;
-    // monitor field injection through a latch
-    private CountDownLatch signalReady = new CountDownLatch(2);
-    private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
-    private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
-
-    ThreadPoolExecutor threadPool;
-
-    int maxWriteThreads = 1;
-    int writeThreadKeepAliveSeconds = 120;
-    int maxOutstandingWriteRequests = 1000;
-
-    public SafeKeeper() {
-    }
-
-    /**
-     * <p>
-     * This init() method <b>must</b> be called by the dependency injection
-     * framework. It waits until all required dependencies are injected in
-     * SafeKeeper, and until the node count is accessible from the communication
-     * layer.
-     * </p>
-     */
-    public void init() {
-        try {
-            getReadySignal().await();
-        } catch (InterruptedException e1) {
-            e1.printStackTrace();
-        }
-        threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
-                new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
-        logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
-                + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
-                + maxOutstandingWriteRequests + "]");
-
-        int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
-        // required wait until nodes are available
-        while (nodeCount == 0) {
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException ignored) {
-            }
-            nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
-        }
-
-        signalNodesAvailable.countDown();
-    }
-
-    /**
-     * Forwards a call to checkpoint a PE to the backend storage.
-     * 
-     * @param key
-     *            safeKeeperId
-     * @param state
-     *            checkpoint data
-     */
-    public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
-        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
-        try {
-            threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
-        } catch (RejectedExecutionException e) {
-            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
-                    "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
-                            + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
-                            + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
-                            + "]");
-        }
-    }
-
-    private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
-        return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
-                stateStorage);
-    }
-
-    /**
-     * Fetches checkpoint data from storage for a given PE
-     * 
-     * @param key
-     *            safeKeeperId
-     * @return checkpoint data
-     */
-    public byte[] fetchSerializedState(SafeKeeperId key) {
-
-        try {
-            signalNodesAvailable.await();
-        } catch (InterruptedException ignored) {
-        }
-        byte[] result = null;
-        result = stateStorage.fetchState(key);
-        return result;
-    }
-
-    /**
-     * Generates a checkpoint event for a given PE, and enqueues it in the local
-     * event queue.
-     * 
-     * @param pe
-     *            reference to a PE
-     */
-    public void generateCheckpoint(AbstractPE pe) {
-        InitiateCheckpointingEvent initiateCheckpointingEvent = new InitiateCheckpointingEvent(pe.getSafeKeeperId());
-
-        List<List<String>> compoundKeyNames;
-        if (pe.getKeyValueString() == null) {
-            logger.warn("Only keyed PEs can be checkpointed. Current PE [" + pe.getSafeKeeperId()
-                    + "] will not be checkpointed.");
-        } else {
-            List<String> list = new ArrayList<String>(1);
-            list.add("");
-            compoundKeyNames = new ArrayList<List<String>>(1);
-            compoundKeyNames.add(list);
-            loopbackDispatcher.dispatchEvent(pe.getId() + "_checkpointing", compoundKeyNames,
-                    initiateCheckpointingEvent);
-        }
-    }
-
-    /**
-     * Generates a recovery event, and enqueues it in the local event queue.<br/>
-     * This can be used for an eager recovery mechanism.
-     * 
-     * @param safeKeeperId
-     *            safeKeeperId to recover
-     */
-    public void initiateRecovery(SafeKeeperId safeKeeperId) {
-        RecoveryEvent recoveryEvent = new RecoveryEvent(safeKeeperId);
-        loopbackDispatcher.dispatchEvent(safeKeeperId.getPrototypeId() + "_recovery", recoveryEvent);
-    }
-
-    public void setSerializer(SerializerDeserializer serializer) {
-        this.serializer = serializer;
-    }
-
-    public SerializerDeserializer getSerializer() {
-        return serializer;
-    }
-
-    public int getPartitionId() {
-        return ((CommLayerEmitter) loopbackDispatcher.getEventEmitter()).getListener().getId();
-    }
-
-    public void setHasher(Hasher hasher) {
-        this.hasher = hasher;
-        signalReady.countDown();
-    }
-
-    public Hasher getHasher() {
-        return hasher;
-    }
-
-    public void setStateStorage(StateStorage stateStorage) {
-        this.stateStorage = stateStorage;
-    }
-
-    public StateStorage getStateStorage() {
-        return stateStorage;
-    }
-
-    public void setLoopbackDispatcher(Dispatcher dispatcher) {
-        this.loopbackDispatcher = dispatcher;
-        signalReady.countDown();
-    }
-
-    public Dispatcher getLoopbackDispatcher() {
-        return this.loopbackDispatcher;
-    }
-
-    public CountDownLatch getReadySignal() {
-        return signalReady;
-    }
-
-    public StorageCallbackFactory getStorageCallbackFactory() {
-        return storageCallbackFactory;
-    }
-
-    public void setStorageCallbackFactory(StorageCallbackFactory storageCallbackFactory) {
-        this.storageCallbackFactory = storageCallbackFactory;
-    }
-
-    public int getMaxWriteThreads() {
-        return maxWriteThreads;
-    }
-
-    public void setMaxWriteThreads(int maxWriteThreads) {
-        this.maxWriteThreads = maxWriteThreads;
-    }
-
-    public int getWriteThreadKeepAliveSeconds() {
-        return writeThreadKeepAliveSeconds;
-    }
-
-    public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
-        this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
-    }
-
-    public int getMaxOutstandingWriteRequests() {
-        return maxOutstandingWriteRequests;
-    }
-
-    public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
-        this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/SafeKeeperId.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/SafeKeeperId.java b/s4-core/src/main/java/io/s4/ft/SafeKeeperId.java
deleted file mode 100644
index df1f2d7..0000000
--- a/s4-core/src/main/java/io/s4/ft/SafeKeeperId.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * <p>
- * Identifier of PEs. It is used to identify checkpointed PEs in the storage
- * backend.
- * </p>
- * <p>
- * The storage backend is responsible for converting this identifier to whatever
- * internal representation is most suitable for it.
- * </p>
- * <p>
- * This class provides methods for getting a compact String representation of
- * the identifier and for creating an identifier from a String representation.
- * </p>
- * 
- */
-public class SafeKeeperId {
-
-    private String prototypeId;
-    private String keyed;
-
-    private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern
-            .compile("\\[(\\S*)\\];\\[(\\S*)\\]");
-
-    public SafeKeeperId() {
-    }
-
-    /**
-     * 
-     * @param prototypeID
-     *            id of the PE as returned by {@link ProcessingElement#getId()
-     *            getId()} method
-     * @param keyed
-     *            keyed attribute(s)
-     */
-    public SafeKeeperId(String prototypeID, String keyed) {
-        super();
-        this.prototypeId = prototypeID;
-        this.keyed = keyed;
-    }
-
-    public SafeKeeperId(String keyAsString) {
-        Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
-
-        try {
-            matcher.find();
-            prototypeId = "".equals(matcher.group(1)) ? null : matcher.group(1);
-            keyed = "".equals(matcher.group(2)) ? null : matcher.group(2);
-        } catch (IndexOutOfBoundsException e) {
-
-        }
-
-    }
-
-    public String getKey() {
-        return keyed;
-    }
-
-    public String getPrototypeId() {
-        return prototypeId;
-    }
-
-    public String toString() {
-        return "[PROTO_ID];[KEYED] --> " + getStringRepresentation();
-    }
-
-    public String getStringRepresentation() {
-        return "[" + (prototypeId == null ? "" : prototypeId) + "];["
-                + (keyed == null ? "" : keyed) + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return getStringRepresentation().hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
-
-        SafeKeeperId other = (SafeKeeperId) obj;
-        if (keyed == null) {
-            if (other.keyed != null)
-                return false;
-        } else if (!keyed.equals(other.keyed))
-            return false;
-        if (prototypeId == null) {
-            if (other.prototypeId != null)
-                return false;
-        } else if (!prototypeId.equals(other.prototypeId))
-            return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/SaveStateTask.java b/s4-core/src/main/java/io/s4/ft/SaveStateTask.java
deleted file mode 100644
index 23c6c38..0000000
--- a/s4-core/src/main/java/io/s4/ft/SaveStateTask.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-
-/**
- * 
- * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
- *
- */
-public class SaveStateTask implements Runnable {
-    
-    SafeKeeperId safeKeeperId;
-    byte[] state;
-    StorageCallback storageCallback;
-    StateStorage stateStorage;
-    
-    public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
-        super();
-        this.safeKeeperId = safeKeeperId;
-        this.state = state;
-        this.storageCallback = storageCallback;
-        this.stateStorage = stateStorage;
-    }
-    
-    @Override
-    public void run() {
-        stateStorage.saveState(safeKeeperId, state, storageCallback);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/StateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/StateStorage.java b/s4-core/src/main/java/io/s4/ft/StateStorage.java
deleted file mode 100644
index 52a0a46..0000000
--- a/s4-core/src/main/java/io/s4/ft/StateStorage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-import java.util.Set;
-
-/**
- * <p>
- * Defines the methods that must be implemented by backend storage for
- * checkpoints.
- * </p>
- * 
- */
-public interface StateStorage {
-
-    /**
-     * Stores a checkpoint.
-     * 
-     * <p>
-     * NOTE: we don't handle any failure/success return value, because all
-     * failure/success notifications go through the StorageCallback reference
-     * </p>
-     * @param key
-     *            safeKeeperId
-     * @param state
-     *            checkpoint data as a byte array
-     * @param callback
-     *            callback for receiving notifications of storage operations.
-     *            This callback is configurable
-     */
-    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback);
-
-    /**
-     * Fetches data for a stored checkpoint.
-     * <p>
-     * Must return null if storage does not contain this key.
-     * </p>
-     * 
-     * @param key
-     *            safeKeeperId for this checkpoint
-     * 
-     * @return stored checkpoint data, or null if the storage does not contain
-     *         data for the given key
-     */
-    public byte[] fetchState(SafeKeeperId key);
-
-    /**
-     * Fetches all stored safeKeeper Ids.
-     * 
-     * @return all stored safeKeeper Ids.
-     */
-    public Set<SafeKeeperId> fetchStoredKeys();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/StorageCallback.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/StorageCallback.java b/s4-core/src/main/java/io/s4/ft/StorageCallback.java
deleted file mode 100644
index 67b7235..0000000
--- a/s4-core/src/main/java/io/s4/ft/StorageCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-/**
- * 
- * Callback for reporting the result of an asynchronous storage operation
- *
- */
-public interface StorageCallback {
-	
-    /**
-     * Notifies the result of a storage operation
-     * 
-     * @param resultCode code for the result : {@link SafeKeeper.StorageResultCode SafeKeeper.StorageResultCode}
-     * @param message whatever message object is suitable
-     */
-	public void storageOperationResult(SafeKeeper.StorageResultCode resultCode,
-			Object message);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/StorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/StorageCallbackFactory.java b/s4-core/src/main/java/io/s4/ft/StorageCallbackFactory.java
deleted file mode 100644
index 1e760d0..0000000
--- a/s4-core/src/main/java/io/s4/ft/StorageCallbackFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.ft;
-
-/**
- * A factory for creating storage callbacks. Storage callback implementations
- * that can take specific actions upon success or failure of asynchronous
- * storage operations.
- * 
- */
-public interface StorageCallbackFactory {
-
-    /**
-     * Factory method
-     * 
-     * @return returns a StorageCallback instance
-     */
-    public StorageCallback createStorageCallback();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/ft/package.html
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/ft/package.html b/s4-core/src/main/java/io/s4/ft/package.html
deleted file mode 100644
index 37fe51d..0000000
--- a/s4-core/src/main/java/io/s4/ft/package.html
+++ /dev/null
@@ -1,23 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
-<html>
-<head>
-</head>
-<body bgcolor="white">
-	<p>This package contains classes for providing some fault tolerance
-		to S4 PEs.</p>
-	<p>The current approach is based on checkpointing.</p>
-	<p>Checkpoints are taken periodically (configurable by time or
-		frequency of application events), and when restarting an S4 node,
-		saved checkpoints are automatically and lazily restored.</p>
-	<p>Lazy restoration is triggered by an application event to a PE
-		that has not yet been restored.</p>
-	<p>Checkpoints are stored in storage backends. Storage backends may
-		implement eager techniques to prefetch checkpoint data to be
-		recovered.
-	<p>
-		The application programmer must take care of marking as <b>transient</b>
-		the fields that do not have to be persisted (or cannot be persisted).
-	<p>Storage backends are pluggable and we provide some default
-		implementations in this package</p>
-</body>
-</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/listener/CommLayerListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/listener/CommLayerListener.java b/s4-core/src/main/java/io/s4/listener/CommLayerListener.java
deleted file mode 100644
index dc9389c..0000000
--- a/s4-core/src/main/java/io/s4/listener/CommLayerListener.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.listener;
-
-import static io.s4.util.MetricsName.S4_CORE_METRICS;
-import static io.s4.util.MetricsName.low_level_listener_badmsg_ct;
-import static io.s4.util.MetricsName.low_level_listener_msg_drop_ct;
-import static io.s4.util.MetricsName.low_level_listener_msg_in_ct;
-import static io.s4.util.MetricsName.low_level_listener_qsz;
-import static io.s4.util.MetricsName.s4_core_exit_ct;
-import io.s4.collector.EventWrapper;
-import io.s4.comm.core.CommEventCallback;
-import io.s4.comm.core.CommLayerState;
-import io.s4.comm.core.Deserializer;
-import io.s4.comm.core.ListenerProcess;
-import io.s4.logger.Monitor;
-import io.s4.serialize.SerializerDeserializer;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-public class CommLayerListener implements EventListener, Runnable {
-    private static Logger logger = Logger.getLogger(CommLayerListener.class);
-    private int dequeuerCount = 12;
-    private Set<EventHandler> handlers = new HashSet<EventHandler>();
-    ListenerProcess process;
-    private BlockingQueue<Object> messageQueue;
-    private int maxQueueSize = 1000;
-    private String clusterManagerAddress;
-    private String appName;
-    private Object listenerConfig;
-    private Monitor monitor;
-    private int partitionId = -1;
-    private int zkConnected = 1;
-    private SerializerDeserializer serDeser;
-
-    public void setSerDeser(SerializerDeserializer serDeser) {
-        this.serDeser = serDeser;
-    }
-
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-        monitor.setDefaultValue("tid", partitionId);
-    }
-
-    public void setMaxQueueSize(int maxQueueSize) {
-        this.maxQueueSize = maxQueueSize;
-    }
-
-    @Override
-    public int getId() {
-        return partitionId;
-    }
-
-    @Override
-    public String getAppName() {
-        return appName;
-    }
-
-    public void setAppName(String appName) {
-        this.appName = appName;
-    }
-
-    public String getClusterManagerAddress() {
-        return clusterManagerAddress;
-    }
-
-    public void setClusterManagerAddress(String clusterManagerAddress) {
-        this.clusterManagerAddress = clusterManagerAddress;
-    }
-
-    @Override
-    public void addHandler(EventHandler handler) {
-        handlers.add(handler);
-    }
-
-    @Override
-    public boolean removeHandler(EventHandler handler) {
-        return handlers.remove(handler);
-    }
-
-    public Object getListenerConfig() {
-        return this.listenerConfig;
-    }
-
-    public void init() {
-        System.err.println("appName=" + appName);
-        process = new ListenerProcess(clusterManagerAddress, appName);
-        process.setDeserializer(new PassThroughDeserializer());
-        CommEventCallback callbackHandler = new CommEventCallback() {
-            @Override
-            public void handleCallback(Map<String, Object> event) {
-                if (event != null) {
-                    CommLayerState state = (CommLayerState) event.get("state");
-                    if (state != null) {
-                        if (state == CommLayerState.INITIALIZED) {
-                            logger.info("Communication layer initialized: source:"
-                                    + event.get("source"));
-                        } else if (state == CommLayerState.BROKEN) {
-                            logger.error("Communication layer broken: source:"
-                                    + event.get("source"));
-                            logger.error("System exiting so that process can restart.");
-                            if (monitor != null) {
-                                monitor.set(s4_core_exit_ct.toString(),
-                                            1,
-                                            S4_CORE_METRICS.toString());
-                            }
-                            // should flush stats before exiting
-                            monitor.flushStats();
-                            try {
-                                Thread.sleep(1000);
-                            } catch (InterruptedException e) {
-                            }
-                            System.exit(3);
-                        }
-                    }
-                }
-            }
-        };
-        process.setCallbackHandler(callbackHandler);
-
-        messageQueue = new LinkedBlockingQueue<Object>(maxQueueSize);
-
-        // listenerConfig = process.acquireTaskAndCreateListener(map);
-        Thread t = new Thread(this);
-        t.setPriority(Thread.MAX_PRIORITY);
-        t.start();
-
-        if (System.getProperty("DequeuerCount") != null) {
-            dequeuerCount = Integer.parseInt(System.getProperty("DequeuerCount"));
-        }
-
-        System.out.println("dequeuer number: " + dequeuerCount);
-
-        for (int i = 0; i < dequeuerCount; i++) {
-            t = new Thread(new Dequeuer(this, i));
-            // t.setPriority(Thread.MIN_PRIORITY);
-            t.start();
-        }
-    }
-
-    // This is the actual raw listener, which simply listens for messages on the
-    // socket
-    public void run() {
-        boolean isAddMessageSucceeded = false;
-        // acquire a task to do
-        synchronized (this) {
-            Map<String, String> map = new HashMap<String, String>();
-            try {
-                map.put("ListenerId", InetAddress.getLocalHost().getHostName()
-                        + "_" + System.getProperty("pid") + "_"
-                        + Thread.currentThread().getId());
-                map.put("address", InetAddress.getLocalHost().getHostAddress());
-            } catch (UnknownHostException e) {
-                e.printStackTrace();
-                throw new RuntimeException(e);
-            }
-            logger.info("Waiting to acquire task");
-            listenerConfig = process.acquireTaskAndCreateListener(map);
-            logger.info("acquired task with config:" + listenerConfig);
-            Map<String, String> configMap = (Map<String, String>) listenerConfig;
-            String partition = configMap.get("partition");
-            if (partition != null) {
-                partitionId = Integer.parseInt(partition);
-                monitor.setDefaultValue("tid", partitionId);
-                logger.info("tid is set to " + partitionId);
-            }
-            this.notify();
-        }
-        while (!Thread.interrupted()) {
-            byte[] message = (byte[]) process.listen();
-
-            try {
-                isAddMessageSucceeded = messageQueue.offer(message);
-                if (monitor != null) {
-                    monitor.set(low_level_listener_qsz.toString(),
-                                messageQueue.size(),
-                                S4_CORE_METRICS.toString());
-                    if (isAddMessageSucceeded) {
-                        monitor.increment(low_level_listener_msg_in_ct.toString(),
-                                          1,
-                                          S4_CORE_METRICS.toString());
-                    } else {
-                        monitor.increment(low_level_listener_msg_drop_ct.toString(),
-                                          1,
-                                          S4_CORE_METRICS.toString());
-                    }
-                }
-            } catch (Exception e) {
-                Logger.getLogger("s4")
-                      .error("Exception in monitor metrics on thread "
-                                     + Thread.currentThread().getId(),
-                             e);
-            }
-        }
-    }
-
-    public Object takeMessage() throws InterruptedException {
-        return messageQueue.take();
-    }
-
-    class Dequeuer implements Runnable {
-        private int id;
-        private CommLayerListener rawListener;
-
-        public Dequeuer(CommLayerListener rawListener, int id) {
-            this.id = id;
-            this.rawListener = rawListener;
-        }
-
-        public void run() {
-            while (!Thread.interrupted()) {
-                try {
-                    byte[] rawMessage = (byte[]) rawListener.takeMessage();
-                    processMessage(rawMessage);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-
-        public void processMessage(byte[] rawMessage) {
-            // convert the byte array into an event object
-            EventWrapper eventWrapper = null;
-            try {
-                eventWrapper = (EventWrapper) serDeser.deserialize(rawMessage);
-
-            } catch (RuntimeException rte) {
-                Logger.getLogger("s4")
-                      .error("Error converting message to an event: ", rte);
-                if (monitor != null) {
-                    monitor.increment(low_level_listener_badmsg_ct.toString(),
-                                      1,
-                                      S4_CORE_METRICS.toString());
-                }
-                return;
-            }
-
-            if (eventWrapper != null) {
-                for (EventHandler handler : handlers) {
-                    try {
-                        handler.processEvent(eventWrapper);
-                    } catch (Exception e) {
-                        Logger.getLogger("s4")
-                              .error("Error calling processEvent on handler", e);
-                    }
-                }
-            }
-        }
-
-    }
-
-    public class PassThroughDeserializer implements Deserializer {
-        public Object deserialize(byte[] input) {
-            return input;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/listener/EventHandler.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/listener/EventHandler.java b/s4-core/src/main/java/io/s4/listener/EventHandler.java
deleted file mode 100644
index 2fcba3d..0000000
--- a/s4-core/src/main/java/io/s4/listener/EventHandler.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.listener;
-
-import io.s4.collector.EventWrapper;
-
-public interface EventHandler {
-    void processEvent(EventWrapper eventWrapper);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/listener/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/listener/EventListener.java b/s4-core/src/main/java/io/s4/listener/EventListener.java
deleted file mode 100644
index ab7acc2..0000000
--- a/s4-core/src/main/java/io/s4/listener/EventListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.listener;
-
-public interface EventListener extends EventProducer {
-
-    int getId();
-
-    String getAppName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/listener/EventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/listener/EventProducer.java b/s4-core/src/main/java/io/s4/listener/EventProducer.java
deleted file mode 100644
index e38acf6..0000000
--- a/s4-core/src/main/java/io/s4/listener/EventProducer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.listener;
-
-public interface EventProducer {
-
-    void addHandler(EventHandler handler);
-
-    boolean removeHandler(EventHandler handler);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/logger/Log4jMonitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/logger/Log4jMonitor.java b/s4-core/src/main/java/io/s4/logger/Log4jMonitor.java
deleted file mode 100644
index b464624..0000000
--- a/s4-core/src/main/java/io/s4/logger/Log4jMonitor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.logger;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Log4jMonitor extends TimerTask implements Monitor {
-    Map<String, Integer> metricMap = new ConcurrentHashMap<String, Integer>();
-    private String loggerName = "s4";
-    private int flushInterval = 600; // default is every 10 minutes
-
-    private Timer timer = new Timer();
-    private Map<String, Integer> defaultMap = new HashMap<String, Integer>();
-
-    public void setLoggerName(String loggerName) {
-        this.loggerName = loggerName;
-    }
-
-    public void setFlushInterval(int flushInterval) {
-        this.flushInterval = flushInterval;
-    }
-
-    public void init() {
-        if (flushInterval > 0) {
-            timer.scheduleAtFixedRate(this,
-                                      flushInterval * 1000,
-                                      flushInterval * 1000);
-        }
-    }
-
-    // TODO: this will be removed after changing above functions
-    public void set(String metricName, int value) {
-        metricMap.put(metricName, value);
-    }
-
-    public void flushStats() {
-        org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(loggerName);
-        for (String key : metricMap.keySet()) {
-            String message = key + " = " + metricMap.get(key);
-            logger.info(message);
-            metricMap.remove(key);
-        }
-        if (defaultMap != null) {
-            for (String key : defaultMap.keySet()) {
-                // TODO: need to be changed
-                set(key, defaultMap.get(key));
-            }
-        }
-    }
-
-    public void run() {
-        flushStats();
-    }
-
-    @Override
-    public void increment(String metricName, int increment) {
-        Integer currValue = metricMap.get(metricName);
-        if (currValue == null) {
-            currValue = 0;
-        }
-        currValue += increment;
-        metricMap.put(metricName, currValue);
-    }
-
-    @Override
-    public void setDefaultValue(String key, int val) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void increment(String metricName, int increment, String metricEventName, String... furtherDistinctions) {
-        increment(buildMetricName(metricName,
-                                  metricEventName,
-                                  furtherDistinctions),
-                  increment);
-
-    }
-
-    @Override
-    public void set(String metricName, int value, String metricEventName, String... furtherDistinctions) {
-        metricMap.put(buildMetricName(metricName,
-                                      metricEventName,
-                                      furtherDistinctions),
-                      value);
-    }
-
-    private String buildMetricName(String metricName, String metricEventName, String[] furtherDistinctions) {
-        StringBuffer sb = new StringBuffer(metricEventName);
-        sb.append(":");
-        sb.append(metricName);
-        if (furtherDistinctions != null) {
-            for (String furtherDistinction : furtherDistinctions) {
-                sb.append(":");
-                sb.append(furtherDistinction);
-            }
-        }
-        return sb.toString().intern();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/logger/Monitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/logger/Monitor.java b/s4-core/src/main/java/io/s4/logger/Monitor.java
deleted file mode 100644
index 870493e..0000000
--- a/s4-core/src/main/java/io/s4/logger/Monitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.logger;
-
-public interface Monitor {
-    public void increment(String metricName, int increment, String metricEventName, String... aggKeys);
-
-    public void increment(String metricName, int increment);
-
-    public void set(String metricName, int value, String metricEventName, String... aggKeys);
-
-    public void set(String metricName, int value);
-
-    public void flushStats();
-
-    public void setDefaultValue(String key, int val);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/logger/TraceMessage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/logger/TraceMessage.java b/s4-core/src/main/java/io/s4/logger/TraceMessage.java
deleted file mode 100644
index ffa24ae..0000000
--- a/s4-core/src/main/java/io/s4/logger/TraceMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.logger;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TraceMessage {
-    private long traceId;
-    private Map<String, String> propertyMap = new HashMap<String, String>();
-
-    public void setTraceId(long traceId) {
-        this.traceId = traceId;
-    }
-
-    public long getTraceId() {
-        return this.traceId;
-    }
-
-    public void setProperty(String name, String value) {
-        this.propertyMap.put(name, value);
-    }
-
-    public String toString() {
-        return this.traceId + "; " + this.propertyMap.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/message/PrototypeRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/message/PrototypeRequest.java b/s4-core/src/main/java/io/s4/message/PrototypeRequest.java
deleted file mode 100644
index 65ebdef..0000000
--- a/s4-core/src/main/java/io/s4/message/PrototypeRequest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.message;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.Hasher;
-import io.s4.processor.PrototypeWrapper;
-import io.s4.util.MethodInvoker;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A request for a value from the prototype of PEs.
- */
-public class PrototypeRequest extends Request {
-
-    private final List<String> query;
-
-    public PrototypeRequest(List<String> query, RInfo info) {
-        this.query = query;
-        this.rinfo = info;
-    }
-
-    public PrototypeRequest(List<String> query) {
-        this.query = query;
-        this.rinfo = null;
-    }
-
-    public PrototypeRequest() {
-        this.query = Collections.<String> emptyList();
-        this.rinfo = null;
-    }
-
-    public String toString() {
-        return "PROTOTYPE: query=[" + query + "] info=[" + rinfo + "]";
-    }
-
-    /**
-     * List of queries to execute.
-     * 
-     * @return list of queries
-     */
-    public List<String> getQuery() {
-        return query;
-    }
-
-    /**
-     * Evaluate Request on a particular PE Prototype.
-     * 
-     * @param pw
-     *            prototype
-     * @return Response object.
-     */
-    public Response evaluate(PrototypeWrapper pw) {
-
-        HashMap<String, Object> results = new HashMap<String, Object>();
-        HashMap<String, String> exceptions = new HashMap<String, String>();
-
-        for (String q : query) {
-            if (q.startsWith("$")) {
-                // requests for getters should be of the form $fieldA.
-                // Responds with pe.getFieldA()
-                try {
-                    Object res = MethodInvoker.invokeGetter(pw, q.substring(1));
-                    results.put(q, res);
-
-                } catch (Exception e) {
-                    exceptions.put(q, e.toString());
-                }
-
-            } else if (q.equalsIgnoreCase("count")) {
-                // Some aggregate operators
-                results.put(q, pw.getPECount());
-
-            } else {
-                exceptions.put(q, "Query Parse Error");
-            }
-        }
-
-        return new Response(results, exceptions, this);
-    }
-
-    public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
-        // send to all partitions
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-
-        for (int i = 0; i < partCount; ++i) {
-            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-            partitionInfo.setPartitionId(i);
-            partitionInfoList.add(partitionInfo);
-        }
-
-        return partitionInfoList;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/message/Request.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/message/Request.java b/s4-core/src/main/java/io/s4/message/Request.java
deleted file mode 100644
index 3c7de07..0000000
--- a/s4-core/src/main/java/io/s4/message/Request.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.message;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.Hasher;
-import io.s4.util.GsonUtil;
-
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.UUID;
-
-import com.google.gson.InstanceCreator;
-
-abstract public class Request {
-
-    protected RInfo rinfo = nullRInfo;
-
-    public final static RInfo nullRInfo = new NullRInfo();
-
-    /**
-     * Requester/Return information
-     */
-    abstract public static class RInfo {
-
-        private long id = 0;
-
-        /**
-         * Identity of request. This is typically specified by the requester.
-         */
-        public long getId() {
-            return id;
-        }
-
-        public void setId(int id) {
-            this.id = id;
-        }
-
-        private String stream;
-
-        /**
-         * Stream name on which response should be sent.
-         * 
-         * @return stream name.
-         */
-        public String getStream() {
-            return stream;
-        }
-
-        public void setStream(String stream) {
-            this.stream = stream;
-        }
-
-        private int partition;
-
-        /**
-         * Partition Id from which this request originated. This may be used to
-         * return a response to the same partition.
-         * 
-         * @return partition id
-         */
-        public int getPartition() {
-            return partition;
-        }
-
-        public void setPartition(int partition) {
-            this.partition = partition;
-        }
-
-        // Tell Gson how to instantiate one of these: create a ClientRInfo
-        static {
-            InstanceCreator<RInfo> creator = new InstanceCreator<RInfo>() {
-                public io.s4.message.Request.RInfo createInstance(Type type) {
-                    return new io.s4.message.Request.ClientRInfo();
-                }
-            };
-
-            GsonUtil.registerTypeAdapter(RInfo.class, creator);
-        }
-        
-    }
-
-    public static class ClientRInfo extends RInfo {
-        private UUID requesterUUID = null;
-
-        /**
-         * Identity of requesting client. This is used to send the response back
-         * to the client.
-         * 
-         * @return UUID of the client from which the request originated.
-         */
-        public UUID getRequesterUUID() {
-            return requesterUUID;
-        }
-
-        public void setRequesterUUID(UUID requesterUUID) {
-            this.requesterUUID = requesterUUID;
-        }
-
-        public String toString() {
-            return "(id:" + getId() + " requester:" + getRequesterUUID()
-                    + " partition:" + getPartition() + " stream:" + getStream()
-                    + ")";
-        }
-    }
-
-    public static class PERInfo extends RInfo {
-        private String requesterKey = null;
-
-        /**
-         * Identity of requesting PE. This is used to route the response back to
-         * the originating PE.
-         * 
-         * @return key value of the PE from which the request originated.
-         */
-        public String getRequesterKey() {
-            return requesterKey;
-        }
-
-        public void setRequesterKey(String requesterKey) {
-            this.requesterKey = requesterKey;
-        }
-
-        public String toString() {
-            return "(id:" + getId() + " requester:" + getRequesterKey()
-                    + " partition:" + getPartition() + " stream:" + getStream()
-                    + ")";
-        }
-    }
-
-    public static class NullRInfo extends RInfo {
-        public NullRInfo() {
-            super.stream = "@null";
-            super.partition = -1;
-        }
-    }
-
-    /**
-     * Query metainformation.
-     * 
-     * @return Info representing origin of request.
-     */
-    public RInfo getRInfo() {
-        return rinfo;
-    }
-
-    /**
-     * Query metainformation.
-     */
-    public void setRInfo(RInfo rinfo) {
-        this.rinfo = rinfo;
-    }
-
-    /**
-     * Partition itself. This is used by the default partitioner.
-     * 
-     * @param h
-     *            hasher
-     * @param delim
-     *            delimiter used to concatenate compound key values
-     * @param partCount
-     *            number of partitions
-     * @return list of compound keys: one event may have to be sent to multiple
-     *         nodes.
-     */
-    abstract public List<CompoundKeyInfo> partition(Hasher h, String delim,
-                                                    int partCount);
-}
\ No newline at end of file


Mime
View raw message