incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [3/7] git commit: S4-44 add timeout for fetching checkpoints
Date Wed, 12 Sep 2012 16:36:37 GMT
S4-44 add timeout for fetching checkpoints

- also avoid enqueueing checkpoint messages for time-triggered
checkpointing
- tests: use ZooKeeper (zk) rather than files for comparing results


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b6a90d0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b6a90d0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b6a90d0a

Branch: refs/heads/dev-old
Commit: b6a90d0a99581f35dcf33aafeaf1aa0110a7d426
Parents: dd96e87
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Mar 1 16:57:49 2012 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Mar 1 16:57:49 2012 +0100

----------------------------------------------------------------------
 .../org/apache/s4/ft/CheckpointingCoordinator.java |   77 +++--
 .../src/main/java/org/apache/s4/ft/FetchTask.java  |   43 +++
 .../apache/s4/ft/InitiateCheckpointingEvent.java   |   37 --
 .../src/main/java/org/apache/s4/ft/SafeKeeper.java |  255 +++++++--------
 .../main/java/org/apache/s4/ft/SaveStateTask.java  |   10 +-
 .../main/java/org/apache/s4/ft/SerializeTask.java  |    9 +-
 .../java/org/apache/s4/processor/AbstractPE.java   |  130 +++-----
 .../main/java/org/apache/s4/util/MetricsName.java  |   30 +-
 .../test/java/org/apache/s4/ft/RecoveryTest.java   |   10 +-
 .../test/java/org/apache/s4/ft/StatefulTestPE.java |  243 +++++++-------
 .../src/test/java/org/apache/s4/ft/TestUtils.java  |   10 +-
 .../org/apache/s4/ft/rectimeout/BrokenStorage.java |   53 +++
 .../s4/ft/rectimeout/RecoveryTimeoutTest.java      |  130 ++++++++
 .../java/org/apache/s4/ft/rectimeout/app_conf.xml  |   26 ++
 .../ft/rectimeout/s4_core_conf_broken_backend.xml  |  196 +++++++++++
 .../org/apache/s4/ft/rectimeout/wall_clock.xml     |    6 +
 .../apache/s4/ft/wordcount/FTWordCountTest.java    |   33 +--
 .../org/apache/s4/wordcount/WordClassifier.java    |   17 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   13 +-
 19 files changed, 861 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
index 7a20ea7..e49f529 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.s4.ft;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -27,8 +28,8 @@ import org.apache.s4.processor.AbstractPE;
 
 /**
  * Prevents event processing thread and serialization thread to overlap on the same PE instance, which would cause consistency issues during recovery.
- * 
- * How it works:
+ *
+ * How it works (for each prototype):
  * - we keep track of the PE being serialized and the PE being processed
  * - access to the PE is guarded by the instance of this class
  * - if the event processing thread receives an event that is handled by a PE currently being serialized, it waits until serialization is complete
@@ -41,14 +42,18 @@ public class CheckpointingCoordinator {
 	private static final Logger logger = Logger
 			.getLogger(CheckpointingCoordinator.class);
 
-	AbstractPE processing = null;
-	AbstractPE serializing = null;
+	private static class PrototypeSynchro { // lock, conditions and in-flight PEs for one entry of synchros (keyed by pe.getId())
+		final Lock lock = new ReentrantLock();
+		final Condition processingFinished = lock.newCondition();
+		final Condition serializingFinished = lock.newCondition();
+		AbstractPE processing = null;
+		AbstractPE serializing = null;
+	}
+
+	ConcurrentHashMap<String, PrototypeSynchro> synchros = new ConcurrentHashMap<String, CheckpointingCoordinator.PrototypeSynchro>();
 
 	long maxSerializationLockDuration;
 
-	Lock lock = new ReentrantLock();
-	Condition processingFinished = lock.newCondition();
-	Condition serializingFinished = lock.newCondition();
 
 	public CheckpointingCoordinator(long maxSerializationLockDuration) {
 		super();
@@ -56,15 +61,16 @@ public class CheckpointingCoordinator {
 	}
 
 	public void acquireForProcessing(AbstractPE pe) {
-		lock.lock();
+		PrototypeSynchro sync = getPrototypeSynchro(pe);
+		sync.lock.lock();
 		try {
-			if (serializing == pe) {
+			if (sync.serializing == pe) {
 				try {
 					if (logger.isTraceEnabled()) {
 						logger.trace("processing must wait for serialization to finish for PE "
 								+ pe.getId() + "/" + pe.getKeyValueString());
 					}
-					serializingFinished.await(maxSerializationLockDuration,
+					sync.serializingFinished.await(maxSerializationLockDuration,
 							TimeUnit.MILLISECONDS);
 					acquireForProcessing(pe);
 				} catch (InterruptedException e) {
@@ -75,61 +81,76 @@ public class CheckpointingCoordinator {
 							+ "/"
 							+ pe.getKeyValueString()
 							+ "]\nProceeding anyway, but checkpoint may contain inconsistent value");
-					serializing = null;
+					sync.serializing = null;
 				}
 			}
-			processing = pe;
+			sync.processing = pe;
 		} finally {
-			lock.unlock();
+			sync.lock.unlock();
 		}
 	}
 
 	public void releaseFromProcessing(AbstractPE pe) {
-		lock.lock();
+		PrototypeSynchro sync = getPrototypeSynchro(pe);
+		sync.lock.lock();
 		try {
-			if (processing == pe) {
-				processing = null;
-				processingFinished.signal();
+			if (sync.processing == pe) {
+				sync.processing = null;
+				sync.processingFinished.signal();
 			} else {
 				logger.warn("Cannot release from processing thread a PE that is not already in processing state");
 			}
 		} finally {
-			lock.unlock();
+			sync.lock.unlock();
 		}
 	}
 
 	public void acquireForSerialization(AbstractPE pe) {
-		lock.lock();
+		PrototypeSynchro sync = getPrototypeSynchro(pe);
+		sync.lock.lock();
 		try {
-			if (processing == pe) {
+			if (sync.processing == pe) {
 				try {
 					if (logger.isTraceEnabled()) {
 						logger.trace("serialization must wait for processing to finish for PE "
 								+ pe.getId() + "/" + pe.getKeyValueString());
 					}
-					processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
+					sync.processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
 					acquireForSerialization(pe);
 				} catch (InterruptedException e) {
 					// we still need to make sure it is now safe to serialize
 					acquireForSerialization(pe);
 				}
 			}
-			serializing = pe;
+			sync.serializing = pe;
 		} finally {
-			lock.unlock();
+			sync.lock.unlock();
+		}
+	}
+
+	private PrototypeSynchro getPrototypeSynchro(AbstractPE pe) {
+		PrototypeSynchro sync = synchros.get(pe.getId());
+		if (sync==null) {
+			sync = new PrototypeSynchro();
+			PrototypeSynchro existing = synchros.putIfAbsent(pe.getId(), sync); // another thread may have registered one concurrently
+			if (existing !=null) {
+				sync = existing;
+			}
 		}
+		return sync;
 	}
 
 	public void releaseFromSerialization(AbstractPE pe)
 			throws InterruptedException {
-		lock.lock();
+		PrototypeSynchro sync = getPrototypeSynchro(pe); // same lazy lookup as the other methods, so never null
+		sync.lock.lock();
 		try {
-			if (serializing == pe) {
-				serializing = null;
-				serializingFinished.signal();
+			if (sync.serializing == pe) {
+				sync.serializing = null;
+				sync.serializingFinished.signal();
 			}
 		} finally {
-			lock.unlock();
+			sync.lock.unlock();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/FetchTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/FetchTask.java b/s4-core/src/main/java/org/apache/s4/ft/FetchTask.java
new file mode 100644
index 0000000..d3153ab
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/FetchTask.java
@@ -0,0 +1,43 @@
+package org.apache.s4.ft;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.util.MetricsName;
+
+/**
+ * Encapsulates a fetching operation.
+ * Wraps {@link StateStorage#fetchState} in a {@link Callable} so that the caller can bound the wait through {@code Future.get(timeout)}; fetch successes and failures are counted on the {@link SafeKeeper} monitor when one is set.
+ */
+public class FetchTask implements Callable<byte[]>{
+
+	StateStorage stateStorage;
+	SafeKeeper safeKeeper;
+	SafeKeeperId safeKeeperId;
+
+	public FetchTask(StateStorage stateStorage, SafeKeeper safeKeeper,
+			SafeKeeperId safeKeeperId) {
+		super();
+		this.stateStorage = stateStorage;
+		this.safeKeeper = safeKeeper;
+		this.safeKeeperId = safeKeeperId;
+	}
+
+	@Override
+	public byte[] call() throws Exception {
+		try {
+			byte[] result = stateStorage.fetchState(safeKeeperId);
+			if (safeKeeper.monitor!=null) {
+				safeKeeper.monitor.increment(MetricsName.checkpointing_fetching_success.toString(), 1, S4_CORE_METRICS.toString());
+			}
+			return result;
+		} catch (Exception e) {
+			if (safeKeeper.monitor!=null) {
+				safeKeeper.monitor.increment(MetricsName.checkpointing_fetching_failed.toString(), 1, S4_CORE_METRICS.toString());
+			}
+			throw e; // reaches the caller of Future.get() wrapped in an ExecutionException
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java b/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
deleted file mode 100644
index b910e62..0000000
--- a/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.s4.ft;
-
-/**
- * 
- * Event that triggers a checkpoint.
- *
- */
-public class InitiateCheckpointingEvent extends CheckpointingEvent {
-
-    public InitiateCheckpointingEvent() {
-        // as required by default kryo serializer
-    }
-
-    public InitiateCheckpointingEvent(SafeKeeperId safeKeeperId) {
-        super(safeKeeperId);
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
index aa72010..5e8b8c4 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -23,10 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Logger;
 import org.apache.s4.dispatcher.Dispatcher;
@@ -38,19 +40,19 @@ import org.apache.s4.serialize.SerializerDeserializer;
 import org.apache.s4.util.MetricsName;
 
 /**
- * 
+ *
  * <p>
  * This class is responsible for coordinating interactions between the S4 event
  * processor and the checkpoint storage backend. In particular, it schedules
  * asynchronous save tasks to be executed on the backend.
  * </p>
- * 
- * 
- * 
+ *
+ *
+ *
  */
 public class SafeKeeper {
 
-    public enum StorageResultCode { 
+    public enum StorageResultCode {
         SUCCESS, FAILURE
     }
 
@@ -66,19 +68,26 @@ public class SafeKeeper {
 
     private ThreadPoolExecutor storageThreadPool;
     private ThreadPoolExecutor serializationThreadPool;
-    
+    private ThreadPoolExecutor fetchingThreadPool;
+
     private CheckpointingCoordinator processingSerializationSynchro;
-    
-    private Monitor monitor;
-    
+
+    Monitor monitor;
+
     int storageMaxThreads = 1;
     int storageThreadKeepAliveSeconds = 120;
     int storageMaxOutstandingRequests = 1000;
-    
-    int serializationMaxThreads=1;
+
     int serializationThreadKeepAliveSeconds = 120;
     int serializationMaxOutstandingRequests = 1000;
-    
+    // checkpoint fetching: each fetch waits at most fetchingMaxWaitMs; after fetchingMaxConsecutiveFailuresBeforeDisabling consecutive failures, fetchState() returns null for fetchingDisabledDurationMs
+    long fetchingMaxWaitMs = 1000;
+    int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+    int fetchingCurrentConsecutiveFailures = 0;
+    long fetchingDisabledDurationMs = 600000;
+    long fetchingDisabledInitTime=-1;
+
+
     long maxSerializationLockTime = 1000;
 
     public SafeKeeper() {
@@ -103,9 +112,11 @@ public class SafeKeeper {
         }
         storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds, TimeUnit.SECONDS,
                 new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests));
-        serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+        serializationThreadPool = new ThreadPoolExecutor(1, 1, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
                 new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
-        
+        fetchingThreadPool = new ThreadPoolExecutor(1, 1, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
+        // NOTE(review): the fetching pool reuses the serialization keep-alive and queue-size settings
         processingSerializationSynchro = new CheckpointingCoordinator(maxSerializationLockTime);
 
         logger.debug("Started thread pool with maxWriteThreads=[" + storageMaxThreads
@@ -121,68 +132,68 @@ public class SafeKeeper {
             }
             nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
         }
-        
+
         signalNodesAvailable.countDown();
     }
-    
+
     /**
      * Synchronization to prevent race conditions with serialization threads
      */
     public void acquirePermitForProcessing(AbstractPE pe) {
-    	processingSerializationSynchro.acquireForProcessing(pe);
+        processingSerializationSynchro.acquireForProcessing(pe);
     }
-    
+
     /**
      * Notification part of the mechanism for preventing race condition with serialization threads
      */
-	public void releasePermitForProcessing(AbstractPE pe) {
-		processingSerializationSynchro.releaseFromProcessing(pe);
-	}
-
-
-	/**
-	 * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
-	 * 
-	 * @return a callback for getting notified of the result of the storage operation
-	 */
-	public StorageCallback saveState(AbstractPE pe) {
-		StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
-		Future<byte[]> futureSerializedState = null;
-		try {
-			futureSerializedState = serializeState(pe, processingSerializationSynchro);
-		} catch (RejectedExecutionException e) {
-			if (monitor!=null) {
-				monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
-			}
-			storageCallback.storageOperationResult(StorageResultCode.FAILURE,
-					"Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["+ pe.getId()+"]" +
-							"	Remaining capacity for the serialization task queue is ["
-							+ serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
-							+ serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" + serializationThreadPool
-							+ "]");
-			return storageCallback;
-		}
-		submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
-		return storageCallback;
-	}
-    
+    public void releasePermitForProcessing(AbstractPE pe) {
+        processingSerializationSynchro.releaseFromProcessing(pe);
+    }
+
+
+    /**
+     * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
+     *
+     * @return a callback for getting notified of the result of the storage operation
+     */
+    public StorageCallback saveState(AbstractPE pe) {
+        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+        Future<byte[]> futureSerializedState = null;
+        try {
+            futureSerializedState = serializeState(pe, processingSerializationSynchro);
+        } catch (RejectedExecutionException e) {
+            if (monitor!=null) {
+                monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+            }
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Serialization task queue is full. Dropped the serialization request for PE ["+ pe.getId()+"]." +
+                            " Remaining capacity for the serialization task queue is ["
+                            + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" + serializationMaxOutstandingRequests
+                            + "]");
+            return storageCallback;
+        }
+        submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
+        return storageCallback;
+    }
+
     private Future<byte[]> serializeState(AbstractPE pe, CheckpointingCoordinator coordinator) {
         Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe, coordinator));
-    	if(monitor!=null) {
+        if(monitor!=null) {
             monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
         }
-    	return future;
+        return future;
     }
-    
+
     private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
-    	try {
+        try {
             storageThreadPool.execute(task);
             if (monitor!=null) {
-                monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
+                monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1, S4_CORE_METRICS.toString());
             }
-    	} catch (RejectedExecutionException e) {
+        } catch (RejectedExecutionException e) {
             if (monitor!=null) {
-                monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
+                monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1, S4_CORE_METRICS.toString());
             }
             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
                     "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
@@ -194,7 +205,7 @@ public class SafeKeeper {
 
     /**
      * Fetches checkpoint data from storage for a given PE
-     * 
+     * Returns null when the fetch fails or does not complete within fetchingMaxWaitMs, or while fetching is disabled after too many consecutive failures.
      * @param key
      *            safeKeeperId
      * @return checkpoint data
@@ -206,38 +217,32 @@ public class SafeKeeper {
         } catch (InterruptedException ignored) {
         }
         byte[] result = null;
-        result = stateStorage.fetchState(key);
-        return result;
-    }
-
-    /**
-     * Generates a checkpoint event for a given PE, and enqueues it in the local
-     * event queue.
-     * 
-     * @param pe
-     *            reference to a PE
-     */
-    public void generateCheckpoint(AbstractPE pe) {
-        InitiateCheckpointingEvent initiateCheckpointingEvent = new InitiateCheckpointingEvent(pe.getSafeKeeperId());
-
-        List<List<String>> compoundKeyNames;
-        if (pe.getKeyValueString() == null) {
-            logger.warn("Only keyed PEs can be checkpointed. Current PE [" + pe.getSafeKeeperId()
-                    + "] will not be checkpointed.");
-        } else {
-            List<String> list = new ArrayList<String>(1);
-            list.add("");
-            compoundKeyNames = new ArrayList<List<String>>(1);
-            compoundKeyNames.add(list);
-            loopbackDispatcher.dispatchEvent(pe.getId() + "_checkpointing", compoundKeyNames,
-                    initiateCheckpointingEvent);
+        if ((fetchingCurrentConsecutiveFailures>0 && (fetchingCurrentConsecutiveFailures== fetchingMaxConsecutiveFailuresBeforeDisabling))) {
+            if((fetchingDisabledInitTime+fetchingDisabledDurationMs)>System.currentTimeMillis()) {
+                return null; // still within the disabled period: skip the backend
+            } else {
+                // disabled period elapsed: re-enable fetching
+                fetchingCurrentConsecutiveFailures=0;
+            }
         }
+        Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, this, key));
+        try {
+            result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+            fetchingCurrentConsecutiveFailures=0;
+        } catch (Exception e) {
+            fetched.cancel(true); // interrupt a running fetch or keep a queued one from running; no-op if it already completed
+            logger.error("Cannot fetch checkpoint from backend for key ["+ key.getStringRepresentation()+"]", e);
+            if (++fetchingCurrentConsecutiveFailures==fetchingMaxConsecutiveFailuresBeforeDisabling) {
+                fetchingDisabledInitTime = System.currentTimeMillis();
+            }
+        }
+        return result;
     }
 
     /**
      * Generates a recovery event, and enqueues it in the local event queue.<br/>
      * This can be used for an eager recovery mechanism.
-     * 
+     *
      * @param safeKeeperId
      *            safeKeeperId to recover
      */
@@ -296,62 +301,54 @@ public class SafeKeeper {
         this.storageCallbackFactory = storageCallbackFactory;
     }
 
-	public int getStorageMaxThreads() {
-		return storageMaxThreads;
-	}
-
-	public void setStorageMaxThreads(int storageMaxThreads) {
-		this.storageMaxThreads = storageMaxThreads;
-	}
-
-	public int getStorageThreadKeepAliveSeconds() {
-		return storageThreadKeepAliveSeconds;
-	}
+    public int getStorageMaxThreads() {
+        return storageMaxThreads;
+    }
 
-	public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
-		this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
-	}
+    public void setStorageMaxThreads(int storageMaxThreads) {
+        this.storageMaxThreads = storageMaxThreads;
+    }
 
-	public int getStorageMaxOutstandingRequests() {
-		return storageMaxOutstandingRequests;
-	}
+    public int getStorageThreadKeepAliveSeconds() {
+        return storageThreadKeepAliveSeconds;
+    }
 
-	public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
-		this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
-	}
+    public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
+        this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
+    }
 
-	public int getSerializationMaxThreads() {
-		return serializationMaxThreads;
-	}
+    public int getStorageMaxOutstandingRequests() {
+        return storageMaxOutstandingRequests;
+    }
 
-	public void setSerializationMaxThreads(int serializationMaxThreads) {
-		this.serializationMaxThreads = serializationMaxThreads;
-	}
+    public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
+        this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
+    }
 
-	public int getSerializationThreadKeepAliveSeconds() {
-		return serializationThreadKeepAliveSeconds;
-	}
+    public int getSerializationThreadKeepAliveSeconds() {
+        return serializationThreadKeepAliveSeconds;
+    }
 
-	public void setSerializationThreadKeepAliveSeconds(
-			int serializationThreadKeepAliveSeconds) {
-		this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
-	}
+    public void setSerializationThreadKeepAliveSeconds(
+            int serializationThreadKeepAliveSeconds) {
+        this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
+    }
 
-	public int getSerializationMaxOutstandingRequests() {
-		return serializationMaxOutstandingRequests;
-	}
+    public int getSerializationMaxOutstandingRequests() {
+        return serializationMaxOutstandingRequests;
+    }
 
-	public void setSerializationMaxOutstandingRequests(
-			int serializationMaxOutstandingRequests) {
-		this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
-	}
+    public void setSerializationMaxOutstandingRequests(
+            int serializationMaxOutstandingRequests) {
+        this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
+    }
 
-	public long getMaxSerializationLockTime() {
-		return maxSerializationLockTime;
-	}
+    public long getMaxSerializationLockTime() {
+        return maxSerializationLockTime;
+    }
 
-	public void setMaxSerializationLockTime(long maxSerializationLockTime) {
-		this.maxSerializationLockTime = maxSerializationLockTime;
-	}
+    public void setMaxSerializationLockTime(long maxSerializationLockTime) {
+        this.maxSerializationLockTime = maxSerializationLockTime;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
index 81363a1..25ce279 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -24,18 +24,18 @@ import org.apache.log4j.Logger;
 
 
 /**
- * 
+ *
  * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
  *
  */
 public class SaveStateTask implements Runnable {
-    
+
     SafeKeeperId safeKeeperId;
     byte[] serializedState;
     Future<byte[]> futureSerializedState = null;
     StorageCallback storageCallback;
     StateStorage stateStorage;
-    
+    // Constructor for state that has already been serialized into a byte array.
     public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
         super();
         this.safeKeeperId = safeKeeperId;
@@ -43,14 +43,14 @@ public class SaveStateTask implements Runnable {
         this.storageCallback = storageCallback;
         this.stateStorage = stateStorage;
     }
-    
+    // Constructor for state whose serialization is still pending: the bytes come later through futureSerializedState (e.g. from SafeKeeper's serialization pool).
     public SaveStateTask(SafeKeeperId safeKeeperId, Future<byte[]> futureSerializedState, StorageCallback storageCallback, StateStorage stateStorage) {
     	this.safeKeeperId = safeKeeperId;
         this.futureSerializedState = futureSerializedState;
         this.storageCallback = storageCallback;
         this.stateStorage = stateStorage;
     }
-    
+
     @Override
     public void run() {
     	if (futureSerializedState!=null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
index 2fe2d13..ba68aa4 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
@@ -21,11 +21,15 @@ import java.util.concurrent.Callable;
 
 import org.apache.s4.processor.AbstractPE;
 
+/**
+ * Encapsulates a serialization operation. Holds the serialization permit on the PE (via CheckpointingCoordinator) while serializing.
+ *
+ */
 public class SerializeTask implements Callable<byte[]> {
 
 	AbstractPE pe;
 	private CheckpointingCoordinator coordinator;
-	
+
 	public SerializeTask(AbstractPE pe, CheckpointingCoordinator coordinator) {
 		super();
 		this.pe = pe;
@@ -38,6 +42,9 @@ public class SerializeTask implements Callable<byte[]> {
 			coordinator.acquireForSerialization(pe);
 			return pe.serializeState();
 		} finally {
+	        // clear the dirty flag while the serialization permit is still held. NOTE(review): this is in finally, so the flag is also cleared when serializeState() throws -- confirm a failed serialization should not be retried
+	        pe.setCheckpointable(false);
+
 			coordinator.releaseFromSerialization(pe);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
index e4b7109..169e32b 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -17,14 +17,21 @@
  */
 package org.apache.s4.processor;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
 import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
 import org.apache.s4.dispatcher.partitioner.KeyInfo;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
-import org.apache.s4.ft.CheckpointingCoordinator;
-import org.apache.s4.ft.InitiateCheckpointingEvent;
-import org.apache.s4.ft.RecoveryEvent;
 import org.apache.s4.ft.SafeKeeper;
 import org.apache.s4.ft.SafeKeeperId;
 import org.apache.s4.persist.Persister;
@@ -33,23 +40,6 @@ import org.apache.s4.schema.Schema.Property;
 import org.apache.s4.schema.SchemaContainer;
 import org.apache.s4.util.clock.Clock;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
 /**
  * This is the base class for processor classes.
  * <p>
@@ -86,7 +76,7 @@ public abstract class AbstractPE implements Cloneable {
     }
 
     transient private Clock clock;
-    // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
+    // replaces monitor wait on AbstractPE, for triggering possible extra
     // thread when checkpointing activated
     transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
     transient private int outputFrequency = 1;
@@ -110,9 +100,7 @@ public abstract class AbstractPE implements Cloneable {
 
     transient private boolean recoveryAttempted = false;
     // true if state may have changed
-    transient private boolean checkpointable = false;
-    // use a flag for identifying checkpointing events
-    transient private boolean isCheckpointingEvent = false;
+    transient private volatile boolean checkpointable = false;
 
     transient private SafeKeeper safeKeeper; // handles fault tolerance
     transient private CountDownLatch safeKeeperSetSignal = new CountDownLatch(1);
@@ -122,7 +110,7 @@ public abstract class AbstractPE implements Cloneable {
     transient private int checkpointableEventCount = 0;
     transient private int checkpointsBeforePause = -1;
     transient private long checkpointingPauseTimeInMillis;
-    
+
 
     transient private OverloadDispatcher overloadDispatcher;
 
@@ -209,9 +197,7 @@ public abstract class AbstractPE implements Cloneable {
         this.streamName = streamName;
 
         if (safeKeeper != null) {
-        	safeKeeper.acquirePermitForProcessing(this);
-            // initialize checkpointing event flag
-            this.isCheckpointingEvent = false;
+            safeKeeper.acquirePermitForProcessing(this);
             if (!recoveryAttempted) {
                 recover();
                 recoveryAttempted = true;
@@ -224,7 +210,7 @@ public abstract class AbstractPE implements Cloneable {
             keyRecord.clear(); // the PE doesn't need it anymore
         }
 
-        if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0 && !isCheckpointingEvent) {
+        if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0) {
             eventCount++;
             if (eventCount % outputFrequency == 0) {
                 try {
@@ -235,18 +221,17 @@ public abstract class AbstractPE implements Cloneable {
             }
         }
 
-        // do not take into account checkpointing/recovery trigger messages
-        if (!isCheckpointingEvent) {
-            checkpointable = true; // dirty flag
-            if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
-                checkpointableEventCount++;
-                if (checkpointableEventCount % checkpointingFrequency == 0) {
-                    // for count-based frequency, we directly checkpoint here
-                    checkpoint();
-                }
+        setCheckpointable(true); // dirty flag
+        if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
+            checkpointableEventCount++;
+            if (checkpointableEventCount % checkpointingFrequency == 0) {
+                // for count-based frequency, we directly checkpoint here
+            	if (isCheckpointable()) {
+            		checkpoint();
+            	}
             }
         }
-        
+
         if (safeKeeper!=null) {
             safeKeeper.releasePermitForProcessing(this);
         }
@@ -261,7 +246,7 @@ public abstract class AbstractPE implements Cloneable {
      * <p>
      * The key value is a list because the key may be a compound (composite)
      * key, in which case the key will have one value for each simple key.
-     * 
+     *
      * @return the key value as a List of Objects (each element contains the
      *         value of a simple key).
      **/
@@ -363,7 +348,7 @@ public abstract class AbstractPE implements Cloneable {
      * "by event count" with an output frequency of 1. (That is,
      * <code>output</code> is called after after each return from
      * <code>processEvent</code>).
-     * 
+     *
      * @param outputFrequency
      *            the number of application events passed to
      *            <code>processEvent</code> before output is called.
@@ -377,7 +362,7 @@ public abstract class AbstractPE implements Cloneable {
     /**
      * Sets the frequency strategy to "by event count". Uses the same mechanism
      * than {@link #setOutputFrequencyByEventCount(int)}
-     * 
+     *
      * @param checkpointingFrequency
      *            the number of application events passed to
      *            <code>processEvent</code> before output is called (ignoring
@@ -386,7 +371,6 @@ public abstract class AbstractPE implements Cloneable {
     public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
         this.checkpointingFrequency = checkpointingFrequency;
         this.checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
-        supplementAdviceForCheckpointingAndRecovery();
     }
 
     /**
@@ -414,7 +398,7 @@ public abstract class AbstractPE implements Cloneable {
      * "by event count" with an output frequency of 1. (That is,
      * <code>output</code> is called after after each return from
      * <code>processEvent</code>).
-     * 
+     *
      * @param outputFrequency
      *            the time boundary in seconds
      **/
@@ -427,14 +411,13 @@ public abstract class AbstractPE implements Cloneable {
     /**
      * Sets the frequency of checkpointing. It uses the same mechanism than
      * {@link #setOutputFrequencyByTimeBoundary(int)}
-     * 
+     *
      * @param checkpointingFrequency
      *            the time boundary in seconds
      */
     public void setCheckpointingFrequencyByTimeBoundary(int checkpointingFrequency) {
         this.checkpointingFrequency = checkpointingFrequency;
         this.checkpointingFrequencyType = FrequencyType.TIMEBOUNDARY;
-        supplementAdviceForCheckpointingAndRecovery();
         initFrequency(PeriodicInvokerType.CHECKPOINTING);
     }
 
@@ -458,13 +441,12 @@ public abstract class AbstractPE implements Cloneable {
      * Set the offset from the time boundary at which calls to checkpoint should
      * be performed. It uses the same mechanism than
      * {@link AbstractPE#setOutputFrequencyOffset(int)}
-     * 
+     *
      * @param checkpointingFrequencyOffset
      *            checkpointing frequency offset in seconds
      */
     public void setCheckpointingFrequencyOffset(int checkpointingFrequencyOffset) {
         this.checkpointingFrequencyOffset = checkpointingFrequencyOffset;
-        supplementAdviceForCheckpointingAndRecovery();
     }
 
     public void setKeys(String[] keys) {
@@ -472,7 +454,6 @@ public abstract class AbstractPE implements Cloneable {
             StringTokenizer st = new StringTokenizer(key);
             eventAdviceList.add(new EventAdvice(st.nextToken(), st.nextToken()));
         }
-        supplementAdviceForCheckpointingAndRecovery();
     }
 
     private void initFrequency(PeriodicInvokerType type) {
@@ -519,7 +500,7 @@ public abstract class AbstractPE implements Cloneable {
     }
 
     /**
-     * 
+     *
      */
     public int getTtl() {
         return ttl;
@@ -530,7 +511,7 @@ public abstract class AbstractPE implements Cloneable {
     }
 
     /**
-     * 
+     *
      */
     public void setLookupTable(Persister lookupTable) {
         this.lookupTable = lookupTable;
@@ -548,8 +529,6 @@ public abstract class AbstractPE implements Cloneable {
 
         // NOTE: assumes pe id is keyvalue from the PE...
         safeKeeper.saveState(this);
-        // remove dirty flag
-        checkpointable = false;
     }
 
     protected void recover() {
@@ -581,28 +560,24 @@ public abstract class AbstractPE implements Cloneable {
         }
     }
 
-    public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
-        isCheckpointingEvent = true;
-        if (isCheckpointable()) {
-            checkpoint();
-        }
-    }
-
+    /**
+     * Indicates whether this PE is dirty and therefore checkpointable.
+     * Developers can override this method in order to precisely define the conditions of checkpointability.
+     * @return true if this PE can be checkpointed, false otherwise
+     */
     protected boolean isCheckpointable() {
         return checkpointable;
     }
 
-    protected void setCheckpointable(boolean checkpointable) {
+    /**
+     * Marks the PE as clean or dirty. Only dirty PEs are checkpointed.
+     * Developers can override this method in order to have more control
+     * @param checkpointable true|false for setting the dirty state of the PE
+     */
+    public void setCheckpointable(boolean checkpointable) {
         this.checkpointable = checkpointable;
     }
 
-    public final void initiateCheckpoint() {
-        // enqueue checkpointing event
-        if (safeKeeper != null) {
-            safeKeeper.generateCheckpoint(this);
-        }
-    }
-
     public byte[] serializeState() {
         return safeKeeper.getSerializer().serialize(this);
     }
@@ -643,23 +618,6 @@ public abstract class AbstractPE implements Cloneable {
     }
 
     /**
-     * Subscribes this PE to the checkpointing stream
-     */
-    private void supplementAdviceForCheckpointingAndRecovery() {
-        // don't do anything until both conditions are true
-        Logger.getLogger("s4").info(
-                "Maybe adding for " + this.getId() + ": " + checkpointingFrequency + " and " + eventAdviceList.size());
-        if (checkpointingFrequency > 0 && eventAdviceList.size() > 0) {
-            eventAdviceList.add(new EventAdvice(this.getId() + "_checkpointing", "key"));
-        }
-    }
-
-    public void processEvent(RecoveryEvent recoveryEvent) {
-        isCheckpointingEvent = true;
-        recover();
-    }
-    
-    /**
      * This method expires the current PE.
      **/
     protected void expire() {
@@ -751,7 +709,7 @@ public abstract class AbstractPE implements Cloneable {
                         } else if (PeriodicInvokerType.CHECKPOINTING.equals(type)) {
                             try {
                                 if (pe.isCheckpointable()) {
-                                    pe.initiateCheckpoint();
+                                    pe.checkpoint();
                                     checkpointCount++;
                                 }
                             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
index 2fae80d..ce845a4 100644
--- a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
+++ b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -34,23 +34,25 @@ public enum MetricsName {
     pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
             "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
             "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
-            "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"), 
-            checkpointing_dropped_from_storage_queue("cp_sto_dr"), 
-            checkpointing_added_to_serialization_queue("cp_ser_in"), 
-            checkpointing_added_to_storage_queue("cp_sto_in");
+            "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"),
+            checkpointing_dropped_from_storage_queue("cp_sto_dr"),
+            checkpointing_added_to_serialization_queue("cp_ser_in"),
+            checkpointing_added_to_storage_queue("cp_sto_in"),
+            checkpointing_fetching_failed("cp_fet_err"),
+            checkpointing_fetching_success("cp_fet_ok");
 
-	private final String eventShortName;
+    private final String eventShortName;
 
-	private MetricsName(String eventShortName) {
-		this.eventShortName = eventShortName;
-	}
+    private MetricsName(String eventShortName) {
+        this.eventShortName = eventShortName;
+    }
 
-	public String toString() {
-		return eventShortName;
-	}
+    public String toString() {
+        return eventShortName;
+    }
 
-	public static void main(String[] args) {
-		System.out.println(generic_listener_msg_in_ct.toString());
+    public static void main(String[] args) {
+        System.out.println(generic_listener_msg_in_ct.toString());
 
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
index 0265ba0..e1e630f 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
@@ -83,14 +84,17 @@ public class RecoveryTest extends S4TestCase {
         gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
         signalValue1Set.await(10, TimeUnit.SECONDS);
         Assert.assertEquals("value1=message1b ; value2=",
-                TestUtils.readFile(StatefulTestPE.DATA_FILE));
+                new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
 
         Thread.sleep(2000);
         // kill app
         forkedS4App.destroy();
         // S4App.killS4App(getClass().getName());
 
-        StatefulTestPE.DATA_FILE.delete();
+        try {
+			zk.delete(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, -1);
+		} catch (Exception ignored) {
+		}
 
         forkedS4App = TestUtils.forkS4App(getClass().getName(),
                 "s4_core_conf_fs_backend.xml");
@@ -106,7 +110,7 @@ public class RecoveryTest extends S4TestCase {
         // we should get "message1" (checkpointed) instead of "message1b"
         // (latest)
         Assert.assertEquals("value1=message1 ; value2=message2",
-                TestUtils.readFile(StatefulTestPE.DATA_FILE));
+        		new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
index cfb9132..a0b5d83 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
@@ -1,14 +1,10 @@
 package org.apache.s4.ft;
 
-import org.apache.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 
+import org.apache.s4.processor.AbstractPE;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -16,121 +12,126 @@ import org.apache.zookeeper.ZooKeeper;

 public class StatefulTestPE extends AbstractPE implements Watcher {

-    String id;
-    String value1 = "";
-    String value2 = "";
-    transient ZooKeeper zk = null;
-    transient public static File DATA_FILE = new File(
-            System.getProperty("user.dir")
-            + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
-
-    @Override
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-    public void processEvent(KeyValue event) {
-        if (zk == null) {
-            try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
-            S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
-        }
-        try {
-
-            if ("value1".equals(event.getKey())) {
-                setValue1(event.getValue());
-                zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-            } else if ("value2".equals(event.getKey())) {
-                setValue2(event.getValue());
-                zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-            } else if ("initiateCheckpoint".equals(event.getKey())) {
-                initiateCheckpoint();
-            } else {
-                throw new RuntimeException("unidentified event: " + event);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    public String getValue1() {
-        return value1;
-    }
-
-    public void setValue1(String value1) {
-        this.value1 = value1;
-        persistValues();
-    }
-
-    public String getValue2() {
-        return value2;
-    }
-
-    public void setValue2(String value2) {
-        this.value2 = value2;
-        persistValues();
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    protected void checkpoint() {
-        super.checkpoint();
-        try {
-            zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    // NOTE: we use a file as a simple way to keep track of changes
-    private void persistValues() {
-
-        if (DATA_FILE.exists()) {
-            if (!DATA_FILE.delete()) {
-                throw new RuntimeException("Cannot delete datafile "
-                        + DATA_FILE.getAbsolutePath());
-            }
-        }
-        try {
-            if (!DATA_FILE.createNewFile()) {
-                throw new RuntimeException("Cannot create datafile "
-                        + DATA_FILE.getAbsolutePath());
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot create datafile "
-                    + DATA_FILE.getAbsolutePath());
-        }
-        try {
-            TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
-                    DATA_FILE);
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot write to datafile "
-                    + DATA_FILE.getAbsolutePath());
-        }
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // TODO Auto-generated method stub
-
-    }
+    /** ZooKeeper node holding the PE state as text, read back by the tests. */
+    public static final String STATEFUL_TEST_PE_DATA_ZNODE = "/statefulTestPE.data";
+
+    String id;
+    String value1 = "";
+    String value2 = "";
+    transient ZooKeeper zk = null;
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void output() {
+        // nothing to output: tests observe the state through ZooKeeper
+    }
+
+    /**
+     * Handles a test event: "value1" and "value2" update the matching field
+     * and create a marker znode; "initiateCheckpoint" checkpoints this PE.
+     *
+     * @throws RuntimeException wrapping any failure, including unknown keys
+     */
+    public void processEvent(KeyValue event) {
+        ensureZkClient();
+        if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
+            S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
+        }
+        try {
+            if ("value1".equals(event.getKey())) {
+                setValue1(event.getValue());
+                zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } else if ("value2".equals(event.getKey())) {
+                setValue2(event.getValue());
+                zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } else if ("initiateCheckpoint".equals(event.getKey())) {
+                checkpoint();
+            } else {
+                throw new RuntimeException("unidentified event: " + event);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getValue1() {
+        return value1;
+    }
+
+    public void setValue1(String value1) {
+        this.value1 = value1;
+        persistValues();
+    }
+
+    public String getValue2() {
+        return value2;
+    }
+
+    public void setValue2(String value2) {
+        this.value2 = value2;
+        persistValues();
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Checkpoints this PE, then creates the "/checkpointed" marker znode so
+     * that tests can wait for the checkpoint.
+     */
+    @Override
+    protected void checkpoint() {
+        super.checkpoint();
+        ensureZkClient();
+        try {
+            zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Replaces the content of STATEFUL_TEST_PE_DATA_ZNODE with
+     * "value1=... ; value2=...", so that tests can check the PE state.
+     */
+    private void persistValues() {
+        ensureZkClient();
+        try {
+            try {
+                zk.delete(STATEFUL_TEST_PE_DATA_ZNODE, -1);
+            } catch (KeeperException.NoNodeException ignored) {
+                // nothing persisted yet
+            }
+            zk.create(STATEFUL_TEST_PE_DATA_ZNODE,
+                    ("value1=" + value1 + " ; value2=" + value2).getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Lazily creates the ZooKeeper client used to publish test markers. */
+    private void ensureZkClient() {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // ZooKeeper watcher notifications are not used by this PE
+    }

 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
index 84af34f..83c20a1 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -180,9 +180,9 @@ public class TestUtils {
             Assert.assertTrue("waiting for server down",
                     waitForServerDown("localhost", ZK_PORT, 3000));
         }
-        
-        
-        
+
+
+
         // List<String> cmdList = new ArrayList<String>();
         // cmdList.add(System.getProperty("user.dir")
         // + "/src/test/scripts/killJavaProcessForPort.sh");
@@ -297,7 +297,7 @@ public class TestUtils {
             }
         });
     }
-    
+
     public static void watchAndSignalChangedChildren(String path,
             final CountDownLatch latch, final ZooKeeper zk)
             throws KeeperException, InterruptedException {
@@ -393,7 +393,7 @@ public class TestUtils {
             deleteDirectoryContents(S4TestCase.DEFAULT_TEST_OUTPUT_DIR);
         }
         S4TestCase.DEFAULT_STORAGE_DIR.mkdirs();
-    
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java
new file mode 100644
index 0000000..ca22a38
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java
@@ -0,0 +1,54 @@
+package org.apache.s4.ft.rectimeout;
+
+import java.util.Set;
+
+import org.apache.s4.ft.DefaultFileSystemStateStorage;
+import org.apache.s4.ft.SafeKeeperId;
+import org.apache.s4.ft.StateStorage;
+import org.apache.s4.ft.StorageCallback;
+
+/**
+ * Test storage backend that saves checkpoints normally but cannot fetch them.
+ * <p>
+ * Saving and listing keys are delegated to a {@link DefaultFileSystemStateStorage},
+ * while {@link #fetchState(SafeKeeperId)} blocks for {@link #FETCH_DELAY_MS}
+ * milliseconds and then throws, so that recovery cannot load any checkpoint.
+ */
+public class BrokenStorage implements StateStorage {
+
+    /** Simulated fetch duration, presumably longer than the test's fetch timeout. */
+    private static final long FETCH_DELAY_MS = 10000;
+
+    /** Delegate for the operations that are not broken. */
+    DefaultFileSystemStateStorage storage = new DefaultFileSystemStateStorage();
+
+    public BrokenStorage() {
+    }
+
+    @Override
+    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+        storage.saveState(key, state, callback);
+    }
+
+    /**
+     * Simulates a backend that hangs and then fails.
+     *
+     * @throws RuntimeException always, after the delay or an interrupt
+     */
+    @Override
+    public byte[] fetchState(SafeKeeperId key) {
+        try {
+            Thread.sleep(FETCH_DELAY_MS);
+        } catch (InterruptedException e) {
+            // presumably the fetch was cancelled: keep the interrupt status
+            // visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+        throw new RuntimeException("fetching failed");
+    }
+
+    @Override
+    public Set<SafeKeeperId> fetchStoredKeys() {
+        return storage.fetchStoredKeys();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java
new file mode 100644
index 0000000..7621f16
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java
@@ -0,0 +1,137 @@
+package org.apache.s4.ft.rectimeout;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.StatefulTestPE;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks recovery when a checkpoint cannot be fetched. The forked S4 app uses
+ * s4_core_conf_broken_backend.xml (presumably wiring in {@link BrokenStorage}),
+ * so the restarted PE is expected to continue from a blank state.
+ */
+public class RecoveryTimeoutTest extends S4TestCase {
+
+    /** Maximum time, in seconds, to wait for each signal from the forked app. */
+    private static final long SIGNAL_TIMEOUT_S = 10;
+
+    public static long ZOOKEEPER_PORT = 21810;
+    private Process forkedS4App = null;
+    private static Factory zookeeperServerConnectionFactory = null;
+    /** Client used to observe the forked app; closed in {@link #cleanup()}. */
+    private ZooKeeper zk = null;
+    private CountDownLatch signalValue2Set;
+
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        final ZooKeeper setupZk = TestUtils.createZkClient();
+        try {
+            // FIXME marker znodes may be retained between runs; unclear where
+            for (String marker : new String[] { "/value1Set", "/value2Set", "/checkpointed" }) {
+                try {
+                    setupZk.delete(marker, -1);
+                } catch (Exception ignored) {
+                    // best effort: the marker may not exist
+                }
+            }
+        } finally {
+            setupZk.close();
+        }
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        if (zk != null) {
+            zk.close();
+            zk = null;
+        }
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        TestUtils.killS4App(forkedS4App);
+    }
+
+    @Test
+    public void testRecoveryTimeout() throws Exception {
+        checkpointAndRecover();
+        Assert.assertTrue("value2 was not set in time",
+                signalValue2Set.await(SIGNAL_TIMEOUT_S, TimeUnit.SECONDS));
+
+        // value1 must be empty: the checkpoint could not be recovered
+        Assert.assertEquals("value1= ; value2=message2",
+                new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
+    }
+
+    /**
+     * Starts the app, sets value1, checkpoints, overwrites value1, then restarts
+     * the app and injects value2, which makes the new PE instance attempt
+     * recovery; {@link #signalValue2Set} is meant to fire once /value2Set exists.
+     */
+    private void checkpointAndRecover() throws IOException,
+            InterruptedException, KeeperException, JSONException {
+        zk = TestUtils.createZkClient();
+        // 1. instantiate remote S4 app
+        forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                "s4_core_conf_broken_backend.xml");
+        // TODO synchro
+        Thread.sleep(5000);
+
+        // 2. set value1: this is the state that gets checkpointed
+        CountDownLatch signalValue1Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+        EventGenerator gen = new EventGenerator();
+        gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1", 0);
+        Assert.assertTrue("value1 was not set in time",
+                signalValue1Set.await(SIGNAL_TIMEOUT_S, TimeUnit.SECONDS));
+
+        // 3. trigger a checkpoint
+        final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
+        gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+                "Stream1", 0);
+        Assert.assertTrue("checkpoint was not taken in time",
+                signalCheckpointed.await(SIGNAL_TIMEOUT_S, TimeUnit.SECONDS));
+
+        // 4. overwrite value1 after the checkpoint
+        // NOTE(review): "/value1Set" already exists here, so this latch may
+        // never be counted down; the state is checked directly below instead
+        signalValue1Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+        gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
+        signalValue1Set.await(SIGNAL_TIMEOUT_S, TimeUnit.SECONDS);
+        Assert.assertEquals("value1=message1b ; value2=",
+                new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
+
+        Thread.sleep(2000);
+        // 5. kill the app and forget the observed state
+        forkedS4App.destroy();
+        try {
+            zk.delete(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, -1);
+        } catch (Exception ignored) {
+            // best effort: the node may not exist
+        }
+
+        // 6. restart the app and set value2: the new PE attempts recovery first
+        forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                "s4_core_conf_broken_backend.xml");
+        // TODO synchro
+        Thread.sleep(2000);
+        signalValue2Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+        gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml
new file mode 100644
index 0000000..84346e5
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
+    <property name="id" value="printEventPE"/>
+    <property name="keys">
+      <list>
+        <value>TopicSeen topic</value>
+      </list>
+    </property>
+  </bean> -->
+
+  <bean id="statefulPE" class="org.apache.s4.ft.StatefulTestPE">
+    <property name="id" value="statefulPE"/>
+    <property name="keys">
+      <list>
+        <value>Stream1 key</value>
+      </list>
+    </property>
+    <!-- checkpointing frequency is set to 1000 events so that checkpointing is NOT triggered automatically during this test -->
+    <property name="checkpointingFrequencyByEventCount" value="1000" />  
+  </bean>
+
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml
new file mode 100755
index 0000000..20457ad
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="org.apache.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+	<bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="mockStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+
+    <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+    <bean id="mockStateStorage" class="org.apache.s4.ft.rectimeout.BrokenStorage">
+        <!-- if not specified, default is <current_dir>/tmp/storage
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml
new file mode 100644
index 0000000..cc571a6
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ 
+  <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+ 
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
index f3c0f55..9b4a7dd 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -5,6 +5,7 @@ import org.apache.s4.ft.KeyValue;
 import org.apache.s4.ft.S4TestCase;
 import org.apache.s4.ft.TestRedisStateStorage;
 import org.apache.s4.ft.TestUtils;
+import org.apache.s4.wordcount.WordClassifier;
 import org.apache.s4.wordcount.WordCountTest;
 
 import java.io.File;
@@ -22,23 +23,23 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * 
+ *
  * We use 2 lists of words that we inject in a word counting s4 system.
- * 
+ *
  * After processing the first sentence, we just kill the platform and restart
  * it.
- * 
+ *
  * Then we inject the second sentence.
- * 
- * 
+ *
+ *
  * We verify that no state was lost, i.e. that the words count includes words
  * from both the first and the second sentence.
- * 
+ *
  * NOTE 1: we synchronize through zookeeper to control when to kill the
  * application, and when to verify assertions. NOTE 2: we use some additional
  * explicit waits for bookkeeper backend so that it gets correctly initialized.
- * 
- * 
+ *
+ *
  */
 public class FTWordCountTest extends S4TestCase {
 
@@ -104,8 +105,8 @@ public class FTWordCountTest extends S4TestCase {
                 "Sentences", 0);
         signalSentence1Processed.await(10, TimeUnit.SECONDS);
         Thread.sleep(1000);
-        
-        
+
+
         // crash the app
         forkedS4App.destroy();
 
@@ -149,16 +150,8 @@ public class FTWordCountTest extends S4TestCase {
                 new KeyValue("sentence", WordCountTest.SENTENCE_3),
                 "Sentences", 0);
         signalTextProcessed.await(10, TimeUnit.SECONDS);
-        File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
-                + File.separator + "wordcount");
-        if (!results.exists()) {
-        	// in case the results file isn't ready yet
-        	Thread.sleep(1000);
-        	results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
-                    + File.separator + "wordcount");
-        }
-        String s = TestUtils.readFile(results);
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", new String(zk.getData(WordClassifier.WORD_COUNT_ZNODE, null, null)));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
index 5b0558d..c3bd19d 100644
--- a/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
@@ -2,6 +2,7 @@ package org.apache.s4.wordcount;
 
 import org.apache.s4.ft.KeyValue;
 import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.StatefulTestPE;
 import org.apache.s4.ft.TestUtils;
 import org.apache.s4.processor.AbstractPE;
 
@@ -28,6 +29,7 @@ public class WordClassifier extends AbstractPE implements Watcher {
     transient private ZooKeeper zk;
     private String id;
     public final static String ROUTING_KEY = "classifier";
+	public static final String WORD_COUNT_ZNODE = "/wordcount";
 
     public void setId(String id) {
         this.id = id;
@@ -59,20 +61,17 @@ public class WordClassifier extends AbstractPE implements Watcher {
         }
         ++counter;
         if (counter == WordCountTest.TOTAL_WORDS) {
-            File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
-                    + File.separator + "wordcount");
-            if (results.exists()) {
-                if (!results.delete()) {
-                    throw new RuntimeException("cannot delete results file");
-                }
-            }
+            try {
+    			zk.delete(WordClassifier.WORD_COUNT_ZNODE, -1);
+    		} catch (Exception ignored) {
+    		}
             Set<Entry<String, Integer>> entrySet = counts.entrySet();
             StringBuilder sb = new StringBuilder();
             for (Entry<String, Integer> entry : entrySet) {
                 sb.append(entry.getKey() + "=" + entry.getValue() + ";");
             }
-            TestUtils.writeStringToFile(sb.toString(), results);
-
+            zk.create(WordClassifier.WORD_COUNT_ZNODE, sb.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
             zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b6a90d0a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5a511d7..8d95a22 100644
--- a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -34,7 +34,7 @@ public class WordCountTest extends S4TestCase {
             + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
     private static Factory zookeeperServerConnectionFactory;
 
-    
+
     @Before
     public void prepare() throws IOException, InterruptedException, KeeperException {
         TestUtils.cleanupTmpDirs();
@@ -44,7 +44,7 @@ public class WordCountTest extends S4TestCase {
 
     @Test
     public void testSimple() throws Exception {
-        
+
         S4App app = new S4App(getClass(), "s4_core_conf.xml");
         app.initializeS4App();
         final ZooKeeper zk = TestUtils.createZkClient();
@@ -53,7 +53,7 @@ public class WordCountTest extends S4TestCase {
         TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
                 zk);
         EventGenerator gen = new EventGenerator();
-        
+
         // add authorizations for processing
         for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS
                 + 1; i++) {
@@ -67,11 +67,8 @@ public class WordCountTest extends S4TestCase {
         gen.injectValueEvent(new KeyValue("sentence", SENTENCE_3), "Sentences",
                 0);
         signalTextProcessed.await();
-        File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
-                + File.separator + "wordcount");
-        String s = TestUtils.readFile(results);
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-        
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", new String(zk.getData(WordClassifier.WORD_COUNT_ZNODE, null, null)));
+
     }
 
     @After


Mime
View raw message