beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xi...@apache.org
Subject [beam] branch master updated: [BEAM-11458] Upgrade SamzRunner to Samza 1.5 (#13550)
Date Fri, 18 Dec 2020 00:14:01 GMT
This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3663e03  [BEAM-11458] Upgrade SamzRunner to Samza 1.5 (#13550)
3663e03 is described below

commit 3663e03644f44966ecd873c41f60186565893657
Author: Ke Wu <kwu@linkedin.com>
AuthorDate: Thu Dec 17 16:13:26 2020 -0800

    [BEAM-11458] Upgrade SamzRunner to Samza 1.5 (#13550)
---
 runners/samza/build.gradle                         |  17 +-
 .../beam/runners/samza/SamzaJobInvocation.java     | 114 +++++
 .../beam/runners/samza/SamzaJobServerDriver.java   |  60 ++-
 .../beam/runners/samza/SamzaPipelineOptions.java   |  22 +-
 .../samza/SamzaPipelineOptionsValidator.java       |  38 +-
 .../samza/SamzaPortablePipelineOptions.java        |  13 +
 .../runners/samza/SamzaRunnerOverrideConfigs.java  |  15 +
 .../samza/adapter/UnboundedSourceSystem.java       |  39 +-
 .../samza/container/BeamContainerRunner.java       |   6 +-
 .../samza/container/BeamJobCoordinatorRunner.java  |  78 ++++
 ...inerCfgFactory.java => ContainerCfgLoader.java} |  18 +-
 .../ContainerCfgLoaderFactory.java}                |  20 +-
 .../beam/runners/samza/runtime/BundleManager.java  | 349 +++++++++++++++
 .../apache/beam/runners/samza/runtime/DoFnOp.java  | 277 +++++++-----
 .../runners/samza/runtime/FutureCollector.java     |  60 +++
 .../beam/runners/samza/runtime/GroupByKeyOp.java   |  11 +-
 .../beam/runners/samza/runtime/KeyedTimerData.java |   1 +
 .../beam/runners/samza/runtime/OpAdapter.java      |  48 ++-
 .../beam/runners/samza/runtime/OpEmitter.java      |   5 +
 .../samza/runtime/OutputManagerFactory.java        |   5 +
 .../samza/runtime/SamzaStoreStateInternals.java    | 230 ++++++----
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 217 ++++++++--
 .../runners/samza/translation/ConfigBuilder.java   |  77 ++--
 .../runners/samza/translation/ConfigContext.java   |   8 +
 .../translation/FlattenPCollectionsTranslator.java |   2 +-
 .../samza/translation/GroupByKeyTranslator.java    |  24 +-
 .../translation/ParDoBoundMultiTranslator.java     |  56 ++-
 .../samza/translation/SamzaPipelineTranslator.java |  21 +-
 .../translation/SamzaTestStreamSystemFactory.java  | 179 ++++++++
 .../translation/SamzaTestStreamTranslator.java     | 100 +++++
 .../translation/SplittableParDoTranslators.java    |   6 +-
 .../samza/translation/TranslationContext.java      |  59 ++-
 .../samza/translation/WindowAssignTranslator.java  |   4 +-
 .../beam/runners/samza/util/FutureUtils.java       |  50 +++
 .../samza/SamzaPipelineOptionsValidatorTest.java   |  60 +++
 .../samza/adapter/UnboundedSourceSystemTest.java   |  28 ++
 .../runners/samza/runtime/BundleManagerTest.java   | 474 +++++++++++++++++++++
 .../samza/runtime/FutureCollectorImplTest.java     |  92 ++++
 .../runners/samza/runtime/KeyedTimerDataTest.java  |  14 +-
 .../runtime/SamzaStoreStateInternalsTest.java      |  38 +-
 .../runtime/SamzaTimerInternalsFactoryTest.java    | 416 +++++++++++++++++-
 .../samza/translation/ConfigGeneratorTest.java     |  86 +++-
 .../samza/translation/TranslationContextTest.java  |  94 ++++
 .../beam/runners/samza/util/FutureUtilsTest.java   | 107 +++++
 .../site/content/en/documentation/runners/samza.md |   5 +
 45 files changed, 3228 insertions(+), 415 deletions(-)

diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 3d02d9d..8a14b24 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -40,7 +40,7 @@ configurations {
   validatesRunner
 }
 
-def samza_version = "1.3.0"
+def samza_version = "1.5.0"
 
 dependencies {
   compile library.java.vendored_guava_26_0_jre
@@ -53,7 +53,13 @@ dependencies {
   compile library.java.slf4j_api
   compile library.java.joda_time
   compile library.java.commons_io
+  compile library.java.commons_lang3
   compile library.java.args4j
+  compile "javax.servlet:javax.servlet-api:3.1.0"
+  compile "io.dropwizard.metrics:metrics-core:3.1.2"
+  compile "org.rocksdb:rocksdbjni:5.7.3"
+  compile "org.apache.commons:commons-collections4:4.0"
+  compile "org.scala-lang:scala-library:2.11.8"
   compile "org.apache.samza:samza-api:$samza_version"
   compile "org.apache.samza:samza-core_2.11:$samza_version"
   compile "org.apache.samza:samza-kafka_2.11:$samza_version"
@@ -87,11 +93,10 @@ task validatesRunner(type: Test) {
     excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
     excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
     excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
     excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
     excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
     excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
     excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
@@ -99,6 +104,8 @@ task validatesRunner(type: Test) {
   filter {
     // TODO(BEAM-10025)
     excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded'
+    // TODO(BEAM-11479)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
 
     // These tests fail since there is no support for side inputs in Samza's unbounded splittable DoFn integration
     excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testWindowedSideInputWithCheckpointsUnbounded'
@@ -110,6 +117,10 @@ task validatesRunner(type: Test) {
     excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedUnbounded'
     excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded'
   }
+  filter {
+    // Re-enable the test after Samza runner supports same state id across DoFn(s).
+    excludeTest('ParDoTest$StateTests', 'testValueStateSameId')
+  }
 }
 
 // Generates :runners:samza:runQuickstartJavaSamza
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
new file mode 100644
index 0000000..acaea62
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.jobsubmission.JobInvocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class SamzaJobInvocation extends JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) {
+    super(null, null, pipeline, null);
+    this.originalPipeline = pipeline;
+    this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+    // Fused pipeline proto.
+    final RunnerApi.Pipeline fusedPipeline =
+        GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+    // the pipeline option coming from sdk will set the sdk specific runner which will break
+    // serialization
+    // so we need to reset the runner here to a valid Java runner
+    options.setRunner(SamzaRunner.class);
+    try {
+      final SamzaRunner runner = SamzaRunner.fromOptions(options);
+      return (SamzaPortablePipelineResult) runner.runPortablePipeline(fusedPipeline);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke samza job", e);
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    LOG.info("Starting job invocation {}", getId());
+    pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
+    return options.getJobName();
+  }
+
+  @Override
+  public synchronized void cancel() {
+    try {
+      if (pipelineResult != null) {
+        LOG.info("Cancelling pipeline {}", getId());
+        pipelineResult.cancel();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to cancel job.", e);
+    }
+  }
+
+  @Override
+  public JobApi.JobState.Enum getState() {
+    if (pipelineResult == null) {
+      return STARTING;
+    }
+    switch (pipelineResult.getState()) {
+      case RUNNING:
+        return RUNNING;
+      case FAILED:
+        return FAILED;
+      case DONE:
+        return DONE;
+      case STOPPED:
+        return STOPPED;
+      case UPDATED:
+        return UPDATED;
+      case CANCELLED:
+        return CANCELLED;
+      default:
+        return UNRECOGNIZED;
+    }
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index c3d1603..6895754 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -18,17 +18,18 @@
 package org.apache.beam.runners.samza;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.jobsubmission.InMemoryJobService;
 import org.apache.beam.runners.jobsubmission.JobInvocation;
 import org.apache.beam.runners.jobsubmission.JobInvoker;
+import org.apache.beam.sdk.expansion.service.ExpansionServer;
+import org.apache.beam.sdk.expansion.service.ExpansionService;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
@@ -37,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Driver program that starts a job server. */
-// TODO extend JobServerDriver
+// TODO(BEAM-8510): extend JobServerDriver
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
@@ -46,7 +47,7 @@ public class SamzaJobServerDriver {
 
   private final SamzaPortablePipelineOptions pipelineOptions;
 
-  private SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
+  protected SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
   }
 
@@ -65,12 +66,13 @@ public class SamzaJobServerDriver {
     overrideConfig.put(
         SamzaRunnerOverrideConfigs.FN_CONTROL_PORT,
         String.valueOf(pipelineOptions.getControlPort()));
+    overrideConfig.put(SamzaRunnerOverrideConfigs.FS_TOKEN_PATH, pipelineOptions.getFsTokenPath());
+
     pipelineOptions.setConfigOverride(overrideConfig);
     return new SamzaJobServerDriver(pipelineOptions);
   }
 
-  private static InMemoryJobService createJobService(SamzaPortablePipelineOptions pipelineOptions)
-      throws IOException {
+  private InMemoryJobService createJobService() throws IOException {
     JobInvoker jobInvoker =
         new JobInvoker("samza-job-invoker") {
           @Override
@@ -80,16 +82,7 @@ public class SamzaJobServerDriver {
               @Nullable String retrievalToken,
               ListeningExecutorService executorService)
               throws IOException {
-            String invocationId =
-                String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString());
-            SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions);
-            JobInfo jobInfo =
-                JobInfo.create(
-                    invocationId,
-                    pipelineOptions.getJobName(),
-                    retrievalToken,
-                    PipelineOptionsTranslation.toProto(pipelineOptions));
-            return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
+            return new SamzaJobInvocation(pipeline, pipelineOptions);
           }
         };
     return InMemoryJobService.create(
@@ -100,17 +93,46 @@ public class SamzaJobServerDriver {
         InMemoryJobService.DEFAULT_MAX_INVOCATION_HISTORY);
   }
 
+  private ExpansionServer createExpansionService(String host, int expansionPort)
+      throws IOException {
+    if (host == null) {
+      host = InetAddress.getLoopbackAddress().getHostName();
+    }
+    ExpansionServer expansionServer =
+        ExpansionServer.create(new ExpansionService(), host, expansionPort);
+    LOG.info(
+        "Java ExpansionService started on {}:{}",
+        expansionServer.getHost(),
+        expansionServer.getPort());
+    return expansionServer;
+  }
+
   public void run() throws Exception {
-    final InMemoryJobService service = createJobService(pipelineOptions);
+    // Create services
+    final InMemoryJobService service = createJobService();
     final GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
         GrpcFnServer.allocatePortAndCreateFor(
             service, ServerFactory.createWithPortSupplier(pipelineOptions::getJobPort));
-    LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
+    final String jobServerUrl = jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl();
+    LOG.info("JobServer started on {}", jobServerUrl);
+    final URI uri = new URI(jobServerUrl);
+    final ExpansionServer expansionServer =
+        createExpansionService(uri.getHost(), pipelineOptions.getExpansionPort());
+
     try {
       jobServiceGrpcFnServer.getServer().awaitTermination();
     } finally {
       LOG.info("JobServer closing");
       jobServiceGrpcFnServer.close();
+      if (expansionServer != null) {
+        try {
+          expansionServer.close();
+          LOG.info(
+              "Expansion stopped on {}:{}", expansionServer.getHost(), expansionServer.getPort());
+        } catch (Exception e) {
+          LOG.error("Error while closing the Expansion Service.", e);
+        }
+      }
     }
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 3ff64e3..c0af1fa 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -23,8 +23,8 @@ import java.util.Map;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.samza.config.ConfigFactory;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.config.ConfigLoaderFactory;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
 import org.apache.samza.metrics.MetricsReporter;
 
 /** Options which can be used to configure a Samza PortablePipelineRunner. */
@@ -38,10 +38,10 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   void setConfigFilePath(String filePath);
 
   @Description("The factory to read config file from config file path.")
-  @Default.Class(PropertiesConfigFactory.class)
-  Class<? extends ConfigFactory> getConfigFactory();
+  @Default.Class(PropertiesConfigLoaderFactory.class)
+  Class<? extends ConfigLoaderFactory> getConfigLoaderFactory();
 
-  void setConfigFactory(Class<? extends ConfigFactory> configFactory);
+  void setConfigLoaderFactory(Class<? extends ConfigLoaderFactory> configLoaderFactory);
 
   @Description(
       "The config override to set programmatically. It will be applied on "
@@ -76,6 +76,18 @@ public interface SamzaPipelineOptions extends PipelineOptions {
 
   void setSystemBufferSize(int consumerBufferSize);
 
+  @Description("The maximum number of event-time timers to buffer in memory for a PTransform")
+  @Default.Integer(50000)
+  int getEventTimerBufferSize();
+
+  void setEventTimerBufferSize(int eventTimerBufferSize);
+
+  @Description("The maximum number of ready timers to process at once per watermark.")
+  @Default.Integer(Integer.MAX_VALUE)
+  int getMaxReadyTimersToProcessOnce();
+
+  void setMaxReadyTimersToProcessOnce(int maxReadyTimersToProcessOnce);
+
   @Description("The maximum parallelism allowed for any data source.")
   @Default.Integer(1)
   int getMaxSourceParallelism();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
index 591c0ee..7702b6b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java
@@ -18,11 +18,12 @@
 package org.apache.beam.runners.samza;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-import static org.apache.samza.config.TaskConfig.MAX_CONCURRENCY;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 
 /** Validates that the {@link SamzaPipelineOptions} conforms to all the criteria. */
 public class SamzaPipelineOptionsValidator {
@@ -32,30 +33,25 @@ public class SamzaPipelineOptionsValidator {
   }
 
   /*
-   * Perform some bundling related validation for pipeline option .
+   * Perform some bundling related validation for pipeline option.
+   * Visible for testing.
    */
-  private static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions) {
+  static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions) {
     if (pipelineOptions.getMaxBundleSize() > 1) {
-      // TODO: remove this check and implement bundling for side input, timer, etc in DoFnOp.java
-      checkState(
-          isPortable(pipelineOptions),
-          "Bundling is not supported in non portable mode. Please disable by setting maxBundleSize to 1.");
-
-      String taskConcurrencyConfig = MAX_CONCURRENCY;
-      Map<String, String> configs =
+      final Map<String, String> configs =
           pipelineOptions.getConfigOverride() == null
               ? new HashMap<>()
               : pipelineOptions.getConfigOverride();
-      long taskConcurrency = Long.parseLong(configs.getOrDefault(taskConcurrencyConfig, "1"));
-      checkState(
-          taskConcurrency == 1,
-          "Bundling is not supported if "
-              + taskConcurrencyConfig
-              + " is greater than 1. Please disable bundling by setting maxBundleSize to 1. Or disable task concurrency.");
-    }
-  }
+      final JobConfig jobConfig = new JobConfig(new MapConfig(configs));
 
-  private static boolean isPortable(SamzaPipelineOptions options) {
-    return options instanceof SamzaPortablePipelineOptions;
+      // TODO: once Samza supports a better thread pool model, e.g. thread
+      // per-task/key-range, this can be supported.
+      checkArgument(
+          jobConfig.getThreadPoolSize() <= 1,
+          JOB_CONTAINER_THREAD_POOL_SIZE
+              + " cannot be configured to"
+              + " greater than 1 for max bundle size: "
+              + pipelineOptions.getMaxBundleSize());
+    }
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
index 661c1a5..5a82606 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
@@ -33,4 +33,17 @@ public interface SamzaPortablePipelineOptions extends SamzaPipelineOptions {
   int getControlPort();
 
   void setControlPort(int port);
+
+  @Description("The expansion service port. (Default: 11442) ")
+  @Default.Integer(11442)
+  int getExpansionPort();
+
+  void setExpansionPort(int port);
+
+  @Description(
+      "The file path for the local file system token. If not set (by default), then the runner would"
+          + " not use secure server factory.")
+  String getFsTokenPath();
+
+  void setFsTokenPath(String path);
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
index e31fea9..3546a1c 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunnerOverrideConfigs.java
@@ -19,6 +19,10 @@ package org.apache.beam.runners.samza;
 
 import java.time.Duration;
 
+// TODO: can we get rid of this class? Right now the SamzaPipelineOptionsValidator would force
+// the pipeline option to be the type SamzaPipelineOptions. Ideally, we should be able to keep
+// passing SamzaPortablePipelineOptions. Alternatively, we could merge portable and non-portable
+// pipeline options.
 /** A helper class for holding all the beam runner specific samza configs. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
@@ -33,6 +37,8 @@ public class SamzaRunnerOverrideConfigs {
   public static final String CONTROL_CLIENT_MAX_WAIT_TIME_MS = "controL.wait.time.ms";
   public static final long DEFAULT_CONTROL_CLIENT_MAX_WAIT_TIME_MS =
       Duration.ofMinutes(2).toMillis();
+  public static final String FS_TOKEN_PATH = BEAM_RUNNER_CONFIG_PREFIX + "fs.token.path";
+  public static final String DEFAULT_FS_TOKEN_PATH = null;
 
   private static boolean containsKey(SamzaPipelineOptions options, String configKey) {
     if (options == null || options.getConfigOverride() == null) {
@@ -67,4 +73,13 @@ public class SamzaRunnerOverrideConfigs {
       return DEFAULT_CONTROL_CLIENT_MAX_WAIT_TIME_MS;
     }
   }
+
+  /** Get fs token path for portable mode. */
+  public static String getFsTokenPath(SamzaPipelineOptions options) {
+    if (containsKey(options, FS_TOKEN_PATH)) {
+      return options.getConfigOverride().get(FS_TOKEN_PATH);
+    } else {
+      return DEFAULT_FS_TOKEN_PATH;
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index ef9531b..f94caae 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -372,7 +372,13 @@ public class UnboundedSourceSystem {
             final Instant nextWatermark = reader.getWatermark();
             if (currentWatermark.isBefore(nextWatermark)) {
               currentWatermarks.put(ssp, nextWatermark);
-              enqueueWatermark(reader);
+              if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+                enqueueWatermark(reader);
+              } else {
+                // Max watermark has been reached for this reader.
+                enqueueMaxWatermarkAndEndOfStream(reader);
+                running = false;
+              }
             }
           }
 
@@ -403,6 +409,37 @@ public class UnboundedSourceSystem {
         queues.get(ssp).put(envelope);
       }
 
+      // Send a max watermark message and an end of stream message to the corresponding ssp to
+      // close windows and finish the task.
+      private void enqueueMaxWatermarkAndEndOfStream(UnboundedReader<T> reader) {
+        final SystemStreamPartition ssp = readerToSsp.get(reader);
+        // Send the max watermark to force completion of any open windows.
+        final IncomingMessageEnvelope watermarkEnvelope =
+            IncomingMessageEnvelope.buildWatermarkEnvelope(
+                ssp, BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+        enqueueUninterruptibly(watermarkEnvelope);
+
+        final IncomingMessageEnvelope endOfStreamEnvelope =
+            IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp);
+        enqueueUninterruptibly(endOfStreamEnvelope);
+      }
+
+      private void enqueueUninterruptibly(IncomingMessageEnvelope envelope) {
+        final BlockingQueue<IncomingMessageEnvelope> queue =
+            queues.get(envelope.getSystemStreamPartition());
+        while (true) {
+          try {
+            queue.put(envelope);
+            return;
+          } catch (InterruptedException e) {
+            // Some events require that we post an envelope to the queue even if the interrupt
+            // flag was set (i.e. during a call to stop) to ensure that the consumer properly
+            // shuts down. Consequently, if we receive an interrupt here we ignore it and retry
+            // the put operation.
+          }
+        }
+      }
+
       void stop() {
         running = false;
       }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
index 60d7f69..6ca8b29 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
@@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory;
 public class BeamContainerRunner implements ApplicationRunner {
   private static final Logger LOG = LoggerFactory.getLogger(BeamContainerRunner.class);
 
+  @SuppressWarnings("rawtypes")
   private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
 
+  @SuppressWarnings("rawtypes")
   public BeamContainerRunner(SamzaApplication app, Config config) {
     this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
   }
@@ -56,9 +58,7 @@ public class BeamContainerRunner implements ApplicationRunner {
             }));
 
     ContainerLaunchUtil.run(
-        appDesc,
-        System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()),
-        ContainerCfgFactory.jobModel);
+        appDesc, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID), ContainerCfgLoader.jobModel);
   }
 
   @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java
new file mode 100644
index 0000000..fb00a01
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamJobCoordinatorRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.container;
+
+import java.time.Duration;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.clustermanager.JobCoordinatorLaunchUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.runtime.ApplicationRunner;
+
+/** Runs on Yarn AM, execute planning and launches JobCoordinator. */
+public class BeamJobCoordinatorRunner implements ApplicationRunner {
+
+  @SuppressWarnings("rawtypes")
+  private final SamzaApplication<? extends ApplicationDescriptor> app;
+
+  private final Config config;
+
+  /**
+   * Constructors a {@link BeamJobCoordinatorRunner} to run the {@code app} with the {@code config}.
+   *
+   * @param app application to run
+   * @param config configuration for the application
+   */
+  @SuppressWarnings("rawtypes")
+  public BeamJobCoordinatorRunner(
+      SamzaApplication<? extends ApplicationDescriptor> app, Config config) {
+    this.app = app;
+    this.config = config;
+  }
+
+  @Override
+  public void run(ExternalContext externalContext) {
+    JobCoordinatorLaunchUtil.run(app, config);
+  }
+
+  @Override
+  public void kill() {
+    throw new UnsupportedOperationException(
+        "BeamJobCoordinatorRunner#kill should never be invoked.");
+  }
+
+  @Override
+  public ApplicationStatus status() {
+    throw new UnsupportedOperationException(
+        "BeamJobCoordinatorRunner#status should never be invoked.");
+  }
+
+  @Override
+  public void waitForFinish() {
+    throw new UnsupportedOperationException(
+        "BeamJobCoordinatorRunner#waitForFinish should never be invoked.");
+  }
+
+  @Override
+  public boolean waitForFinish(Duration timeout) {
+    throw new UnsupportedOperationException(
+        "BeamJobCoordinatorRunner#waitForFinish should never be invoked.");
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
similarity index 79%
rename from runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java
rename to runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
index cb97b58..99455e7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoader.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.runners.samza.container;
 
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.ConfigLoader;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
@@ -30,26 +29,27 @@ import org.apache.samza.job.model.JobModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Factory for the Beam yarn container to load job model. */
+/** Loader for the Beam yarn container to load job model. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class ContainerCfgFactory implements ConfigFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);
+public class ContainerCfgLoader implements ConfigLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgLoader.class);
 
   private static final Object LOCK = new Object();
   static volatile JobModel jobModel;
 
   @Override
-  public Config getConfig(URI configUri) {
+  public Config getConfig() {
     if (jobModel == null) {
       synchronized (LOCK) {
         if (jobModel == null) {
-          String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+          final String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
           LOG.info(String.format("Got container ID: %s", containerId));
-          String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+          final String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
           LOG.info(String.format("Got coordinator URL: %s", coordinatorUrl));
-          int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
+          final int delay =
+              new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
           jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
         }
       }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
similarity index 65%
copy from runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
copy to runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
index d1e1f06..d3b090d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/ContainerCfgLoaderFactory.java
@@ -15,16 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.samza.runtime;
+package org.apache.beam.runners.samza.container;
 
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigLoader;
+import org.apache.samza.config.ConfigLoaderFactory;
 
-/** Output emitter for Samza {@link Op}. */
-public interface OpEmitter<OutT> {
-  void emitElement(WindowedValue<OutT> element);
-
-  void emitWatermark(Instant watermark);
-
-  <T> void emitView(String id, WindowedValue<Iterable<T>> elements);
+/** Factory for the Beam yarn container to get loader to load job model. */
+public class ContainerCfgLoaderFactory implements ConfigLoaderFactory {
+  @Override
+  public ConfigLoader getLoader(Config config) {
+    // The supplied config is not needed: ContainerCfgLoader reads everything it requires
+    // (container id, coordinator URL) from the container environment.
+    return new ContainerCfgLoader();
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
new file mode 100644
index 0000000..e5bf8ec
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
+ * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
+ * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
+ * bundles have completed.
+ *
+ * <p>A bundle is considered complete only when the outputs corresponding to each element in the
+ * bundle have been resolved and the watermark associated with the bundle(if any) is propagated
+ * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
+ * In case of synchronous ParDo, outputs of the element is resolved immediately after the
+ * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
+ * all the futures emitted by the processElement are resolved.
+ *
+ * <p>This class is not thread safe and the current implementation relies on the assumption that
+ * messages are dispatched to BundleManager in a single threaded mode.
+ *
+ * @param <OutT> output type of the {@link DoFnOp}
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class BundleManager<OutT> {
+  private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
+  // Lower bound added to the bundle-check delay so the timer never fires immediately.
+  private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
+
+  // Maximum number of elements allowed in a bundle before it should be closed.
+  private final long maxBundleSize;
+  // Maximum time a bundle may stay open; enforced approximately (see scheduleNextBundleCheck).
+  private final long maxBundleTimeMs;
+  // Callback notified on bundle start/finish and when a watermark is ready to propagate.
+  private final BundleProgressListener<OutT> bundleProgressListener;
+  // Collects per-element output futures for the active bundle.
+  private final FutureCollector<OutT> futureCollector;
+  // Scheduler used for the periodic processing-time bundle-close check.
+  private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
+  // Timer id reserved for the bundle-close check timer.
+  private final String bundleCheckTimerId;
+
+  // Number of elements belonging to the current active bundle
+  private transient AtomicLong currentBundleElementCount;
+  // Number of bundles that are in progress but not yet finished
+  private transient AtomicLong pendingBundleCount;
+  // Denotes the start time of the current active bundle
+  private transient AtomicLong bundleStartTime;
+  // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple
+  // bundles in progress.
+  // This flag denotes if there is a bundle that is current and hasn't been closed.
+  private transient AtomicBoolean isBundleStarted;
+  // Holder for watermark which gets propagated when the bundle is finished.
+  private transient Instant bundleWatermarkHold;
+  // A future that is completed once all futures belonging to the current active bundle are
+  // completed.  The value is null if there are no futures in the current active bundle.
+  private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
+  // Chains watermark emissions of finished bundles so watermarks propagate downstream in order.
+  private transient CompletionStage<Void> watermarkFuture;
+
+  /**
+   * @param bundleProgressListener callback for bundle lifecycle and watermark events
+   * @param futureCollector collector for per-element output futures of the active bundle
+   * @param maxBundleSize maximum number of elements per bundle; values &le; 1 disable bundling
+   * @param maxBundleTimeMs maximum time in milliseconds a bundle may stay open
+   * @param bundleTimerScheduler scheduler used for the periodic bundle-close check
+   * @param bundleCheckTimerId timer id reserved for the bundle-close check
+   */
+  public BundleManager(
+      BundleProgressListener<OutT> bundleProgressListener,
+      FutureCollector<OutT> futureCollector,
+      long maxBundleSize,
+      long maxBundleTimeMs,
+      Scheduler<KeyedTimerData<Void>> bundleTimerScheduler,
+      String bundleCheckTimerId) {
+    this.maxBundleSize = maxBundleSize;
+    this.maxBundleTimeMs = maxBundleTimeMs;
+    this.bundleProgressListener = bundleProgressListener;
+    this.bundleTimerScheduler = bundleTimerScheduler;
+    this.bundleCheckTimerId = bundleCheckTimerId;
+    this.futureCollector = futureCollector;
+
+    // The periodic close-check timer is only needed when bundling is actually enabled.
+    if (maxBundleSize > 1) {
+      scheduleNextBundleCheck();
+    }
+
+    // instance variable initialization for bundle tracking
+    this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
+    this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
+    this.currentBundleElementCount = new AtomicLong(0L);
+    this.isBundleStarted = new AtomicBoolean(false);
+    this.pendingBundleCount = new AtomicLong(0L);
+    this.watermarkFuture = CompletableFuture.completedFuture(null);
+  }
+
+  /*
+   * Schedule in processing time to check whether the current bundle should be closed. Note that
+   * we only approximately achieve max bundle time by checking as frequently as half of the max
+   * bundle time set by users. This would violate the max bundle time by up to half of it but
+   * should be acceptable in most cases (and cheaper than scheduling a timer at the beginning of
+   * every bundle).
+   */
+  private void scheduleNextBundleCheck() {
+    final Instant nextBundleCheckTime =
+        Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
+    final TimerInternals.TimerData timerData =
+        TimerInternals.TimerData.of(
+            this.bundleCheckTimerId,
+            StateNamespaces.global(),
+            nextBundleCheckTime,
+            nextBundleCheckTime,
+            TimeDomain.PROCESSING_TIME);
+    // Empty key: this is an operator-internal timer, not tied to any data key.
+    bundleTimerScheduler.schedule(
+        new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
+  }
+
+  /**
+   * Starts a new bundle if none is active, and counts the incoming element against the current
+   * bundle. Always prepares the future collector for the element's outputs.
+   */
+  void tryStartBundle() {
+    futureCollector.prepare();
+
+    if (isBundleStarted.compareAndSet(false, true)) {
+      LOG.debug("Starting a new bundle.");
+      // make sure the previous bundle is sealed and futures are cleared
+      // NOTE(review): this validates internal state rather than a caller-supplied argument;
+      // Preconditions.checkState would express the intent more precisely than checkArgument.
+      Preconditions.checkArgument(
+          currentActiveBundleDoneFutureReference.get() == null,
+          "Current active bundle done future should be null before starting a new bundle.");
+      bundleStartTime.set(System.currentTimeMillis());
+      pendingBundleCount.incrementAndGet();
+      bundleProgressListener.onBundleStarted();
+    }
+
+    currentBundleElementCount.incrementAndGet();
+  }
+
+  /**
+   * Either propagates the watermark immediately (no bundles outstanding) or holds it until the
+   * outstanding bundles complete. The max watermark (batch end-of-input) forces completion.
+   */
+  void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+    // propagate watermark immediately if no bundle is in progress and all the previous bundles have
+    // completed.
+    if (!isBundleStarted() && pendingBundleCount.get() == 0) {
+      LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
+      bundleProgressListener.onWatermark(watermark, emitter);
+      return;
+    }
+
+    // hold back the watermark since there is either a bundle in progress or previously closed
+    // bundles are unfinished.
+    this.bundleWatermarkHold = watermark;
+
+    // for batch mode, the max watermark should force the bundle to close
+    if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+      /*
+       * Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark
+       * downstream. If a bundle is in progress tryFinishBundle() will force the bundle to close and emit watermark.
+       * If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures.
+       */
+      if (isBundleStarted()) {
+        LOG.info(
+            "Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
+        tryFinishBundle(emitter);
+        watermarkFuture.toCompletableFuture().join();
+      } else {
+        LOG.info(
+            "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
+        watermarkFuture.toCompletableFuture().join();
+        bundleProgressListener.onWatermark(watermark, emitter);
+      }
+    }
+  }
+
+  /** Handles the internal processing-time timer that periodically checks for bundle closure. */
+  void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
+    // this is internal timer in processing time to check whether a bundle should be closed
+    if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
+      tryFinishBundle(emitter);
+      scheduleNextBundleCheck();
+    }
+  }
+
+  /**
+   * Signal the bundle manager to handle failure. We discard the output collected as part of
+   * processing the current element and reset the bundle count.
+   *
+   * @param t failure cause
+   */
+  void signalFailure(Throwable t) {
+    LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
+    futureCollector.discard();
+    // reset the bundle start flag only if the bundle has started
+    isBundleStarted.compareAndSet(true, false);
+
+    // bundle start may not necessarily mean we have actually started the bundle since some of the
+    // invariant check conditions within bundle start could throw exceptions. so rely on bundle
+    // start time
+    if (bundleStartTime.get() != Long.MAX_VALUE) {
+      currentBundleElementCount.set(0L);
+      bundleStartTime.set(Long.MAX_VALUE);
+      pendingBundleCount.decrementAndGet();
+      currentActiveBundleDoneFutureReference.set(null);
+    }
+  }
+
+  /**
+   * Seals the current element's outputs and, if the close criteria are met (see
+   * {@link #shouldFinishBundle()}), finishes the active bundle and chains the held watermark
+   * emission after the bundle's outputs and all previously chained watermarks.
+   */
+  void tryFinishBundle(OpEmitter<OutT> emitter) {
+
+    // we need to seal the output for each element within a bundle irrespective of whether we
+    // decide to finish the bundle or not
+    CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = futureCollector.finish();
+
+    if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
+      LOG.debug("Finishing the current bundle.");
+
+      // reset the bundle count
+      // seal the bundle and emit the result future (collection of results)
+      // chain the finish bundle invocation on the finish bundle
+      currentBundleElementCount.set(0L);
+      bundleStartTime.set(Long.MAX_VALUE);
+      Instant watermarkHold = bundleWatermarkHold;
+      bundleWatermarkHold = null;
+
+      CompletionStage<Void> currentActiveBundleDoneFuture =
+          currentActiveBundleDoneFutureReference.get();
+      // onBundleFinished fires only after this element's outputs AND all earlier elements'
+      // outputs in the bundle are resolved.
+      outputFuture =
+          outputFuture.thenCombine(
+              currentActiveBundleDoneFuture != null
+                  ? currentActiveBundleDoneFuture
+                  : CompletableFuture.completedFuture(null),
+              (res, ignored) -> {
+                bundleProgressListener.onBundleFinished(emitter);
+                return res;
+              });
+
+      BiConsumer<Collection<WindowedValue<OutT>>, Void> watermarkPropagationFn;
+      if (watermarkHold == null) {
+        watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet();
+      } else {
+        watermarkPropagationFn =
+            (ignored, res) -> {
+              LOG.debug("Propagating watermark: {} to downstream.", watermarkHold);
+              bundleProgressListener.onWatermark(watermarkHold, emitter);
+              pendingBundleCount.decrementAndGet();
+            };
+      }
+
+      // We chain the current watermark emission with previous watermark and the output futures
+      // since bundles can finish out of order but we still want the watermark to be emitted in
+      // order.
+      watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn);
+      currentActiveBundleDoneFutureReference.set(null);
+    } else if (isBundleStarted.get()) {
+      // Bundle stays open: accumulate this element's output future into the active bundle's
+      // done-future so the eventual close waits on it.
+      final CompletableFuture<Collection<WindowedValue<OutT>>> finalOutputFuture =
+          outputFuture.toCompletableFuture();
+      currentActiveBundleDoneFutureReference.updateAndGet(
+          maybePrevFuture -> {
+            CompletableFuture<Void> prevFuture =
+                maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
+
+            return CompletableFuture.allOf(prevFuture, finalOutputFuture);
+          });
+    }
+
+    // emit the future to propagate it to the rest of the DAG
+    emitter.emitFuture(outputFuture);
+  }
+
+  @VisibleForTesting
+  long getCurrentBundleElementCount() {
+    return currentBundleElementCount.longValue();
+  }
+
+  @VisibleForTesting
+  @Nullable
+  CompletionStage<Void> getCurrentBundleDoneFuture() {
+    return currentActiveBundleDoneFutureReference.get();
+  }
+
+  @VisibleForTesting
+  void setCurrentBundleDoneFuture(CompletableFuture<Void> currentBundleResultFuture) {
+    this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
+  }
+
+  @VisibleForTesting
+  long getPendingBundleCount() {
+    return pendingBundleCount.longValue();
+  }
+
+  @VisibleForTesting
+  void setPendingBundleCount(long value) {
+    pendingBundleCount.set(value);
+  }
+
+  @VisibleForTesting
+  boolean isBundleStarted() {
+    return isBundleStarted.get();
+  }
+
+  @VisibleForTesting
+  void setBundleWatermarkHold(Instant watermark) {
+    this.bundleWatermarkHold = watermark;
+  }
+
+  /**
+   * We close the current bundle in progress if one of the following criteria is met 1. The bundle
+   * count &ge; maxBundleSize 2. Time elapsed since the bundle started is &ge; maxBundleTimeMs 3.
+   * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs
+   *
+   * @return true - if one of the criteria above is satisfied; false - otherwise
+   */
+  private boolean shouldFinishBundle() {
+    return isBundleStarted.get()
+        && (currentBundleElementCount.get() >= maxBundleSize
+            || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
+            || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
+  }
+
+  /**
+   * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
+   * consists of 1. Start bundle - Invoked when the bundle is started 2. Finish bundle - Invoked
+   * when the bundle is complete. Refer to the docs under {@link BundleManager} for definition on
+   * when a bundle is considered complete. 3. onWatermark - Invoked when watermark is ready to be
+   * propagated to downstream DAG. Refer to the docs under {@link BundleManager} on when watermark
+   * is held vs propagated.
+   *
+   * @param <OutT> type of the output emitted through the supplied {@link OpEmitter}
+   */
+  public interface BundleProgressListener<OutT> {
+    void onBundleStarted();
+
+    void onBundleFinished(OpEmitter<OutT> emitter);
+
+    void onWatermark(Instant watermark, OpEmitter<OutT> emitter);
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 9f2ea43..6565218 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -40,9 +44,9 @@ import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.samza.SamzaExecutionContext;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.util.FutureUtils;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -60,7 +64,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterator
 import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.Scheduler;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +75,6 @@ import org.slf4j.LoggerFactory;
 })
 public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
-  private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
 
   private final TupleTag<FnOutT> mainOutputTag;
   private final DoFn<InT, FnOutT> doFn;
@@ -113,17 +115,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
   // TODO: add this to checkpointable state
   private transient Instant inputWatermark;
-  private transient Instant bundleWatermarkHold;
+  private transient BundleManager<OutT> bundleManager;
   private transient Instant sideInputWatermark;
   private transient List<WindowedValue<InT>> pushbackValues;
   private transient StageBundleFactory stageBundleFactory;
-  private transient long maxBundleSize;
-  private transient long maxBundleTimeMs;
-  private transient AtomicLong currentBundleElementCount;
-  private transient AtomicLong bundleStartTime;
-  private transient AtomicBoolean isBundleStarted;
-  private transient Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
   private DoFnSchemaInformation doFnSchemaInformation;
+  private transient boolean bundleDisabled;
   private Map<String, PCollectionView<?>> sideInputMapping;
 
   public DoFnOp(
@@ -178,26 +175,27 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    this.currentBundleElementCount = new AtomicLong(0L);
-    this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
-    this.isBundleStarted = new AtomicBoolean(false);
-    this.bundleWatermarkHold = null;
 
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     final SamzaExecutionContext samzaExecutionContext =
         (SamzaExecutionContext) context.getApplicationContainerContext();
     this.samzaPipelineOptions = samzaExecutionContext.getPipelineOptions();
-    this.maxBundleSize = samzaPipelineOptions.getMaxBundleSize();
-    this.maxBundleTimeMs = samzaPipelineOptions.getMaxBundleTimeMs();
-    this.bundleTimerScheduler = timerRegistry;
-
-    if (this.maxBundleSize > 1) {
-      scheduleNextBundleCheck();
-    }
+    this.bundleDisabled = samzaPipelineOptions.getMaxBundleSize() <= 1;
 
+    final String stateId = "pardo-" + transformId;
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            transformId, null, context.getTaskContext(), samzaPipelineOptions, signature);
+            stateId, null, context.getTaskContext(), samzaPipelineOptions, signature);
+    final FutureCollector<OutT> outputFutureCollector = createFutureCollector();
+
+    this.bundleManager =
+        new BundleManager<>(
+            createBundleProgressListener(),
+            outputFutureCollector,
+            samzaPipelineOptions.getMaxBundleSize(),
+            samzaPipelineOptions.getMaxBundleTimeMs(),
+            timerRegistry,
+            bundleCheckTimerId);
 
     this.timerInternalsFactory =
         SamzaTimerInternalsFactory.createTimerInternalFactory(
@@ -224,7 +222,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
           SamzaDoFnRunners.createPortable(
               samzaPipelineOptions,
               bundledEventsBagState,
-              outputManagerFactory.create(emitter),
+              outputManagerFactory.create(emitter, outputFutureCollector),
               stageBundleFactory,
               mainOutputTag,
               idToTupleTagMap,
@@ -237,13 +235,13 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
               doFn,
               windowingStrategy,
               transformFullName,
-              transformId,
+              stateId,
               context,
               mainOutputTag,
               sideInputHandler,
               timerInternalsFactory,
               keyCoder,
-              outputManagerFactory.create(emitter),
+              outputManagerFactory.create(emitter, outputFutureCollector),
               inputCoder,
               sideOutputTags,
               outputCoders,
@@ -267,24 +265,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     doFnInvoker.invokeSetup();
   }
 
-  /*
-   * Schedule in processing time to check whether the current bundle should be closed. Note that
-   * we only approximately achieve max bundle time by checking as frequent as half of the max bundle
-   * time set by users. This would violate the max bundle time by up to half of it but should
-   * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle).
-   */
-  private void scheduleNextBundleCheck() {
-    final Instant nextBundleCheckTime =
-        Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
-    final TimerInternals.TimerData timerData =
-        TimerInternals.TimerData.of(
-            bundleCheckTimerId,
-            StateNamespaces.global(),
-            nextBundleCheckTime,
-            nextBundleCheckTime,
-            TimeDomain.PROCESSING_TIME);
-    bundleTimerScheduler.schedule(
-        new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
+  // Package-private factory hook so tests can inject a custom FutureCollector.
+  /*package private*/ FutureCollector<OutT> createFutureCollector() {
+    return new FutureCollectorImpl<>();
   }
 
   private String getTimerStateId(DoFnSignature signature) {
@@ -295,51 +277,25 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     return builder.toString();
   }
 
-  private void attemptStartBundle() {
-    if (isBundleStarted.compareAndSet(false, true)) {
-      currentBundleElementCount.set(0L);
-      bundleStartTime.set(System.currentTimeMillis());
-      pushbackFnRunner.startBundle();
-    }
-  }
-
-  private void finishBundle(OpEmitter<OutT> emitter) {
-    if (isBundleStarted.compareAndSet(true, false)) {
-      currentBundleElementCount.set(0L);
-      bundleStartTime.set(Long.MAX_VALUE);
-      pushbackFnRunner.finishBundle();
-      if (bundleWatermarkHold != null) {
-        doProcessWatermark(bundleWatermarkHold, emitter);
-      }
-      bundleWatermarkHold = null;
-    }
-  }
-
-  private void attemptFinishBundle(OpEmitter<OutT> emitter) {
-    if (!isBundleStarted.get()) {
-      return;
-    }
-    if (currentBundleElementCount.get() >= maxBundleSize
-        || System.currentTimeMillis() - bundleStartTime.get() > maxBundleTimeMs) {
-      finishBundle(emitter);
-    }
-  }
-
   @Override
   public void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter) {
-    attemptStartBundle();
-
-    final Iterable<WindowedValue<InT>> rejectedValues =
-        pushbackFnRunner.processElementInReadyWindows(inputElement);
-    for (WindowedValue<InT> rejectedValue : rejectedValues) {
-      if (rejectedValue.getTimestamp().compareTo(pushbackWatermarkHold) < 0) {
-        pushbackWatermarkHold = rejectedValue.getTimestamp();
+    try {
+      // Bundle lifecycle is delegated to BundleManager; tryStartBundle is a no-op when a bundle
+      // is already open, and it counts this element against the active bundle.
+      bundleManager.tryStartBundle();
+      final Iterable<WindowedValue<InT>> rejectedValues =
+          pushbackFnRunner.processElementInReadyWindows(inputElement);
+      // Elements rejected due to unready side inputs are buffered and hold back the watermark.
+      for (WindowedValue<InT> rejectedValue : rejectedValues) {
+        if (rejectedValue.getTimestamp().compareTo(pushbackWatermarkHold) < 0) {
+          pushbackWatermarkHold = rejectedValue.getTimestamp();
+        }
+        pushbackValues.add(rejectedValue);
       }
-      pushbackValues.add(rejectedValue);
-    }
 
-    currentBundleElementCount.incrementAndGet();
-    attemptFinishBundle(emitter);
+      // Seals this element's outputs and closes the bundle if size/time criteria are met.
+      bundleManager.tryFinishBundle(emitter);
+    } catch (Throwable t) {
+      // Discard outputs collected for the failed element and reset bundle state before rethrowing.
+      LOG.error("Encountered error during process element", t);
+      bundleManager.signalFailure(t);
+      throw t;
+    }
   }
 
   private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {
@@ -373,21 +329,14 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
   @Override
   public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
-    if (!isBundleStarted.get()) {
-      doProcessWatermark(watermark, emitter);
-    } else {
-      // if there is a bundle in progress, hold back the watermark until end of the bundle
-      this.bundleWatermarkHold = watermark;
-      if (watermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        // for batch mode, the max watermark should force the bundle to close
-        finishBundle(emitter);
-      }
-    }
+    // Watermark hold/propagation decisions are centralized in BundleManager.
+    bundleManager.processWatermark(watermark, emitter);
   }
 
   @Override
   public void processSideInput(
       String id, WindowedValue<? extends Iterable<?>> elements, OpEmitter<OutT> emitter) {
+    checkState(
+        bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
     @SuppressWarnings("unchecked")
     final WindowedValue<Iterable<?>> retypedElements = (WindowedValue<Iterable<?>>) elements;
 
@@ -413,6 +362,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
   @Override
   public void processSideInputWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+    checkState(
+        bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
     sideInputWatermark = watermark;
 
     if (sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -425,8 +376,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
   public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
     // this is internal timer in processing time to check whether a bundle should be closed
     if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
-      attemptFinishBundle(emitter);
-      scheduleNextBundleCheck();
+      bundleManager.processTimer(keyedTimerData, emitter);
       return;
     }
 
@@ -439,7 +389,6 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
   @Override
   public void close() {
-    bundleWatermarkHold = null;
     doFnInvoker.invokeTeardown();
     try (AutoCloseable closer = stageBundleFactory) {
       // do nothing
@@ -471,6 +420,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     }
   }
 
+  // TODO: should emitting pushback values go through the BundleManager to start/finish the bundle?
   private void emitAllPushbackValues() {
     if (!pushbackValues.isEmpty()) {
       pushbackFnRunner.startBundle();
@@ -487,6 +437,88 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     }
   }
 
+  private BundleManager.BundleProgressListener<OutT> createBundleProgressListener() {
+    return new BundleManager.BundleProgressListener<OutT>() {
+      @Override
+      public void onBundleStarted() {
+        pushbackFnRunner.startBundle();
+      }
+
+      @Override
+      public void onBundleFinished(OpEmitter<OutT> emitter) {
+        pushbackFnRunner.finishBundle();
+      }
+
+      @Override
+      public void onWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+        doProcessWatermark(watermark, emitter);
+      }
+    };
+  }
+
+  static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
+      WindowedValue<T> windowedValue,
+      CompletionStage<T> valueFuture,
+      Function<T, OutT> valueMapper) {
+    return valueFuture.thenApply(
+        res ->
+            WindowedValue.of(
+                valueMapper.apply(res),
+                windowedValue.getTimestamp(),
+                windowedValue.getWindows(),
+                windowedValue.getPane()));
+  }
+
+  static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
+    private final List<CompletionStage<WindowedValue<OutT>>> outputFutures;
+    private AtomicBoolean collectorSealed;
+
+    FutureCollectorImpl() {
+      /*
+       * Choosing synchronized list here since the concurrency is low as the message dispatch thread is single threaded.
+       * We need this guard against scenarios when watermark/finish bundle trigger outputs.
+       */
+      outputFutures = Collections.synchronizedList(new ArrayList<>());
+      collectorSealed = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void add(CompletionStage<WindowedValue<OutT>> element) {
+      checkState(
+          !collectorSealed.get(),
+          "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
+      outputFutures.add(element);
+    }
+
+    @Override
+    public void discard() {
+      collectorSealed.compareAndSet(false, true);
+      outputFutures.clear();
+    }
+
+    @Override
+    public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
+      /*
+       * We can ignore the results here because it's okay to call finish without invoking prepare. It will be a no-op
+       * and an empty collection will be returned.
+       */
+      collectorSealed.compareAndSet(false, true);
+
+      CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture =
+          FutureUtils.flattenFutures(outputFutures);
+      outputFutures.clear();
+      return sealedOutputFuture;
+    }
+
+    @Override
+    public void prepare() {
+      boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
+      checkState(
+          isCollectorSealed,
+          "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
+    }
+  }
+
   /**
    * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
    * emits values to the main output only, which is a single {@link
@@ -497,13 +529,31 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
   public static class SingleOutputManagerFactory<OutT> implements OutputManagerFactory<OutT> {
     @Override
     public DoFnRunners.OutputManager create(OpEmitter<OutT> emitter) {
+      return createOutputManager(emitter, null);
+    }
+
+    @Override
+    public DoFnRunners.OutputManager create(
+        OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
+      return createOutputManager(emitter, collector);
+    }
+
+    private DoFnRunners.OutputManager createOutputManager(
+        OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
       return new DoFnRunners.OutputManager() {
         @Override
+        @SuppressWarnings("unchecked")
         public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
           // With only one input we know that T is of type OutT.
-          @SuppressWarnings("unchecked")
-          final WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>) windowedValue;
-          emitter.emitElement(retypedWindowedValue);
+          if (windowedValue.getValue() instanceof CompletionStage) {
+            CompletionStage<T> valueFuture = (CompletionStage<T>) windowedValue.getValue();
+            if (collector != null) {
+              collector.add(createOutputFuture(windowedValue, valueFuture, value -> (OutT) value));
+            }
+          } else {
+            final WindowedValue<OutT> retypedWindowedValue = (WindowedValue<OutT>) windowedValue;
+            emitter.emitElement(retypedWindowedValue);
+          }
         }
       };
     }
@@ -523,13 +573,34 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
     @Override
     public DoFnRunners.OutputManager create(OpEmitter<RawUnionValue> emitter) {
+      return createOutputManager(emitter, null);
+    }
+
+    @Override
+    public DoFnRunners.OutputManager create(
+        OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
+      return createOutputManager(emitter, collector);
+    }
+
+    private DoFnRunners.OutputManager createOutputManager(
+        OpEmitter<RawUnionValue> emitter, FutureCollector<RawUnionValue> collector) {
       return new DoFnRunners.OutputManager() {
         @Override
+        @SuppressWarnings("unchecked")
         public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
           final int index = tagToIndexMap.get(tupleTag);
           final T rawValue = windowedValue.getValue();
-          final RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
-          emitter.emitElement(windowedValue.withValue(rawUnionValue));
+          if (rawValue instanceof CompletionStage) {
+            CompletionStage<T> valueFuture = (CompletionStage<T>) rawValue;
+            if (collector != null) {
+              collector.add(
+                  createOutputFuture(
+                      windowedValue, valueFuture, res -> new RawUnionValue(index, res)));
+            }
+          } else {
+            final RawUnionValue rawUnionValue = new RawUnionValue(index, rawValue);
+            emitter.emitElement(windowedValue.withValue(rawUnionValue));
+          }
         }
       };
     }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java
new file mode 100644
index 0000000..acb2eba
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A future collector that buffers the output from the user's {@link
+ * org.apache.beam.sdk.transforms.DoFn} and propagates the result future to downstream operators
+ * only after {@link #finish()} is invoked.
+ *
+ * @param <OutT> type of the output element
+ */
+public interface FutureCollector<OutT> {
+  /**
+   * Outputs the element to the collector.
+   *
+   * @param element to add to the collector
+   */
+  void add(CompletionStage<WindowedValue<OutT>> element);
+
+  /**
+   * Discards the elements within the collector. Once the elements have been discarded, callers need
+   * to prepare the collector again before invoking {@link #add(CompletionStage)}.
+   */
+  void discard();
+
+  /**
+   * Seals this {@link FutureCollector}, returning a {@link CompletionStage} containing all of the
+   * elements that were added to it. The {@link #add(CompletionStage)} method will throw an {@link
+   * IllegalStateException} if called after a call to finish.
+   *
+   * <p>The {@link FutureCollector} needs to be started again to collect newer batch of output.
+   */
+  CompletionStage<Collection<WindowedValue<OutT>>> finish();
+
+  /**
+   * Prepares the {@link FutureCollector} to accept output elements. The {@link
+   * #add(CompletionStage)} method will throw an {@link IllegalStateException} if called without
+   * preparing the collector.
+   */
+  void prepare();
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index a84dde6..c4022f9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -32,8 +32,6 @@ import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.samza.SamzaExecutionContext;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
@@ -111,11 +109,10 @@ public class GroupByKeyOp<K, InputT, OutputT>
       Context context,
       Scheduler<KeyedTimerData<K>> timerRegistry,
       OpEmitter<KV<K, OutputT>> emitter) {
-    this.pipelineOptions =
-        Base64Serializer.deserializeUnchecked(
-                config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
-            .get()
-            .as(SamzaPipelineOptions.class);
+
+    final SamzaExecutionContext samzaExecutionContext =
+        (SamzaExecutionContext) context.getApplicationContainerContext();
+    this.pipelineOptions = samzaExecutionContext.getPipelineOptions();
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 9dcec3d..9c6bf38 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -42,6 +42,7 @@ import org.joda.time.Instant;
  * {@link Comparable} by first comparing the wrapped TimerData then the key.
  */
 @SuppressWarnings({
+  "keyfor",
   "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 564ee46..46746a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,24 +21,30 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.Scheduler;
-import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Adaptor class that runs a Samza {@link Op} for BEAM in the Samza {@link FlatMapFunction}. */
+/**
+ * Adaptor class that runs a Samza {@link Op} for BEAM in the Samza {@link AsyncFlatMapFunction}.
+ * This class is initialized once for each Op within each Task.
+ */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class OpAdapter<InT, OutT, K>
-    implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
+    implements AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
         WatermarkFunction<OpMessage<OutT>>,
         ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
         Serializable {
@@ -46,12 +52,13 @@ public class OpAdapter<InT, OutT, K>
 
   private final Op<InT, OutT, K> op;
   private transient List<OpMessage<OutT>> outputList;
+  private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
   private transient Instant outputWatermark;
   private transient OpEmitter<OutT> emitter;
   private transient Config config;
   private transient Context context;
 
-  public static <InT, OutT, K> FlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
+  public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
       Op<InT, OutT, K> op) {
     return new OpAdapter<>(op);
   }
@@ -76,7 +83,7 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public Collection<OpMessage<OutT>> apply(OpMessage<InT> message) {
+  public synchronized CompletionStage<Collection<OpMessage<OutT>>> apply(OpMessage<InT> message) {
     assert outputList.isEmpty();
 
     try {
@@ -99,13 +106,26 @@ public class OpAdapter<InT, OutT, K>
       throw UserCodeException.wrap(e);
     }
 
-    final List<OpMessage<OutT>> results = new ArrayList<>(outputList);
+    CompletionStage<Collection<OpMessage<OutT>>> resultFuture =
+        CompletableFuture.completedFuture(new ArrayList<>(outputList));
+
+    if (outputFuture != null) {
+      resultFuture =
+          resultFuture.thenCombine(
+              outputFuture,
+              (res1, res2) -> {
+                res1.addAll(res2);
+                return res1;
+              });
+    }
+
     outputList.clear();
-    return results;
+    outputFuture = null;
+    return resultFuture;
   }
 
   @Override
-  public Collection<OpMessage<OutT>> processWatermark(long time) {
+  public synchronized Collection<OpMessage<OutT>> processWatermark(long time) {
     assert outputList.isEmpty();
 
     try {
@@ -122,12 +142,13 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public Long getOutputWatermark() {
+  public synchronized Long getOutputWatermark() {
     return outputWatermark != null ? outputWatermark.getMillis() : null;
   }
 
   @Override
-  public Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
+  public synchronized Collection<OpMessage<OutT>> onCallback(
+      KeyedTimerData<K> keyedTimerData, long time) {
     assert outputList.isEmpty();
 
     try {
@@ -154,6 +175,13 @@ public class OpAdapter<InT, OutT, K>
     }
 
     @Override
+    public void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> resultFuture) {
+      outputFuture =
+          resultFuture.thenApply(
+              res -> res.stream().map(OpMessage::ofElement).collect(Collectors.toList()));
+    }
+
+    @Override
     public void emitWatermark(Instant watermark) {
       outputWatermark = watermark;
     }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
index d1e1f06..951f5df 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpEmitter.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 
 /** Output emitter for Samza {@link Op}. */
 public interface OpEmitter<OutT> {
+
+  void emitFuture(CompletionStage<Collection<WindowedValue<OutT>>> resultFuture);
+
   void emitElement(WindowedValue<OutT> element);
 
   void emitWatermark(Instant watermark);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
index e404c5f..5d4047d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OutputManagerFactory.java
@@ -23,4 +23,9 @@ import org.apache.beam.runners.core.DoFnRunners;
 /** Factory class to create {@link DoFnRunners.OutputManager}. */
 public interface OutputManagerFactory<OutT> extends Serializable {
   DoFnRunners.OutputManager create(OpEmitter<OutT> emitter);
+
+  default DoFnRunners.OutputManager create(
+      OpEmitter<OutT> emitter, FutureCollector<OutT> collector) {
+    return create(emitter);
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index a388c25..cd30d22 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -89,14 +89,14 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       new ThreadLocal<>();
 
   // the stores include both beamStore for system states as well as stores for user state
-  private final Map<String, KeyValueStore<ByteArray, byte[]>> stores;
+  private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
   private final String stageId;
 
   private SamzaStoreStateInternals(
-      Map<String, KeyValueStore<ByteArray, byte[]>> stores,
+      Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
       @Nullable K key,
       byte @Nullable [] keyBytes,
       String stageId,
@@ -109,21 +109,23 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
   }
 
   @SuppressWarnings("unchecked")
-  static KeyValueStore<ByteArray, byte[]> getBeamStore(TaskContext context) {
-    return (KeyValueStore<ByteArray, byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
+  static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context) {
+    return (KeyValueStore<ByteArray, StateValue<?>>)
+        context.getStore(SamzaStoreStateInternals.BEAM_STORE);
   }
 
-  static Factory createStateInternalFactory(
+  @SuppressWarnings("unchecked")
+  static <K> Factory<K> createStateInternalFactory(
       String id,
-      Coder<?> keyCoder,
+      Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       DoFnSignature signature) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
-    final Map<String, KeyValueStore<ByteArray, byte[]>> stores = new HashMap<>();
+    final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
 
-    final Coder stateKeyCoder;
+    final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
       signature
           .stateDeclarations()
@@ -131,10 +133,11 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
           .forEach(
               stateId ->
                   stores.put(
-                      stateId, (KeyValueStore<ByteArray, byte[]>) context.getStore(stateId)));
+                      stateId,
+                      (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
       stateKeyCoder = keyCoder;
     } else {
-      stateKeyCoder = VoidCoder.of();
+      stateKeyCoder = (Coder<K>) VoidCoder.of();
     }
     return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
   }
@@ -227,13 +230,13 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
   /** Factory class to create {@link SamzaStoreStateInternals}. */
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
-    private final Map<String, KeyValueStore<ByteArray, byte[]>> stores;
+    private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
     private final Coder<K> keyCoder;
     private final int batchGetSize;
 
     public Factory(
         String stageId,
-        Map<String, KeyValueStore<ByteArray, byte[]>> stores,
+        Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
         Coder<K> keyCoder,
         int batchGetSize) {
       this.stageId = stageId;
@@ -270,34 +273,28 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
   }
 
   private abstract class AbstractSamzaState<T> {
-    private final Coder<T> coder;
-    private final byte[] encodedStoreKey;
-    private final String namespace;
-    protected final KeyValueStore<ByteArray, byte[]> store;
+    private final StateNamespace namespace;
+    private final String addressId;
+    private final boolean isBeamStore;
+    private final String stageId;
+    private final byte[] keyBytes;
+    private byte[] encodedStoreKey;
+    protected final Coder<T> coder;
+    protected final KeyValueStore<ByteArray, StateValue<T>> store;
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
     protected AbstractSamzaState(
         StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
       this.coder = coder;
-      this.namespace = namespace.stringKey();
-
-      final KeyValueStore<ByteArray, byte[]> userStore = stores.get(address.getId());
-      this.store = userStore != null ? userStore : stores.get(BEAM_STORE);
-
-      final ByteArrayOutputStream baos = getThreadLocalBaos();
-      try (DataOutputStream dos = new DataOutputStream(baos)) {
-        dos.write(keyBytes);
-        dos.writeUTF(namespace.stringKey());
-
-        if (userStore == null) {
-          // for system state, we need to differentiate based on the following:
-          dos.writeUTF(stageId);
-          dos.writeUTF(address.getId());
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Could not encode full address for state: " + address.getId(), e);
-      }
-      this.encodedStoreKey = baos.toByteArray();
+      this.namespace = namespace;
+      this.addressId = address.getId();
+      this.isBeamStore = !stores.containsKey(address.getId());
+      this.store =
+          isBeamStore
+              ? (KeyValueStore) stores.get(BEAM_STORE)
+              : (KeyValueStore) stores.get(address.getId());
+      this.stageId = SamzaStoreStateInternals.this.stageId;
+      this.keyBytes = SamzaStoreStateInternals.this.keyBytes;
     }
 
     protected void clearInternal() {
@@ -305,12 +302,12 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     }
 
     protected void writeInternal(T value) {
-      store.put(getEncodedStoreKey(), encodeValue(value));
+      store.put(getEncodedStoreKey(), StateValue.of(value, coder));
     }
 
     protected T readInternal() {
-      final byte[] valueBytes = store.get(getEncodedStoreKey());
-      return decodeValue(valueBytes);
+      final StateValue<T> stateValue = store.get(getEncodedStoreKey());
+      return decodeValue(stateValue);
     }
 
     protected ReadableState<Boolean> isEmptyInternal() {
@@ -328,32 +325,31 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     }
 
     protected ByteArray getEncodedStoreKey() {
-      return ByteArray.of(encodedStoreKey);
+      return ByteArray.of(getEncodedStoreKeyBytes());
     }
 
     protected byte[] getEncodedStoreKeyBytes() {
-      return encodedStoreKey;
-    }
-
-    protected byte[] encodeValue(T value) {
-      final ByteArrayOutputStream baos = getThreadLocalBaos();
-      try {
-        coder.encode(value, baos);
-      } catch (IOException e) {
-        throw new RuntimeException("Could not encode state value: " + value, e);
-      }
-      return baos.toByteArray();
-    }
-
-    protected T decodeValue(byte[] valueBytes) {
-      if (valueBytes != null) {
-        try {
-          return coder.decode(new ByteArrayInputStream(valueBytes));
+      if (encodedStoreKey == null) {
+        final ByteArrayOutputStream baos = getThreadLocalBaos();
+        try (DataOutputStream dos = new DataOutputStream(baos)) {
+          dos.write(keyBytes);
+          dos.writeUTF(namespace.stringKey());
+
+          if (isBeamStore) {
+            // for system state, we need to differentiate based on the following:
+            dos.writeUTF(stageId);
+            dos.writeUTF(addressId);
+          }
         } catch (IOException e) {
-          throw new RuntimeException("Could not decode state", e);
+          throw new RuntimeException("Could not encode full address for state: " + addressId, e);
         }
+        this.encodedStoreKey = baos.toByteArray();
       }
-      return null;
+      return encodedStoreKey;
+    }
+
+    protected T decodeValue(StateValue<T> stateValue) {
+      return stateValue == null ? null : stateValue.getValue(coder);
     }
 
     @Override
@@ -367,13 +363,20 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
 
       @SuppressWarnings("unchecked")
       final AbstractSamzaState<?> that = (AbstractSamzaState<?>) o;
-      return Arrays.equals(encodedStoreKey, that.encodedStoreKey);
+      if (isBeamStore || that.isBeamStore) {
+        if (!isBeamStore || !that.isBeamStore || !stageId.equals(that.stageId)) {
+          return false;
+        }
+      }
+      return Arrays.equals(keyBytes, that.keyBytes)
+          && addressId.equals(that.addressId)
+          && this.namespace.equals(that.namespace);
     }
 
     @Override
     public int hashCode() {
       int result = namespace.hashCode();
-      result = 31 * result + Arrays.hashCode(encodedStoreKey);
+      result = 31 * result + Arrays.hashCode(getEncodedStoreKeyBytes());
       return result;
     }
   }
@@ -417,8 +420,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       synchronized (store) {
         final int size = getSize();
         final ByteArray encodedKey = encodeKey(size);
-        store.put(encodedKey, encodeValue(value));
-        store.put(getEncodedStoreKey(), Ints.toByteArray(size + 1));
+        store.put(encodedKey, StateValue.of(value, coder));
+        store.put(getEncodedStoreKey(), StateValue.of(Ints.toByteArray(size + 1)));
       }
     }
 
@@ -476,8 +479,10 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     }
 
     private int getSize() {
-      final byte[] sizeBytes = store.get(getEncodedStoreKey());
-      return sizeBytes == null ? 0 : Ints.fromByteArray(sizeBytes);
+      final StateValue stateSize = store.get(getEncodedStoreKey());
+      return (stateSize == null || stateSize.valueBytes == null)
+          ? 0
+          : Ints.fromByteArray(stateSize.valueBytes);
     }
 
     private ByteArray encodeKey(int size) {
@@ -589,7 +594,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
 
     private final Coder<KeyT> keyCoder;
     private final int storeKeySize;
-    private final List<KeyValueIterator<ByteArray, byte[]>> openIterators =
+    private final List<KeyValueIterator<ByteArray, StateValue<ValueT>>> openIterators =
         Collections.synchronizedList(new ArrayList<>());
 
     private int maxKeySize;
@@ -611,7 +616,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     public void put(KeyT key, ValueT value) {
       final ByteArray encodedKey = encodeKey(key);
       maxKeySize = Math.max(maxKeySize, encodedKey.getValue().length);
-      store.put(encodedKey, encodeValue(value));
+      store.put(encodedKey, StateValue.of(value, coder));
     }
 
     @Override
@@ -687,7 +692,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     @Override
     public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
       final ByteArray maxKey = createMaxKey();
-      final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
+      final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+          store.range(getEncodedStoreKey(), maxKey);
       openIterators.add(kvIter);
 
       return new ReadableState<Iterator<Map.Entry<KeyT, ValueT>>>() {
@@ -707,7 +713,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
 
             @Override
             public Map.Entry<KeyT, ValueT> next() {
-              Entry<ByteArray, byte[]> entry = kvIter.next();
+              Entry<ByteArray, StateValue<ValueT>> entry = kvIter.next();
               return new AbstractMap.SimpleEntry<>(
                   decodeKey(entry.getKey()), decodeValue(entry.getValue()));
             }
@@ -726,16 +732,19 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
      * properly, we need to load the content into memory.
      */
     private <OutputT> Iterable<OutputT> createIterable(
-        SerializableFunction<org.apache.samza.storage.kv.Entry<ByteArray, byte[]>, OutputT> fn) {
+        SerializableFunction<
+                org.apache.samza.storage.kv.Entry<ByteArray, StateValue<ValueT>>, OutputT>
+            fn) {
       final ByteArray maxKey = createMaxKey();
-      final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
-      final List<Entry<ByteArray, byte[]>> iterable = ImmutableList.copyOf(kvIter);
+      final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+          store.range(getEncodedStoreKey(), maxKey);
+      final List<Entry<ByteArray, StateValue<ValueT>>> iterable = ImmutableList.copyOf(kvIter);
       kvIter.close();
 
       return new Iterable<OutputT>() {
         @Override
         public Iterator<OutputT> iterator() {
-          final Iterator<Entry<ByteArray, byte[]>> iter = iterable.iterator();
+          final Iterator<Entry<ByteArray, StateValue<ValueT>>> iter = iterable.iterator();
 
           return new Iterator<OutputT>() {
             @Override
@@ -755,7 +764,8 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     @Override
     public void clear() {
       final ByteArray maxKey = createMaxKey();
-      final KeyValueIterator<ByteArray, byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
+      final KeyValueIterator<ByteArray, StateValue<ValueT>> kvIter =
+          store.range(getEncodedStoreKey(), maxKey);
       while (kvIter.hasNext()) {
         store.delete(kvIter.next().getKey());
       }
@@ -975,4 +985,76 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       }
     }
   }
+
+  /**
+   * Wrapper for a state value so that the unencoded value can be read directly from the cache of
+   * KeyValueStore.
+   */
+  public static class StateValue<T> implements Serializable {
+    private T value;
+    private Coder<T> valueCoder;
+    private byte[] valueBytes;
+
+    private StateValue(T value, Coder<T> valueCoder, byte[] valueBytes) {
+      this.value = value;
+      this.valueCoder = valueCoder;
+      this.valueBytes = valueBytes;
+    }
+
+    public static <T> StateValue<T> of(T value, Coder<T> valueCoder) {
+      return new StateValue<>(value, valueCoder, null);
+    }
+
+    public static <T> StateValue<T> of(byte[] valueBytes) {
+      return new StateValue<>(null, null, valueBytes);
+    }
+
+    public T getValue(Coder<T> coder) {
+      if (value == null && valueBytes != null) {
+        if (valueCoder == null) {
+          valueCoder = coder;
+        }
+        try {
+          value = valueCoder.decode(new ByteArrayInputStream(valueBytes));
+        } catch (IOException e) {
+          throw new RuntimeException("Could not decode state", e);
+        }
+      }
+      return value;
+    }
+
+    public byte[] getValueBytes() {
+      if (valueBytes == null && value != null) {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+          valueCoder.encode(value, baos);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not encode state value: " + value, e);
+        }
+        valueBytes = baos.toByteArray();
+      }
+      return valueBytes;
+    }
+  }
+
+  /** Factory class to provide {@link StateValueSerdeFactory.StateValueSerde}. */
+  public static class StateValueSerdeFactory implements SerdeFactory<StateValue<?>> {
+    @Override
+    public Serde<StateValue<?>> getSerde(String name, Config config) {
+      return new StateValueSerde();
+    }
+
+    /** Serde for {@link StateValue}. */
+    public static class StateValueSerde implements Serde<StateValue<?>> {
+      @Override
+      public StateValue<?> fromBytes(byte[] bytes) {
+        return StateValue.of(bytes);
+      }
+
+      @Override
+      public byte[] toBytes(StateValue<?> stateValue) {
+        return stateValue == null ? null : stateValue.getValueBytes();
+      }
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 1fc0324..ad426f2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.SamzaRunner;
 import org.apache.beam.runners.samza.state.SamzaMapState;
+import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -64,8 +65,7 @@ import org.slf4j.LoggerFactory;
 })
 public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
-
-  private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
+  private final NavigableSet<KeyedTimerData<K>> eventTimeBuffer;
   private final Coder<K> keyCoder;
   private final Scheduler<KeyedTimerData<K>> timerRegistry;
   private final SamzaTimerState state;
@@ -74,16 +74,31 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
   private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
+  // Size of each event timer is around 200B; with the default buffer size of 50k, the maximum
+  // memory footprint is about 10MB
+  private final int maxEventTimerBufferSize;
+  // Max event time stored in eventTimerBuffer
+  // If it is set to Long.MAX_VALUE, it indicates the State does not contain any KeyedTimerData
+  private long maxEventTimeInBuffer;
+
+  // The maximum number of ready timers to process at once per watermark.
+  private final long maxReadyTimersToProcessOnce;
+
   private SamzaTimerInternalsFactory(
       Coder<K> keyCoder,
       Scheduler<KeyedTimerData<K>> timerRegistry,
       String timerStateId,
       SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
       Coder<BoundedWindow> windowCoder,
-      IsBounded isBounded) {
+      IsBounded isBounded,
+      SamzaPipelineOptions pipelineOptions) {
     this.keyCoder = keyCoder;
     this.timerRegistry = timerRegistry;
-    this.eventTimeTimers = new TreeSet<>();
+    this.eventTimeBuffer = new TreeSet<>();
+    this.maxEventTimerBufferSize =
+        pipelineOptions.getEventTimerBufferSize(); // must be placed before state initialization
+    this.maxEventTimeInBuffer = Long.MAX_VALUE;
+    this.maxReadyTimersToProcessOnce = pipelineOptions.getMaxReadyTimersToProcessOnce();
     this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
     this.isBounded = isBounded;
   }
@@ -105,7 +120,8 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         timerStateId,
         nonKeyedStateInternalsFactory,
         windowCoder,
-        isBounded);
+        isBounded,
+        pipelineOptions);
   }
 
   @Override
@@ -152,16 +168,37 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     outputWatermark = watermark;
   }
 
+  /**
+   * The method is called when watermark comes. It compares timers in memory buffer with watermark
+   * to prepare ready timers. When memory buffer is empty, it asks store to reload timers into
+   * buffer. Note that the number of timers returned may be larger than the memory buffer size.
+   *
+   * @return a collection of ready timers to be fired
+   */
   public Collection<KeyedTimerData<K>> removeReadyTimers() {
     final Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>();
 
-    while (!eventTimeTimers.isEmpty()
-        && eventTimeTimers.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
-      final KeyedTimerData<K> keyedTimerData = eventTimeTimers.pollFirst();
+    while (!eventTimeBuffer.isEmpty()
+        && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)
+        && readyTimers.size() < maxReadyTimersToProcessOnce) {
+
+      final KeyedTimerData<K> keyedTimerData = eventTimeBuffer.pollFirst();
       readyTimers.add(keyedTimerData);
       state.deletePersisted(keyedTimerData);
+
+      if (eventTimeBuffer.isEmpty()) {
+        state.reloadEventTimeTimers();
+      }
     }
+    LOG.debug("Removed {} ready timers", readyTimers.size());
 
+    if (readyTimers.size() == maxReadyTimersToProcessOnce
+        && !eventTimeBuffer.isEmpty()
+        && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
+      LOG.warn(
+          "Loaded {} expired timers, the remaining will be processed at next watermark.",
+          maxReadyTimersToProcessOnce);
+    }
     return readyTimers;
   }
 
@@ -177,6 +214,11 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     return outputWatermark;
   }
 
+  // for unit test only
+  NavigableSet<KeyedTimerData<K>> getEventTimeBuffer() {
+    return eventTimeBuffer;
+  }
+
   private class SamzaTimerInternals implements TimerInternals {
     private final byte[] keyBytes;
     private final K key;
@@ -202,13 +244,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     public void setTimer(TimerData timerData) {
       if (isBounded == IsBounded.UNBOUNDED
           && timerData.getTimestamp().getMillis()
-              >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
-        // No need to register a timer of max timestamp if the input is unbounded
+              > GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
+        // No need to register a timer greater than maxTimestamp if the input is unbounded.
+        // 1. It will ignore timers with (maxTimestamp + 1) created by stateful ParDo with global
+        // window.
+        // 2. It will register timers with maxTimestamp so that global window can be closed
+        // correctly when max watermark comes.
         return;
       }
 
       final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
-      if (eventTimeTimers.contains(keyedTimerData)) {
+      if (eventTimeBuffer.contains(keyedTimerData)) {
         return;
       }
 
@@ -230,9 +276,32 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         // persist it first
         state.persist(keyedTimerData);
 
+        // TODO: apply the same memory optimization to processing timers
         switch (timerData.getDomain()) {
           case EVENT_TIME:
-            eventTimeTimers.add(keyedTimerData);
+            /**
+             * To determine if the upcoming KeyedTimerData could be added to the Buffer while
+             * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
+             * timestamp eviction priority:
+             *
+             * <p>1) If maxEventTimeInBuffer == Long.MAX_VALUE, it indicates that the State is
+             * empty, therefore all the Event times greater or lesser than newTimestamp are in the
+             * buffer;
+             *
+             * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
+             * greater than newTimestamp, so it is safe to add it to the buffer
+             *
+             * <p>In case that the Buffer is full, we remove the largest timer from memory according
+             * to {@link KeyedTimerData#compareTo(KeyedTimerData)}.
+             */
+            if (newTimestamp < maxEventTimeInBuffer) {
+              eventTimeBuffer.add(keyedTimerData);
+              if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
+                eventTimeBuffer.pollLast();
+                maxEventTimeInBuffer =
+                    eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
+              }
+            }
             break;
 
           case PROCESSING_TIME:
@@ -272,7 +341,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
       switch (timerData.getDomain()) {
         case EVENT_TIME:
-          eventTimeTimers.remove(keyedTimerData);
+          eventTimeBuffer.remove(keyedTimerData);
           break;
 
         case PROCESSING_TIME:
@@ -281,7 +350,8 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
         default:
           throw new UnsupportedOperationException(
-              String.format("%s currently only supports event time", SamzaRunner.class));
+              String.format(
+                  "%s currently only supports event time or processing time", SamzaRunner.class));
       }
     }
 
@@ -309,15 +379,16 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
   }
 
   private class SamzaTimerState {
-    private final SamzaMapState<TimerKey<K>, Long> eventTimerTimerState;
-    private final SamzaMapState<TimerKey<K>, Long> processingTimerTimerState;
+    private final SamzaMapState<TimerKey<K>, Long> eventTimeTimerState;
+    private final SamzaSetState<KeyedTimerData<K>> timestampSortedEventTimeTimerState;
+    private final SamzaMapState<TimerKey<K>, Long> processingTimeTimerState;
 
     SamzaTimerState(
         String timerStateId,
         SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
         Coder<BoundedWindow> windowCoder) {
 
-      this.eventTimerTimerState =
+      this.eventTimeTimerState =
           (SamzaMapState<TimerKey<K>, Long>)
               nonKeyedStateInternalsFactory
                   .stateInternalsForKey(null)
@@ -328,7 +399,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
-      this.processingTimerTimerState =
+      this.timestampSortedEventTimeTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-ts",
+                          new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+      this.processingTimeTimerState =
           (SamzaMapState<TimerKey<K>, Long>)
               nonKeyedStateInternalsFactory
                   .stateInternalsForKey(null)
@@ -339,17 +420,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
-      restore();
+      init();
     }
 
     Long get(KeyedTimerData<K> keyedTimerData) {
       final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
       switch (keyedTimerData.getTimerData().getDomain()) {
         case EVENT_TIME:
-          return eventTimerTimerState.get(timerKey).read();
+          return eventTimeTimerState.get(timerKey).read();
 
         case PROCESSING_TIME:
-          return processingTimerTimerState.get(timerKey).read();
+          return processingTimeTimerState.get(timerKey).read();
 
         default:
           throw new UnsupportedOperationException(
@@ -361,18 +442,29 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
       switch (keyedTimerData.getTimerData().getDomain()) {
         case EVENT_TIME:
-          eventTimerTimerState.put(
+          final Long timestamp = eventTimeTimerState.get(timerKey).read();
+
+          if (timestamp != null) {
+            final KeyedTimerData keyedTimerDataInStore =
+                TimerKey.toKeyedTimerData(timerKey, timestamp, TimeDomain.EVENT_TIME, keyCoder);
+            timestampSortedEventTimeTimerState.remove(keyedTimerDataInStore);
+          }
+          eventTimeTimerState.put(
               timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
+
+          timestampSortedEventTimeTimerState.add(keyedTimerData);
+
           break;
 
         case PROCESSING_TIME:
-          processingTimerTimerState.put(
+          processingTimeTimerState.put(
               timerKey, keyedTimerData.getTimerData().getTimestamp().getMillis());
           break;
 
         default:
           throw new UnsupportedOperationException(
-              String.format("%s currently only supports event time", SamzaRunner.class));
+              String.format(
+                  "%s currently only supports event time or processing time", SamzaRunner.class));
       }
     }
 
@@ -380,38 +472,52 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
       switch (keyedTimerData.getTimerData().getDomain()) {
         case EVENT_TIME:
-          eventTimerTimerState.remove(timerKey);
+          eventTimeTimerState.remove(timerKey);
+          timestampSortedEventTimeTimerState.remove(keyedTimerData);
           break;
 
         case PROCESSING_TIME:
-          processingTimerTimerState.remove(timerKey);
+          processingTimeTimerState.remove(timerKey);
           break;
 
         default:
           throw new UnsupportedOperationException(
-              String.format("%s currently only supports event time", SamzaRunner.class));
+              String.format(
+                  "%s currently only supports event time or processing time", SamzaRunner.class));
       }
     }
 
-    private void loadEventTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          eventTimerTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
+    /**
+     * Reload event time timers from state to memory buffer. Buffer size is bounded by
+     * maxEventTimerBufferSize.
+     */
+    private void reloadEventTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedEventTimeTimerState.readIterator().read();
 
-        eventTimeTimers.add(keyedTimerData);
+      while (iter.hasNext() && eventTimeBuffer.size() < maxEventTimerBufferSize) {
+        final KeyedTimerData<K> keyedTimerData = iter.next();
+        eventTimeBuffer.add(keyedTimerData);
+        maxEventTimeInBuffer = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      }
 
-      LOG.info("Loaded {} event time timers in memory", eventTimeTimers.size());
+      ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
+          .closeIterators();
+      LOG.info("Loaded {} event time timers in memory", eventTimeBuffer.size());
+
+      if (eventTimeBuffer.size() < maxEventTimerBufferSize) {
+        LOG.debug(
+            "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
+            eventTimeBuffer.size(),
+            maxEventTimerBufferSize);
+        // Reset the flag variable to indicate there are no more KeyedTimerData in State
+        maxEventTimeInBuffer = Long.MAX_VALUE;
+      }
+    }
 
     private void loadProcessingTimeTimers() {
       final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimerTimerState.readIterator().read();
+          processingTimeTimerState.readIterator().read();
       // since the iterator will reach to the end, it will be closed automatically
       int count = 0;
       while (iter.hasNext()) {
@@ -424,12 +530,41 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
             keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
         ++count;
       }
+      ((SamzaStoreStateInternals.KeyValueIteratorState) processingTimeTimerState).closeIterators();
 
       LOG.info("Loaded {} processing time timers in memory", count);
     }
 
-    private void restore() {
-      loadEventTimeTimers();
+    /**
+     * Restore timer state from RocksDB. This is needed for migration of existing jobs. Given the
+     * events in eventTimeTimerState, construct timestampSortedEventTimeTimerState in preparation
+     * for memory reloading. TODO: processing time timers are still loaded into memory in one shot;
+     * will apply the same optimization mechanism as event time timers.
+     */
+    private void init() {
+      final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
+          eventTimeTimerState.readIterator().read();
+      // use hasNext to check empty, because this is relatively cheap compared with Iterators.size()
+      if (eventTimersIter.hasNext()) {
+        final Iterator sortedEventTimerIter =
+            timestampSortedEventTimeTimerState.readIterator().read();
+
+        if (!sortedEventTimerIter.hasNext()) {
+          // inline the migration code
+          while (eventTimersIter.hasNext()) {
+            final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
+            final KeyedTimerData keyedTimerData =
+                TimerKey.toKeyedTimerData(
+                    entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
+            timestampSortedEventTimeTimerState.add(keyedTimerData);
+          }
+        }
+        ((SamzaStoreStateInternals.KeyValueIteratorState) timestampSortedEventTimeTimerState)
+            .closeIterators();
+      }
+      ((SamzaStoreStateInternals.KeyValueIteratorState) eventTimeTimerState).closeIterators();
+
+      reloadEventTimeTimers();
       loadProcessingTimeTimers();
     }
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index b8cfa27..08bef16 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -22,9 +22,10 @@ import static org.apache.samza.config.JobConfig.JOB_ID;
 import static org.apache.samza.config.JobConfig.JOB_NAME;
 import static org.apache.samza.config.TaskConfig.COMMIT_MS;
 import static org.apache.samza.config.TaskConfig.GROUPER_FACTORY;
+import static org.apache.samza.config.TaskConfig.MAX_CONCURRENCY;
 
 import java.io.File;
-import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -34,21 +35,21 @@ import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.container.BeamContainerRunner;
+import org.apache.beam.runners.samza.container.BeamJobCoordinatorRunner;
 import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigFactory;
+import org.apache.samza.config.ConfigLoaderFactory;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.job.yarn.YarnJobFactory;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.serializers.ByteSerdeFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.slf4j.Logger;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
 public class ConfigBuilder {
   private static final Logger LOG = LoggerFactory.getLogger(ConfigBuilder.class);
 
+  private static final String BEAM_STORE_FACTORY = "stores.beamStore.factory";
   private static final String APP_RUNNER_CLASS = "app.runner.class";
   private static final String YARN_PACKAGE_PATH = "yarn.package.path";
   private static final String JOB_FACTORY_CLASS = "job.factory.class";
@@ -80,10 +82,11 @@ public class ConfigBuilder {
     config.putAll(properties);
   }
 
+  /** @return built configuration */
   public Config build() {
     try {
       // apply framework configs
-      config.putAll(createSystemConfig(options));
+      config.putAll(createSystemConfig(options, config));
 
       // apply user configs
       config.putAll(createUserConfig(options));
@@ -92,7 +95,10 @@ public class ConfigBuilder {
       config.put(ApplicationConfig.APP_ID, options.getJobInstance());
       config.put(JOB_NAME, options.getJobName());
       config.put(JOB_ID, options.getJobInstance());
+      config.put(MAX_CONCURRENCY, String.valueOf(options.getMaxBundleSize()));
 
+      // remove config overrides before serialization (LISAMZA-15259)
+      options.setConfigOverride(new HashMap<>());
       config.put(
           "beamPipelineOptions",
           Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
@@ -116,21 +122,21 @@ public class ConfigBuilder {
     if (StringUtils.isNoneEmpty(configFilePath)) {
       LOG.info("configFilePath: " + configFilePath);
 
-      final File configFile = new File(configFilePath);
-      final URI configUri = configFile.toURI();
-      final ConfigFactory configFactory =
-          options.getConfigFactory().getDeclaredConstructor().newInstance();
+      final Config properties = new MapConfig(Collections.singletonMap("path", configFilePath));
+      final ConfigLoaderFactory configLoaderFactory =
+          options.getConfigLoaderFactory().getDeclaredConstructor().newInstance();
 
-      LOG.info("configFactory: " + configFactory.getClass().getName());
+      LOG.info("configLoaderFactory: " + configLoaderFactory.getClass().getName());
 
       // Config file must exist for default properties config
       // TODO: add check to all non-empty files once we don't need to
       // pass the command-line args through the containers
-      if (configFactory instanceof PropertiesConfigFactory) {
-        checkArgument(configFile.exists(), "Config file %s does not exist", configFilePath);
+      if (configLoaderFactory instanceof PropertiesConfigLoaderFactory) {
+        checkArgument(
+            new File(configFilePath).exists(), "Config file %s does not exist", configFilePath);
       }
 
-      config.putAll(configFactory.getConfig(configUri));
+      config.putAll(configLoaderFactory.getLoader(properties).getConfig());
     }
     // Apply override on top
     if (options.getConfigOverride() != null) {
@@ -181,6 +187,7 @@ public class ConfigBuilder {
     final String appRunner = config.get(APP_RUNNER_CLASS);
     checkArgument(
         appRunner == null
+            || BeamJobCoordinatorRunner.class.getName().equals(appRunner)
             || RemoteApplicationRunner.class.getName().equals(appRunner)
             || BeamContainerRunner.class.getName().equals(appRunner),
         "Config %s must be set to %s for %s Deployment",
@@ -208,7 +215,7 @@ public class ConfigBuilder {
         .put(
             // TODO: remove after SAMZA-1531 is resolved
             ApplicationConfig.APP_RUN_ID,
-            String.valueOf(System.currentTimeMillis())
+            System.currentTimeMillis()
                 + "-"
                 // use the most significant bits in UUID (8 digits) to avoid collision
                 + UUID.randomUUID().toString().substring(0, 8))
@@ -231,23 +238,26 @@ public class ConfigBuilder {
         .build();
   }
 
-  private static Map<String, String> createSystemConfig(SamzaPipelineOptions options) {
-    ImmutableMap.Builder<String, String> configBuilder =
+  private static Map<String, String> createSystemConfig(
+      SamzaPipelineOptions options, Map<String, String> config) {
+    final ImmutableMap.Builder<String, String> configBuilder =
         ImmutableMap.<String, String>builder()
-            .put(
-                "stores.beamStore.factory",
-                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
             .put("stores.beamStore.key.serde", "byteArraySerde")
-            .put("stores.beamStore.msg.serde", "byteSerde")
-            .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName())
+            .put("stores.beamStore.msg.serde", "stateValueSerde")
+            .put(
+                "serializers.registry.stateValueSerde.class",
+                SamzaStoreStateInternals.StateValueSerdeFactory.class.getName())
             .put(
                 "serializers.registry.byteArraySerde.class",
                 SamzaStoreStateInternals.ByteArraySerdeFactory.class.getName());
 
-    if (options.getStateDurable()) {
-      LOG.info("stateDurable is enabled");
-      configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
-      configBuilder.put("job.host-affinity.enabled", "true");
+    // if config does not contain "stores.beamStore.factory" at this moment,
+    // then it is a stateless job.
+    if (!config.containsKey(BEAM_STORE_FACTORY)) {
+      options.setStateDurable(false);
+      configBuilder.put(
+          BEAM_STORE_FACTORY,
+          "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
     }
 
     LOG.info("Execution environment is " + options.getSamzaExecutionEnvironment());
@@ -269,6 +279,23 @@ public class ConfigBuilder {
     return configBuilder.build();
   }
 
+  static Map<String, String> createRocksDBStoreConfig(SamzaPipelineOptions options) {
+    final ImmutableMap.Builder<String, String> configBuilder =
+        ImmutableMap.<String, String>builder()
+            .put(
+                BEAM_STORE_FACTORY,
+                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
+            .put("stores.beamStore.rocksdb.compression", "lz4");
+
+    if (options.getStateDurable()) {
+      LOG.info("stateDurable is enabled");
+      configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
+      configBuilder.put("job.host-affinity.enabled", "true");
+    }
+
+    return configBuilder.build();
+  }
+
   private static void validateConfigs(SamzaPipelineOptions options, Map<String, String> config) {
 
     // validate execution environment
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
index d77bc1e..ea5f6d9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.samza.translation;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -33,10 +35,12 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
+  private final Set<String> stateIds;
 
   public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions options) {
     this.idMap = idMap;
     this.options = options;
+    this.stateIds = new HashSet<>();
   }
 
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
@@ -60,6 +64,10 @@ public class ConfigContext {
     return this.options;
   }
 
+  public boolean addStateId(String stateId) {
+    return stateIds.add(stateId);
+  }
+
   private String getIdForPValue(PValue pvalue) {
     final String id = idMap.get(pvalue);
     if (id == null) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index 50e62a2..ec7af07 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -61,7 +61,7 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
       // for some of the validateRunner tests only
       final MessageStream<OpMessage<T>> noOpStream =
           ctx.getDummyStream()
-              .flatMap(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
+              .flatMapAsync(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
       ctx.registerMessageStream(output, noOpStream);
       return;
     }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 6d2b2b6..4f3c409 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -19,12 +19,14 @@ package org.apache.beam.runners.samza.translation;
 
 import static org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils.escape;
 
+import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.runtime.DoFnOp;
 import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
 import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
@@ -56,7 +58,9 @@ import org.apache.samza.serializers.KVSerde;
 @SuppressWarnings({"keyfor", "nullness"}) // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 class GroupByKeyTranslator<K, InputT, OutputT>
     implements TransformTranslator<
-        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
+            PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>,
+        TransformConfigGenerator<
+            PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
 
   @Override
   public void translate(
@@ -111,6 +115,20 @@ class GroupByKeyTranslator<K, InputT, OutputT>
     doTranslatePortable(transform, pipeline, ctx);
   }
 
+  @Override
+  public Map<String, String> createConfig(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+      TransformHierarchy.Node node,
+      ConfigContext ctx) {
+    return ConfigBuilder.createRocksDBStoreConfig(ctx.getPipelineOptions());
+  }
+
+  @Override
+  public Map<String, String> createPortableConfig(
+      PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
+    return ConfigBuilder.createRocksDBStoreConfig(options);
+  }
+
   private static <K, InputT, OutputT> void doTranslatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
@@ -193,8 +211,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
 
     final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
         partitionedInputStream
-            .flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
-            .flatMap(
+            .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+            .flatMapAsync(
                 OpAdapter.adapt(
                     new GroupByKeyOp<>(
                         outputTag,
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 2369f10..15f6f2f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -176,7 +176,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
     }
 
     final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
-        mergedStreams.flatMap(OpAdapter.adapt(op));
+        mergedStreams.flatMapAsync(OpAdapter.adapt(op));
 
     for (int outputIndex : tagToIndexMap.values()) {
       @SuppressWarnings("unchecked")
@@ -186,7 +186,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
                   message ->
                       message.getType() != OpMessage.Type.ELEMENT
                           || message.getElement().getValue().getUnionTag() == outputIndex)
-              .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
 
       ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
     }
@@ -218,12 +218,21 @@ class ParDoBoundMultiTranslator<InT, OutT>
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+
     String inputId = stagePayload.getInput();
     final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStreamById(inputId);
+
     // TODO: support side input
+    if (!stagePayload.getSideInputsList().isEmpty()) {
+      throw new UnsupportedOperationException(
+          "Side inputs in portable pipelines are not supported in samza");
+    }
+
+    // set side inputs to empty until they are supported
     final List<MessageStream<OpMessage<InT>>> sideInputStreams = Collections.emptyList();
 
     final Map<TupleTag<?>, Integer> tagToIndexMap = new HashMap<>();
+    final Map<Integer, String> indexToIdMap = new HashMap<>();
     final Map<String, TupleTag<?>> idToTupleTagMap = new HashMap<>();
 
     // first output as the main output
@@ -238,19 +247,20 @@ class ParDoBoundMultiTranslator<InT, OutT>
             outputName -> {
               TupleTag<?> tupleTag = new TupleTag<>(outputName);
               tagToIndexMap.put(tupleTag, index.get());
-              index.incrementAndGet();
               String collectionId = outputs.get(outputName);
+              indexToIdMap.put(index.get(), collectionId);
               idToTupleTagMap.put(collectionId, tupleTag);
+              index.incrementAndGet();
             });
 
     WindowedValue.WindowedValueCoder<InT> windowedInputCoder =
         ctx.instantiateCoder(inputId, pipeline.getComponents());
 
-    final DoFnSchemaInformation doFnSchemaInformation;
-    doFnSchemaInformation = ParDoTranslation.getSchemaInformation(transform.getTransform());
-
-    Map<String, PCollectionView<?>> sideInputMapping =
-        ParDoTranslation.getSideInputMapping(transform.getTransform());
+    // TODO: support schema and side inputs for portable runner
+    // Note: transform.getTransform() is an ExecutableStage, not ParDo, so we need to extract
+    // these info from its components.
+    final DoFnSchemaInformation doFnSchemaInformation = null;
+    final Map<String, PCollectionView<?>> sideInputMapping = Collections.emptyMap();
 
     final RunnerApi.PCollection input = pipeline.getComponents().getPcollectionsOrThrow(inputId);
     final PCollection.IsBounded isBounded = SamzaPipelineTranslatorUtils.isBounded(input);
@@ -287,18 +297,19 @@ class ParDoBoundMultiTranslator<InT, OutT>
     }
 
     final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
-        mergedStreams.flatMap(OpAdapter.adapt(op));
+        mergedStreams.flatMapAsync(OpAdapter.adapt(op));
 
     for (int outputIndex : tagToIndexMap.values()) {
+      @SuppressWarnings("unchecked")
       final MessageStream<OpMessage<OutT>> outputStream =
           taggedOutputStream
               .filter(
                   message ->
                       message.getType() != OpMessage.Type.ELEMENT
                           || message.getElement().getValue().getUnionTag() == outputIndex)
-              .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
 
-      ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
+      ctx.registerMessageStream(indexToIdMap.get(outputIndex), outputStream);
     }
   }
 
@@ -309,15 +320,29 @@ class ParDoBoundMultiTranslator<InT, OutT>
     final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
     final SamzaPipelineOptions options = ctx.getPipelineOptions();
 
+    // If a ParDo directly or indirectly observes the window, then this is a stateful ParDo;
+    // in this case, we will use RocksDB as the system store.
+    if (signature.processElement().observesWindow()) {
+      config.putAll(ConfigBuilder.createRocksDBStoreConfig(options));
+    }
+
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
         final String storeId = state.id();
+
+        // TODO: remove validation after we support the same state id in different ParDos.
+        if (!ctx.addStateId(storeId)) {
+          throw new IllegalStateException(
+              "Duplicate StateId " + storeId + " found in multiple ParDo.");
+        }
+
         config.put(
             "stores." + storeId + ".factory",
             "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
         config.put("stores." + storeId + ".key.serde", "byteArraySerde");
-        config.put("stores." + storeId + ".msg.serde", "byteSerde");
+        config.put("stores." + storeId + ".msg.serde", "stateValueSerde");
+        config.put("stores." + storeId + ".rocksdb.compression", "lz4");
 
         if (options.getStateDurable()) {
           config.put(
@@ -334,6 +359,13 @@ class ParDoBoundMultiTranslator<InT, OutT>
     return config;
   }
 
+  @Override
+  public Map<String, String> createPortableConfig(
+      PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
+    // TODO: Add beamStore configs when portable use case supports stateful ParDo.
+    return Collections.emptyMap();
+  }
+
   static class SideInputWatermarkFn<InT>
       implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
           WatermarkFunction<OpMessage<InT>> {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 8eb8746..3514b4f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.samza.translation;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.service.AutoService;
 import java.util.HashMap;
@@ -58,9 +57,6 @@ public class SamzaPipelineTranslator {
   private SamzaPipelineTranslator() {}
 
   public static void translate(Pipeline pipeline, TranslationContext ctx) {
-    checkState(
-        ctx.getPipelineOptions().getMaxBundleSize() <= 1,
-        "bundling is not supported for non portable mode. Please disable bundling (by setting max bundle size to 1).");
     final TransformVisitorFn translateFn =
         new TransformVisitorFn() {
 
@@ -180,18 +176,19 @@ public class SamzaPipelineTranslator {
     @Override
     public Map<String, TransformTranslator<?>> getTransformTranslators() {
       return ImmutableMap.<String, TransformTranslator<?>>builder()
-          .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator())
-          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator())
-          .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
-          .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
-          .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslator())
-          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator())
-          .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new SamzaPublishViewTranslator())
+          .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator<>())
+          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator<>())
+          .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+          .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())
+          .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslator<>())
+          .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionsTranslator<>())
+          .put(SamzaPublishView.SAMZA_PUBLISH_VIEW_URN, new SamzaPublishViewTranslator<>())
           .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
+          .put(ExecutableStage.URN, new ParDoBoundMultiTranslator<>())
+          .put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, new SamzaTestStreamTranslator())
           .put(
               PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN,
               new SplittableParDoTranslators.ProcessKeyedElements<>())
-          .put(ExecutableStage.URN, new ParDoBoundMultiTranslator())
           .build();
     }
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java
new file mode 100644
index 0000000..96dc577
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A Samza system factory that supports consuming from {@link TestStream} and translating events
+ * into messages according to the {@link org.apache.beam.sdk.testing.TestStream.EventType} of the
+ * events.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class SamzaTestStreamSystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    final String streamPrefix = "systems." + systemName;
+    final Config scopedConfig = config.subset(streamPrefix + ".", true);
+    return new SmazaTestStreamSystemConsumer<>(getTestStream(scopedConfig));
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    throw new UnsupportedOperationException("SamzaTestStreamSystem doesn't support producing");
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SamzaTestStreamSystemAdmin();
+  }
+
+  /** A helper function to decode testStream from the config. */
+  private static <T> TestStream<T> getTestStream(Config config) {
+    @SuppressWarnings("unchecked")
+    final SerializableFunction<String, TestStream<T>> testStreamDecoder =
+        Base64Serializer.deserializeUnchecked(
+            config.get("testStreamDecoder"), SerializableFunction.class);
+    final TestStream<T> testStream = testStreamDecoder.apply(config.get("encodedTestStream"));
+    return testStream;
+  }
+
+  private static final String DUMMY_OFFSET = "0";
+
+  /** System admin for SamzaTestStreamSystem. */
+  public static class SamzaTestStreamSystemAdmin implements SystemAdmin {
+    @Override
+    public Map<SystemStreamPartition, String> getOffsetsAfter(
+        Map<SystemStreamPartition, String> offsets) {
+      return offsets.keySet().stream()
+          .collect(Collectors.toMap(Function.identity(), k -> DUMMY_OFFSET));
+    }
+
+    @Override
+    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+          .collect(
+              Collectors.toMap(
+                  Function.identity(),
+                  stream -> {
+                    // TestStream will always be single partition
+                    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>
+                        partitionMetadata =
+                            Collections.singletonMap(
+                                new Partition(0),
+                                new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                                    DUMMY_OFFSET, DUMMY_OFFSET, DUMMY_OFFSET));
+                    return new SystemStreamMetadata(stream, partitionMetadata);
+                  }));
+    }
+
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      return 0;
+    }
+  }
+
+  /** System consumer for SamzaTestStreamSystem. */
+  public static class SmazaTestStreamSystemConsumer<T> implements SystemConsumer {
+    TestStream<T> testStream;
+
+    public SmazaTestStreamSystemConsumer(TestStream<T> testStream) {
+      this.testStream = testStream;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+
+    @Override
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+        Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+        throws InterruptedException {
+      SystemStreamPartition ssp = systemStreamPartitions.iterator().next();
+      ArrayList<IncomingMessageEnvelope> messages = new ArrayList<>();
+
+      for (TestStream.Event<T> event : testStream.getEvents()) {
+        if (event.getType().equals(TestStream.EventType.ELEMENT)) {
+          // If event type is element, for each element, create a message with the element and
+          // timestamp.
+          for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
+            WindowedValue<T> windowedValue =
+                WindowedValue.timestampedValueInGlobalWindow(
+                    element.getValue(), element.getTimestamp());
+            final OpMessage<T> opMessage = OpMessage.ofElement(windowedValue);
+            final IncomingMessageEnvelope envelope =
+                new IncomingMessageEnvelope(ssp, DUMMY_OFFSET, null, opMessage);
+            messages.add(envelope);
+          }
+        } else if (event.getType().equals(TestStream.EventType.WATERMARK)) {
+          // If event type is watermark, create a watermark message.
+          long watermarkMillis = ((TestStream.WatermarkEvent<T>) event).getWatermark().getMillis();
+          final IncomingMessageEnvelope envelope =
+              IncomingMessageEnvelope.buildWatermarkEnvelope(ssp, watermarkMillis);
+          messages.add(envelope);
+          if (watermarkMillis == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+            // If the watermark reached the max watermark, also create an end-of-stream message
+            final IncomingMessageEnvelope endOfStreamMessage =
+                IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp);
+            messages.add(endOfStreamMessage);
+            break;
+          }
+        } else if (event.getType().equals(TestStream.EventType.PROCESSING_TIME)) {
+          throw new UnsupportedOperationException(
+              "Advancing Processing time is not supported by the Samza Runner.");
+        } else {
+          throw new SamzaException("Unknown event type " + event.getType());
+        }
+      }
+
+      return ImmutableMap.of(ssp, messages);
+    }
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
new file mode 100644
index 0000000..ef38a79
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaTestStreamTranslator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import java.util.Map;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+
+/**
+ * Translate {@link org.apache.beam.sdk.testing.TestStream} to a samza message stream produced by
+ * {@link
+ * org.apache.beam.runners.samza.translation.SamzaTestStreamSystemFactory.SmazaTestStreamSystemConsumer}.
+ */
+@SuppressWarnings({"rawtypes"})
+public class SamzaTestStreamTranslator<T> implements TransformTranslator<TestStream<T>> {
+
+  @Override
+  public void translate(
+      TestStream<T> testStream, TransformHierarchy.Node node, TranslationContext ctx) {
+    final PCollection<T> output = ctx.getOutput(testStream);
+    final String outputId = ctx.getIdForPValue(output);
+    final Coder<T> valueCoder = testStream.getValueCoder();
+    final TestStream.TestStreamCoder<T> testStreamCoder = TestStream.TestStreamCoder.of(valueCoder);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(outputId, SamzaTestStreamSystemFactory.class.getName());
+
+    // encode testStream as a string
+    final String encodedTestStream;
+    try {
+      encodedTestStream = CoderUtils.encodeToBase64(testStreamCoder, testStream);
+    } catch (CoderException e) {
+      throw new SamzaException("Could not encode TestStream.", e);
+    }
+
+    // the decoder for encodedTestStream
+    SerializableFunction<String, TestStream<T>> testStreamDecoder =
+        string -> {
+          try {
+            return CoderUtils.decodeFromBase64(TestStream.TestStreamCoder.of(valueCoder), string);
+          } catch (CoderException e) {
+            throw new SamzaException("Could not decode TestStream.", e);
+          }
+        };
+
+    final Map<String, String> systemConfig =
+        ImmutableMap.of(
+            "encodedTestStream",
+            encodedTestStream,
+            "testStreamDecoder",
+            Base64Serializer.serializeUnchecked(testStreamDecoder));
+    systemDescriptor.withSystemConfigs(systemConfig);
+
+    // The KvCoder is needed here for Samza not to crop the key.
+    final Serde<KV<?, OpMessage<byte[]>>> kvSerde = KVSerde.of(new NoOpSerde(), new NoOpSerde<>());
+    final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(outputId, kvSerde);
+
+    ctx.registerInputMessageStream(output, inputDescriptor);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    throw new SamzaException("TestStream is not supported in portable by Samza runner");
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
index e2a37c2..91dc2a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
@@ -119,8 +119,8 @@ public class SplittableParDoTranslators {
 
       final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
           partitionedInputStream
-              .flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
-              .flatMap(
+              .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+              .flatMapAsync(
                   OpAdapter.adapt(
                       new SplittableParDoProcessKeyedElementsOp<>(
                           transform.getMainOutputTag(),
@@ -139,7 +139,7 @@ public class SplittableParDoTranslators {
                     message ->
                         message.getType() != OpMessage.Type.ELEMENT
                             || message.getElement().getValue().getUnionTag() == outputIndex)
-                .flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
+                .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
 
         ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
       }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index a298f38..5a69628 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -17,8 +17,12 @@
  */
 package org.apache.beam.runners.samza.translation;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import org.apache.beam.runners.core.construction.TransformInputs;
@@ -93,26 +97,38 @@ public class TranslationContext {
   }
 
   public <OutT> void registerInputMessageStream(
-      PValue pvalue,
-      InputDescriptor<org.apache.samza.operators.KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
-    // we want to register it with the Samza graph only once per i/o stream
-    final String streamId = inputDescriptor.getStreamId();
-    if (registeredInputStreams.containsKey(streamId)) {
-      MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
-      LOG.info(
-          String.format(
-              "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
-              streamId, messageStream, pvalue));
-      registerMessageStream(pvalue, messageStream);
-
-      return;
-    }
-    @SuppressWarnings("unchecked")
-    final MessageStream<OpMessage<OutT>> typedStream =
-        getValueStream(appDescriptor.getInputStream(inputDescriptor));
+      PValue pvalue, InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
+    registerInputMessageStreams(pvalue, Collections.singletonList(inputDescriptor));
+  }
 
-    registerMessageStream(pvalue, typedStream);
-    registeredInputStreams.put(streamId, typedStream);
+  /**
+   * Function to register a merged messageStream of all input messageStreams to a PCollection.
+   *
+   * @param pvalue output of a transform
+   * @param inputDescriptors a list of Samza InputDescriptors
+   */
+  public <OutT> void registerInputMessageStreams(
+      PValue pvalue, List<? extends InputDescriptor<KV<?, OpMessage<OutT>>, ?>> inputDescriptors) {
+    final Set<MessageStream<OpMessage<OutT>>> streamsToMerge = new HashSet<>();
+    for (InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor : inputDescriptors) {
+      final String streamId = inputDescriptor.getStreamId();
+      // each streamId registered in the map should already have been added to messageStreamMap
+      if (registeredInputStreams.containsKey(streamId)) {
+        @SuppressWarnings("unchecked")
+        MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
+        LOG.info(
+            String.format(
+                "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
+                streamId, messageStream, pvalue));
+        streamsToMerge.add(messageStream);
+      } else {
+        final MessageStream<OpMessage<OutT>> typedStream =
+            getValueStream(appDescriptor.getInputStream(inputDescriptor));
+        registeredInputStreams.put(streamId, typedStream);
+        streamsToMerge.add(typedStream);
+      }
+    }
+    registerMessageStream(pvalue, MessageStream.mergeAll(streamsToMerge));
   }
 
   public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
@@ -204,9 +220,8 @@ public class TranslationContext {
         tableDesc.getTableId(), id -> appDescriptor.getTable(tableDesc));
   }
 
-  private static <T> MessageStream<T> getValueStream(
-      MessageStream<org.apache.samza.operators.KV<?, T>> input) {
-    return input.map(org.apache.samza.operators.KV::getValue);
+  private static <T> MessageStream<T> getValueStream(MessageStream<KV<?, T>> input) {
+    return input.map(KV::getValue);
   }
 
   public String getIdForPValue(PValue pvalue) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
index 114a256..a8790c5 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
@@ -47,7 +47,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
     final MessageStream<OpMessage<T>> inputStream = ctx.getMessageStream(ctx.getInput(transform));
 
     final MessageStream<OpMessage<T>> outputStream =
-        inputStream.flatMap(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
 
     ctx.registerMessageStream(output, outputStream);
   }
@@ -73,7 +73,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
     final MessageStream<OpMessage<T>> inputStream = ctx.getOneInputMessageStream(transform);
 
     final MessageStream<OpMessage<T>> outputStream =
-        inputStream.flatMap(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
 
     ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java
new file mode 100644
index 0000000..09ad77b
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/FutureUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A util class to handle java 8 {@link CompletableFuture} and {@link CompletionStage}. */
+@SuppressWarnings({"rawtypes"})
+public final class FutureUtils {
+  /**
+   * Flattens the input future collection and returns a single future comprising the results of all
+   * the futures.
+   *
+   * @param inputFutures input future collection
+   * @param <T> result type of the input future
+   * @return a single {@link CompletionStage} that contains the results of all the input futures.
+   */
+  public static <T> CompletionStage<Collection<T>> flattenFutures(
+      Collection<CompletionStage<T>> inputFutures) {
+    CompletableFuture<T>[] futures = inputFutures.toArray(new CompletableFuture[0]);
+
+    return CompletableFuture.allOf(futures)
+        .thenApply(
+            ignored -> {
+              final List<T> result =
+                  Stream.of(futures).map(CompletableFuture::join).collect(Collectors.toList());
+              return result;
+            });
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java
new file mode 100644
index 0000000..a5b03a2
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.runners.samza.SamzaPipelineOptionsValidator.validateBundlingRelatedOptions;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+/** Test for {@link SamzaPipelineOptionsValidator}. */
+public class SamzaPipelineOptionsValidatorTest {
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBundleEnabledInMultiThreadedModeThrowsException() {
+    SamzaPipelineOptions mockOptions = mock(SamzaPipelineOptions.class);
+    Map<String, String> config = ImmutableMap.of(JOB_CONTAINER_THREAD_POOL_SIZE, "10");
+
+    when(mockOptions.getMaxBundleSize()).thenReturn(2L);
+    when(mockOptions.getConfigOverride()).thenReturn(config);
+    validateBundlingRelatedOptions(mockOptions);
+  }
+
+  @Test
+  public void testBundleEnabledInSingleThreadedMode() {
+    SamzaPipelineOptions mockOptions = mock(SamzaPipelineOptions.class);
+    when(mockOptions.getMaxBundleSize()).thenReturn(2L);
+
+    try {
+      Map<String, String> config = ImmutableMap.of(JOB_CONTAINER_THREAD_POOL_SIZE, "1");
+      when(mockOptions.getConfigOverride()).thenReturn(config);
+      validateBundlingRelatedOptions(mockOptions);
+
+      // In the absence of configuration, make sure it is treated as single-threaded mode.
+      when(mockOptions.getConfigOverride()).thenReturn(Collections.emptyMap());
+      validateBundlingRelatedOptions(mockOptions);
+    } catch (Exception e) {
+      throw new AssertionError("Bundle size > 1 should be supported in single threaded mode");
+    }
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
index fd61bc0..cd92077 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.samza.adapter;
 
 import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createElementMessage;
+import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createEndOfStreamMessage;
 import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.createWatermarkMessage;
 import static org.apache.beam.runners.samza.adapter.TestSourceHelpers.expectWrappedException;
 import static org.junit.Assert.assertEquals;
@@ -102,6 +103,33 @@ public class UnboundedSourceSystemTest {
   }
 
   @Test
+  public void testMaxWatermarkTriggersEndOfStreamMessage()
+      throws IOException, InterruptedException {
+    final TestUnboundedSource<String> source =
+        TestUnboundedSource.<String>createBuilder()
+            .addElements("test")
+            .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE)
+            .build();
+
+    final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
+        createConsumer(source);
+
+    consumer.register(DEFAULT_SSP, NULL_STRING);
+    consumer.start();
+    List<IncomingMessageEnvelope> actualList =
+        consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS);
+    actualList.addAll(
+        consumeUntilTimeoutOrWatermark(consumer, DEFAULT_SSP, DEFAULT_TIMEOUT_MILLIS));
+    assertEquals(
+        Arrays.asList(
+            createElementMessage(DEFAULT_SSP, offset(0), "test", BoundedWindow.TIMESTAMP_MIN_VALUE),
+            createWatermarkMessage(DEFAULT_SSP, BoundedWindow.TIMESTAMP_MAX_VALUE),
+            createEndOfStreamMessage(DEFAULT_SSP)),
+        actualList);
+    consumer.stop();
+  }
+
+  @Test
   public void testAdvanceTimestamp() throws IOException, InterruptedException {
     final Instant timestamp = Instant.now();
 
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
new file mode 100644
index 0000000..4baf7be
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.samza.operators.Scheduler;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@linkplain BundleManager}. */
+@SuppressWarnings({"nullness"})
+// TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+public final class BundleManagerTest {
+  private static final long MAX_BUNDLE_SIZE = 3;
+  private static final long MAX_BUNDLE_TIME_MS = 2000;
+  private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer";
+
+  private FutureCollector<String> mockFutureCollector;
+  private BundleManager<String> bundleManager;
+  private BundleManager.BundleProgressListener<String> bundleProgressListener;
+  private Scheduler<KeyedTimerData<Void>> mockScheduler;
+
+  @Before
+  public void setUp() {
+    mockFutureCollector = mock(FutureCollector.class);
+    bundleProgressListener = mock(BundleManager.BundleProgressListener.class);
+    mockScheduler = mock(Scheduler.class);
+    bundleManager =
+        new BundleManager<>(
+            bundleProgressListener,
+            mockFutureCollector,
+            MAX_BUNDLE_SIZE,
+            MAX_BUNDLE_TIME_MS,
+            mockScheduler,
+            BUNDLE_CHECK_TIMER_ID);
+  }
+
+  @Test
+  public void testTryStartBundleStartsBundle() {
+    bundleManager.tryStartBundle();
+
+    verify(bundleProgressListener, times(1)).onBundleStarted();
+    assertEquals(
+        "Expected the number of element in the current bundle to be 1",
+        1L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+    assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testTryStartBundleThrowsExceptionAndSignalError() {
+    bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
+    try {
+      bundleManager.tryStartBundle();
+    } catch (IllegalArgumentException e) {
+      bundleManager.signalFailure(e);
+    }
+
+    // verify if the signal failure only resets appropriate attributes of bundle
+    verify(mockFutureCollector, times(1)).prepare();
+    verify(mockFutureCollector, times(1)).discard();
+    assertEquals(
+        "Expected the number of element in the current bundle to 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testTryStartBundleThrowsExceptionFromTheListener() {
+    doThrow(new RuntimeException("User start bundle threw an exception"))
+        .when(bundleProgressListener)
+        .onBundleStarted();
+
+    try {
+      bundleManager.tryStartBundle();
+    } catch (RuntimeException e) {
+      bundleManager.signalFailure(e);
+    }
+
+    // verify if the signal failure only resets appropriate attributes of bundle
+    verify(mockFutureCollector, times(1)).prepare();
+    verify(mockFutureCollector, times(1)).discard();
+    assertEquals(
+        "Expected the number of element in the current bundle to 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testMultipleStartBundle() {
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+
+    // second invocation should not start the bundle
+    verify(bundleProgressListener, times(1)).onBundleStarted();
+    assertEquals(
+        "Expected the number of element in the current bundle to be 2",
+        2L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+    assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
+  }
+
+  /*
+   * Setup the bundle manager with default max bundle size as 3 and max bundle close timeout to 2 seconds.
+   * The test verifies the following
+   *  1. Bundle gets closed on tryFinishBundle()
+   *     a. pending bundle count == 0
+   *     b. element in current bundle == 0
+   *     c. isBundleStarted == false
+   *  2. onBundleFinished callback is invoked on the progress listener
+   */
+  @Test
+  public void testTryFinishBundleClosesBundle() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+    bundleManager.tryFinishBundle(mockEmitter);
+
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testTryFinishBundleClosesBundleOnMaxWatermark() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+    bundleManager.setBundleWatermarkHold(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+    bundleManager.tryFinishBundle(mockEmitter);
+
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+  }
+
+  /*
+   * Set up the bundle manager with defaults and ensure the bundle manager doesn't close the current active bundle.
+   */
+  @Test
+  public void testTryFinishBundleShouldNotCloseBundle() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+    bundleManager.tryStartBundle();
+    bundleManager.tryFinishBundle(mockEmitter);
+
+    verify(mockFutureCollector, times(1)).finish();
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 1",
+        1L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+    assertTrue("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testTryFinishBundleWhenNoBundleInProgress() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    when(mockFutureCollector.finish())
+        .thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+    bundleManager.tryFinishBundle(mockEmitter);
+
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    assertNull(
+        "tryFinishBundle() should not set the future when no bundle in progress",
+        bundleManager.getCurrentBundleDoneFuture());
+  }
+
+  @Test
+  public void testProcessWatermarkWhenNoBundleInProgress() {
+    Instant now = Instant.now();
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    bundleManager.processWatermark(now, mockEmitter);
+    verify(bundleProgressListener, times(1)).onWatermark(now, mockEmitter);
+  }
+
+  /*
+   * The test validates processing watermark during an active bundle in progress and also validates
+   * if the watermark hold is propagated downstream after the output futures are resolved.
+   */
+  @Test
+  public void testProcessWatermarkWithPendingBundles() {
+    CountDownLatch latch = new CountDownLatch(1);
+    Instant watermark = Instant.now();
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+
+    // We need to capture the finish bundle future to know if we can check for output watermark
+    // and verify other callbacks get invoked.
+    Class<CompletionStage<Collection<WindowedValue<String>>>> outputFutureClass =
+        (Class<CompletionStage<Collection<WindowedValue<String>>>>) (Class) CompletionStage.class;
+    ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor =
+        ArgumentCaptor.forClass(outputFutureClass);
+
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    latch.await();
+                  } catch (InterruptedException e) {
+                    throw new AssertionError("Test interrupted when waiting for latch");
+                  }
+
+                  return Collections.singleton(mock(WindowedValue.class));
+                }));
+
+    testWatermarkHoldWhenPendingBundleInProgress(mockEmitter, captor, watermark);
+    testWatermarkHoldPropagatesAfterFutureResolution(mockEmitter, captor, latch, watermark);
+  }
+
+  @Test
+  public void testMaxWatermarkPropagationForPendingBundle() {
+    Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    bundleManager.setPendingBundleCount(1);
+    bundleManager.processWatermark(watermark, mockEmitter);
+    verify(bundleProgressListener, times(1)).onWatermark(watermark, mockEmitter);
+  }
+
+  @Test
+  public void testMaxWatermarkWithBundleInProgress() {
+    Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+
+    // should force close bundle
+    bundleManager.processWatermark(watermark, mockEmitter);
+    verify(bundleProgressListener, times(1)).onWatermark(watermark, mockEmitter);
+  }
+
+  @Test
+  public void testProcessTimerWithBundleTimeElapsed() {
+    BundleManager<String> bundleManager =
+        new BundleManager<>(
+            bundleProgressListener,
+            mockFutureCollector,
+            MAX_BUNDLE_SIZE,
+            0,
+            mockScheduler,
+            BUNDLE_CHECK_TIMER_ID);
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+    TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+    when(mockFutureCollector.finish())
+        .thenReturn(
+            CompletableFuture.completedFuture(Collections.singleton(mock(WindowedValue.class))));
+    when(mockTimerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
+    when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+    bundleManager.tryStartBundle();
+    bundleManager.processTimer(mockTimer, mockEmitter);
+
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("tryFinishBundle() did not close the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testProcessTimerWithTimeLessThanMaxBundleTime() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+    TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+    when(mockTimerData.getTimerId()).thenReturn(BUNDLE_CHECK_TIMER_ID);
+    when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+    when(mockFutureCollector.finish())
+        .thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
+    bundleManager.tryStartBundle();
+    bundleManager.processTimer(mockTimer, mockEmitter);
+
+    verify(mockFutureCollector, times(1)).finish();
+    verify(mockEmitter, times(1)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 1",
+        1L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+    assertTrue("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testProcessTimerIgnoresNonBundleTimers() {
+    OpEmitter<String> mockEmitter = mock(OpEmitter.class);
+    KeyedTimerData<Void> mockTimer = mock(KeyedTimerData.class);
+    TimerInternals.TimerData mockTimerData = mock(TimerInternals.TimerData.class);
+
+    when(mockTimerData.getTimerId()).thenReturn("NotBundleTimer");
+    when(mockTimer.getTimerData()).thenReturn(mockTimerData);
+
+    bundleManager.tryStartBundle();
+    bundleManager.processTimer(mockTimer, mockEmitter);
+
+    verify(mockFutureCollector, times(0)).finish();
+    verify(mockEmitter, times(0)).emitFuture(anyObject());
+    verify(bundleProgressListener, times(0)).onBundleFinished(mockEmitter);
+    assertEquals(
+        "Expected the number of element in the current bundle to be 1",
+        1L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected the pending bundle count to be 1", 1L, bundleManager.getPendingBundleCount());
+    assertTrue("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+  }
+
+  @Test
+  public void testSignalFailureResetsTheBundleAndCollector() {
+    bundleManager.tryStartBundle();
+
+    bundleManager.signalFailure(mock(Throwable.class));
+    verify(mockFutureCollector, times(1)).prepare();
+    verify(mockFutureCollector, times(1)).discard();
+    assertEquals(
+        "Expected the number of element in the current bundle to 0",
+        0L,
+        bundleManager.getCurrentBundleElementCount());
+    assertEquals(
+        "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
+    assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+  }
+
+  /*
+   * We validate the following
+   *  1. Process watermark is held since there is a pending bundle.
+   *  2. Watermark propagates downstream once the output future is resolved.
+   *  3. The watermark propagated is the one that was held before closing the bundle
+   *  4. onBundleFinished and onWatermark callbacks are triggered
+   *  5. Pending bundle count is decremented once the future is resolved
+   */
+  private void testWatermarkHoldPropagatesAfterFutureResolution(
+      OpEmitter<String> mockEmitter,
+      ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor,
+      CountDownLatch latch,
+      Instant sealedWatermark) {
+    Instant higherWatermark = Instant.now();
+
+    // Process watermark should result in watermark hold again since pending bundle count > 0
+    bundleManager.processWatermark(higherWatermark, mockEmitter);
+    verify(bundleProgressListener, times(0)).onWatermark(higherWatermark, mockEmitter);
+
+    // Resolving the process output futures should result in watermark propagation
+    latch.countDown();
+    CompletionStage<Void> validationFuture =
+        captor
+            .getValue()
+            .thenAccept(
+                results -> {
+                  verify(bundleProgressListener, times(1)).onBundleFinished(mockEmitter);
+                  verify(bundleProgressListener, times(1))
+                      .onWatermark(sealedWatermark, mockEmitter);
+                  assertEquals(
+                      "Expected the pending bundle count to be 0",
+                      0L,
+                      bundleManager.getPendingBundleCount());
+                });
+
+    validationFuture.toCompletableFuture().join();
+  }
+
+  /*
+   * We validate the following
+   *  1. Watermark is held since there is a bundle in progress
+   *  2. Callbacks are not invoked when tryFinishBundle() is invoked since the future is unresolved
+   *  3. Watermark hold is sealed and output future is emitted
+   */
+  private void testWatermarkHoldWhenPendingBundleInProgress(
+      OpEmitter<String> mockEmitter,
+      ArgumentCaptor<CompletionStage<Collection<WindowedValue<String>>>> captor,
+      Instant watermark) {
+    // Starts the bundle and reach the max bundle size so that tryFinishBundle() seals the current
+    // bundle
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+    bundleManager.tryStartBundle();
+
+    bundleManager.processWatermark(watermark, mockEmitter);
+    verify(bundleProgressListener, times(0)).onWatermark(watermark, mockEmitter);
+
+    // Bundle is still unresolved although sealed, since the countdown latch has not yet been decremented.
+    bundleManager.tryFinishBundle(mockEmitter);
+    verify(mockFutureCollector, times(1)).finish();
+    verify(mockEmitter, times(1)).emitFuture(captor.capture());
+    assertFalse("tryFinishBundle() closed the bundle", bundleManager.isBundleStarted());
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
new file mode 100644
index 0000000..f126dd1
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */
+public final class FutureCollectorImplTest {
+  private static final List<String> RESULTS = ImmutableList.of("hello", "world");
+  private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+
+  @Before
+  public void setup() {
+    futureCollector = new DoFnOp.FutureCollectorImpl<>();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAddWithoutPrepareCallThrowsException() {
+    futureCollector.add(mock(CompletionStage.class));
+  }
+
+  @Test
+  public void testFinishWithoutPrepareReturnsEmptyCollection() {
+    CompletionStage<Collection<WindowedValue<String>>> resultFuture = futureCollector.finish();
+    CompletionStage<Void> validationFuture =
+        resultFuture.thenAccept(
+            result -> {
+              Assert.assertTrue("Expected the result to be empty", result.isEmpty());
+            });
+    validationFuture.toCompletableFuture().join();
+  }
+
+  @Test
+  public void testFinishReturnsExpectedResults() {
+    WindowedValue<String> mockWindowedValue = mock(WindowedValue.class);
+
+    when(mockWindowedValue.getValue()).thenReturn("hello").thenReturn("world");
+
+    futureCollector.prepare();
+    futureCollector.add(CompletableFuture.completedFuture(mockWindowedValue));
+    futureCollector.add(CompletableFuture.completedFuture(mockWindowedValue));
+
+    CompletionStage<Collection<WindowedValue<String>>> resultFuture = futureCollector.finish();
+    CompletionStage<Void> validationFuture =
+        resultFuture.thenAccept(
+            results -> {
+              List<String> actualResults =
+                  results.stream().map(WindowedValue::getValue).collect(Collectors.toList());
+              Assert.assertEquals(
+                  "Expected the result to be {hello, world}", RESULTS, actualResults);
+            });
+    validationFuture.toCompletableFuture().join();
+  }
+
+  @Test
+  public void testMultiplePrepareCallsWithoutFinishThrowsException() {
+    futureCollector.prepare();
+
+    try {
+      futureCollector.prepare();
+      Assert.fail("Second invocation of prepare should throw IllegalStateException");
+    } catch (IllegalStateException ex) {
+    }
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index a18d875..d3da93a 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 
@@ -36,18 +35,14 @@ public class KeyedTimerDataTest {
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
   private static final Instant TIMESTAMP =
       new DateTime(2020, 8, 11, 13, 42, 9, DateTimeZone.UTC).toInstant();
-  private static final Instant OUTPUT_TIMESTAMP = TIMESTAMP.plus(Duration.standardSeconds(30));
+  // TODO: LISAMZA-19205 Test OUTPUT_TIMESTAMP after outputTimestamp is encoded
+  // private static final Instant OUTPUT_TIMESTAMP = TIMESTAMP.plus(Duration.standardSeconds(30));
 
   @Test
   public void testCoder() throws Exception {
     final TimerInternals.TimerData td =
         TimerInternals.TimerData.of(
-            "timer",
-            "timerFamily",
-            StateNamespaces.global(),
-            TIMESTAMP,
-            OUTPUT_TIMESTAMP,
-            TimeDomain.EVENT_TIME);
+            "timer", StateNamespaces.global(), TIMESTAMP, TIMESTAMP, TimeDomain.EVENT_TIME);
 
     final String key = "timer-key";
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -58,6 +53,7 @@ public class KeyedTimerDataTest {
     final KeyedTimerData.KeyedTimerDataCoder<String> ktdCoder =
         new KeyedTimerData.KeyedTimerDataCoder<>(STRING_CODER, GlobalWindow.Coder.INSTANCE);
 
-    CoderProperties.coderDecodeEncodeEqual(ktdCoder, ktd);
+    // TODO: LISAMZA-19205: use CoderProperties.coderDecodeEncodeEqual
+    CoderProperties.coderDecodeEncodeEqualInContext(ktdCoder, Coder.Context.OUTER, ktd);
   }
 }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
index 1291e4c..533828a 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.runners.samza.runtime;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,9 +34,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.TestSamzaRunner;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValue;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValueSerdeFactory;
 import org.apache.beam.runners.samza.state.SamzaMapState;
 import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.runners.samza.translation.ConfigBuilder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -58,6 +64,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.StorageEngineFactory;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
@@ -206,16 +213,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
   /** A test store based on InMemoryKeyValueStore. */
   public static class TestStore extends InMemoryKeyValueStore {
     static List<TestKeyValueIteraor> iterators = Collections.synchronizedList(new ArrayList<>());
-    private final KeyValueStoreMetrics metrics;
 
     public TestStore(KeyValueStoreMetrics metrics) {
       super(metrics);
-      this.metrics = metrics;
-    }
-
-    @Override
-    public KeyValueStoreMetrics metrics() {
-      return metrics;
     }
 
     @Override
@@ -285,7 +285,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
                 KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 42), KV.of("hello", 12)))
         .apply(ParDo.of(fn));
 
-    Map<String, String> configs = new HashMap(ConfigBuilder.localRunConfig());
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setRunner(TestSamzaRunner.class);
+    Map<String, String> configs = new HashMap<>(ConfigBuilder.localRunConfig());
     configs.put("stores.foo.factory", TestStorageEngine.class.getName());
     pipeline.getOptions().as(SamzaPipelineOptions.class).setConfigOverride(configs);
     pipeline.run();
@@ -295,4 +297,24 @@ public class SamzaStoreStateInternalsTest implements Serializable {
     assertEquals(8, TestStore.iterators.size());
     TestStore.iterators.forEach(iter -> assertTrue(iter.closed));
   }
+
+  @Test
+  public void testStateValueSerde() throws IOException {
+    StateValueSerdeFactory stateValueSerdeFactory = new StateValueSerdeFactory();
+    Serde<StateValue<Integer>> serde = (Serde) stateValueSerdeFactory.getSerde("Test", null);
+    int value = 123;
+    Coder<Integer> coder = VarIntCoder.of();
+
+    byte[] valueBytes = serde.toBytes(StateValue.of(value, coder));
+    StateValue<Integer> stateValue1 = serde.fromBytes(valueBytes);
+    StateValue<Integer> stateValue2 = StateValue.of(valueBytes);
+    assertEquals(stateValue1.getValue(coder).intValue(), value);
+    assertEquals(stateValue2.getValue(coder).intValue(), value);
+
+    Integer nullValue = null;
+    byte[] nullBytes = serde.toBytes(StateValue.of(nullValue, coder));
+    StateValue<Integer> nullStateValue = serde.fromBytes(nullBytes);
+    assertNull(nullBytes);
+    assertNull(nullStateValue.getValue(coder));
+  }
 }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 9af37a9..4c750b5 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -38,6 +38,8 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.ByteArray;
 import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.ByteArraySerdeFactory;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValue;
+import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals.StateValueSerdeFactory;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -48,7 +50,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.Scheduler;
-import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
@@ -74,7 +75,7 @@ import org.rocksdb.WriteOptions;
 public class SamzaTimerInternalsFactoryTest {
   @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  private KeyValueStore<ByteArray, byte[]> createStore() {
+  private KeyValueStore<ByteArray, StateValue<?>> createStore() {
     final Options options = new Options();
     options.setCreateIfMissing(true);
 
@@ -92,12 +93,12 @@ public class SamzaTimerInternalsFactoryTest {
     return new SerializedKeyValueStore<>(
         rocksStore,
         new ByteArraySerdeFactory.ByteArraySerde(),
-        new ByteSerde(),
+        new StateValueSerdeFactory.StateValueSerde(),
         new SerializedKeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));
   }
 
   private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
-      SamzaPipelineOptions pipelineOptions, KeyValueStore<ByteArray, byte[]> store) {
+      SamzaPipelineOptions pipelineOptions, KeyValueStore<ByteArray, StateValue<?>> store) {
     final TaskContext context = mock(TaskContext.class);
     when(context.getStore(anyString())).thenReturn((KeyValueStore) store);
     final TupleTag<?> mainOutputTag = new TupleTag<>("output");
@@ -110,7 +111,7 @@ public class SamzaTimerInternalsFactoryTest {
       Scheduler<KeyedTimerData<String>> timerRegistry,
       String timerStateId,
       SamzaPipelineOptions pipelineOptions,
-      KeyValueStore<ByteArray, byte[]> store) {
+      KeyValueStore<ByteArray, StateValue<?>> store) {
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         createNonKeyedStateInternalsFactory(pipelineOptions, store);
@@ -144,7 +145,7 @@ public class SamzaTimerInternalsFactoryTest {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
 
-    final KeyValueStore<ByteArray, byte[]> store = createStore();
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
     final SamzaTimerInternalsFactory<String> timerInternalsFactory =
         createTimerInternalsFactory(null, "timer", pipelineOptions, store);
 
@@ -178,11 +179,68 @@ public class SamzaTimerInternalsFactoryTest {
   }
 
   @Test
+  public void testRestoreEventBufferSize() throws Exception {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+
+    KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final String key = "testKey";
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
+    final TimerInternals.TimerData timer1 =
+        TimerInternals.TimerData.of(
+            "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timer1);
+
+    store.close();
+
+    // restore by creating a new instance
+    store = createStore();
+
+    final SamzaTimerInternalsFactory<String> restoredFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+    assertEquals(1, restoredFactory.getEventTimeBuffer().size());
+
+    restoredFactory.setInputWatermark(new Instant(150));
+    Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
+    assertEquals(1, readyTimers.size());
+
+    // Timer 1 should be evicted from buffer
+    assertTrue(restoredFactory.getEventTimeBuffer().isEmpty());
+    final TimerInternals restoredTimerInternals = restoredFactory.timerInternalsForKey(key);
+    final TimerInternals.TimerData timer2 =
+        TimerInternals.TimerData.of(
+            "timer2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME);
+    restoredTimerInternals.setTimer(timer2);
+
+    // Timer 2 should be added to the Event buffer
+    assertEquals(1, restoredFactory.getEventTimeBuffer().size());
+    // Timer 2 should not be ready
+    readyTimers = restoredFactory.removeReadyTimers();
+    assertEquals(0, readyTimers.size());
+
+    restoredFactory.setInputWatermark(new Instant(250));
+
+    // Timer 2 should be ready
+    readyTimers = restoredFactory.removeReadyTimers();
+    assertEquals(1, readyTimers.size());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    StringUtf8Coder.of().encode(key, baos);
+    byte[] keyBytes = baos.toByteArray();
+    assertEquals(readyTimers, Arrays.asList(new KeyedTimerData<>(keyBytes, key, timer2)));
+
+    store.close();
+  }
+
+  @Test
   public void testRestore() throws Exception {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
 
-    KeyValueStore<ByteArray, byte[]> store = createStore();
+    KeyValueStore<ByteArray, StateValue<?>> store = createStore();
     final SamzaTimerInternalsFactory<String> timerInternalsFactory =
         createTimerInternalsFactory(null, "timer", pipelineOptions, store);
 
@@ -227,7 +285,7 @@ public class SamzaTimerInternalsFactoryTest {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
 
-    KeyValueStore<ByteArray, byte[]> store = createStore();
+    KeyValueStore<ByteArray, StateValue<?>> store = createStore();
     TestTimerRegistry timerRegistry = new TestTimerRegistry();
 
     final SamzaTimerInternalsFactory<String> timerInternalsFactory =
@@ -271,7 +329,7 @@ public class SamzaTimerInternalsFactoryTest {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
 
-    KeyValueStore<ByteArray, byte[]> store = createStore();
+    KeyValueStore<ByteArray, StateValue<?>> store = createStore();
     final SamzaTimerInternalsFactory<String> timerInternalsFactory =
         createTimerInternalsFactory(null, "timer", pipelineOptions, store);
 
@@ -309,6 +367,346 @@ public class SamzaTimerInternalsFactoryTest {
     store.close();
   }
 
+  /**
+   * Test the number of expired event timers for each watermark does not exceed the predefined
+   * limit.
+   */
+  @Test
+  public void testMaxExpiredEventTimersProcessAtOnce() {
+    // If maxExpiredTimersToProcessOnce <= the number of expired timers, then load
+    // "maxExpiredTimersToProcessOnce" timers.
+    testMaxExpiredEventTimersProcessAtOnce(10, 10, 5, 5);
+    testMaxExpiredEventTimersProcessAtOnce(10, 10, 10, 10);
+
+    // If maxExpiredTimersToProcessOnce > the number of expired timers, then load all the ready
+    // timers.
+    testMaxExpiredEventTimersProcessAtOnce(10, 10, 20, 10);
+  }
+
+  private void testMaxExpiredEventTimersProcessAtOnce(
+      int totalNumberOfTimersInStore,
+      int totalNumberOfExpiredTimers,
+      int maxExpiredTimersToProcessOnce,
+      int expectedExpiredTimersToProcess) {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setMaxReadyTimersToProcessOnce(maxExpiredTimersToProcessOnce);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < totalNumberOfTimersInStore; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // Set the timestamp of the input watermark to be the value of totalNumberOfExpiredTimers
+    // so that totalNumberOfExpiredTimers timers are expected be expired with respect to this
+    // watermark.
+    final Instant inputWatermark = new Instant(totalNumberOfExpiredTimers);
+    timerInternalsFactory.setInputWatermark(inputWatermark);
+    final Collection<KeyedTimerData<String>> readyTimers =
+        timerInternalsFactory.removeReadyTimers();
+    assertEquals(expectedExpiredTimersToProcess, readyTimers.size());
+    store.close();
+  }
+
+  /**
+   * Test the number of event time timers maintained in memory does not go beyond the limit defined
+   * in pipeline option.
+   */
+  @Test
+  public void testEventTimeTimersMemoryBoundary1() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setEventTimerBufferSize(2);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    // prepare 5 timers.
+    // timers in memory are then timestamped from 0 - 1;
+    // timers in store are then timestamped from 0 - 4.
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < 5; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    timerInternalsFactory.setInputWatermark(new Instant(2));
+    Collection<KeyedTimerData<String>> readyTimers;
+
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertEquals(2, readyTimers.size());
+    assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
+
+    store.close();
+  }
+
+  /**
+   * Test the total number of event time timers reloaded into memory is aligned with the number of
+   * event time timers written to the store.
+   */
+  @Test
+  public void testEventTimeTimersMemoryBoundary2() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setEventTimerBufferSize(2);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    // prepare 3 timers.
+    // timers in memory now are timestamped from 0 - 1;
+    // timers in store now are timestamped from 0 - 2.
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < 3; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // total number of event time timers to fire equals to the number of timers in store
+    Collection<KeyedTimerData<String>> readyTimers;
+    timerInternalsFactory.setInputWatermark(new Instant(3));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertEquals(3, readyTimers.size());
+
+    store.close();
+  }
+
+  /**
+   * Test the total number of event time timers reloaded into memory is aligned with the number of
+   * the event time timers written to the store. Moreover, event time timers reloaded into memory is
+   * maintained in order.
+   */
+  @Test
+  public void testEventTimeTimersMemoryBoundary3() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setEventTimerBufferSize(5);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    // prepare 8 timers.
+    // timers in memory now are timestamped from 0 - 4;
+    // timers in store now are timestamped from 0 - 7.
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < 8; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // fire the first 2 timers.
+    // timers in memory now are timestamped from 2 - 4;
+    // timers in store now are timestamped from 2 - 7.
+    Collection<KeyedTimerData<String>> readyTimers;
+    timerInternalsFactory.setInputWatermark(new Instant(2));
+    long lastTimestamp = 0;
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(2, readyTimers.size());
+
+    // add another 12 timers.
+    // timers in memory (reloaded for three times) now are timestamped from 2 - 4; 5 - 9; 10 - 14;
+    // 15 - 19.
+    // timers in store now are timestamped from 2 - 19.
+    // the total number of timers to fire is 18.
+    for (int i = 8; i < 20; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+    timerInternalsFactory.setInputWatermark(new Instant(20));
+    lastTimestamp = 0;
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(18, readyTimers.size());
+
+    store.close();
+  }
+
+  /**
+   * Test the total number of event time timers reloaded into memory is aligned with the number of
+   * the event time timers written to the store. Moreover, event time timers reloaded into memory is
+   * maintained in order, even though memory boundary is hit and timer is early than the last timer
+   * in memory.
+   */
+  @Test
+  public void testEventTimeTimersMemoryBoundary4() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setEventTimerBufferSize(5);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    // prepare 10 timers.
+    // timers in memory now are timestamped from 0 - 4;
+    // timers in store now are timestamped from 0 - 9.
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < 10; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // fire the first 2 timers.
+    // timers in memory now are timestamped from 2 - 4;
+    // timers in store now are timestamped from 2 - 9.
+    Collection<KeyedTimerData<String>> readyTimers;
+    timerInternalsFactory.setInputWatermark(new Instant(2));
+    long lastTimestamp = 0;
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(2, readyTimers.size());
+
+    // add 3 timers.
+    // timers in memory now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to
+    // 4 prefixed with timer, timestamp is in order;
+    // timers in store now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to 9
+    // prefixed with timer, timestamp is in order;
+    for (int i = 0; i < 3; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "lateTimer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // there are 11 timers in state now.
+    // watermark 5 comes, so 6 timers will be evicted because their timestamp is less than 5.
+    // memory will be reloaded once to have 5 to 8 left (reload to have 4 to 8, but 4 is evicted), 5
+    // to 9 left in store.
+    // all of them are in order for firing.
+    timerInternalsFactory.setInputWatermark(new Instant(5));
+    lastTimestamp = 0;
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(6, readyTimers.size());
+    assertEquals(4, timerInternalsFactory.getEventTimeBuffer().size());
+
+    // watermark 10 comes, so all timers will be evicted in order.
+    timerInternalsFactory.setInputWatermark(new Instant(10));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(5, readyTimers.size());
+    assertEquals(0, timerInternalsFactory.getEventTimeBuffer().size());
+
+    store.close();
+  }
+
+  /** Test buffer could still be filled after restore to a non-full state. */
+  @Test
+  public void testEventTimeTimersMemoryBoundary5() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setEventTimerBufferSize(5);
+
+    final KeyValueStore<ByteArray, StateValue<?>> store = createStore();
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+
+    // prepare (buffer capacity + 1) 6 timers.
+    // timers in memory now are timestamped from 0 - 4;
+    // timer in store now is timestamped 5.
+    TimerInternals.TimerData timer;
+    for (int i = 0; i < 6; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+
+    // watermark 5 fires the 5 timers timestamped below 5; timer5 is then reloaded into the buffer
+    Collection<KeyedTimerData<String>> readyTimers;
+    timerInternalsFactory.setInputWatermark(new Instant(5));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertEquals(5, readyTimers.size());
+    // reloaded timer5
+    assertEquals(1, timerInternalsFactory.getEventTimeBuffer().size());
+
+    for (int i = 0; i < 7; i++) {
+      timer =
+          TimerInternals.TimerData.of(
+              "timer" + (i + 6),
+              nameSpace,
+              new Instant(i + 6),
+              new Instant(i + 6),
+              TimeDomain.EVENT_TIME);
+      timerInternals.setTimer(timer);
+    }
+    // timers should go into buffer not state
+    assertEquals(5, timerInternalsFactory.getEventTimeBuffer().size());
+
+    // watermark 11 comes, so timers timestamped below 11 will be evicted in order.
+    timerInternalsFactory.setInputWatermark(new Instant(11));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    long lastTimestamp = 0;
+    for (KeyedTimerData<String> keyedTimerData : readyTimers) {
+      final long currentTimeStamp = keyedTimerData.getTimerData().getTimestamp().getMillis();
+      assertTrue(lastTimestamp <= currentTimeStamp);
+      lastTimestamp = currentTimeStamp;
+    }
+    assertEquals(6, readyTimers.size());
+    assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
+
+    store.close();
+  }
+
   @Test
   public void testByteArray() {
     ByteArray key1 = ByteArray.of("hello world".getBytes(StandardCharsets.UTF_8));
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
index 36317b1..867e7d4 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.runners.samza.translation;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.SamzaRunner;
@@ -32,6 +34,8 @@ import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
@@ -46,6 +50,7 @@ import org.apache.samza.job.yarn.YarnJobFactory;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
 import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.junit.Test;
 
@@ -58,7 +63,7 @@ public class ConfigGeneratorTest {
   private static final String JOB_FACTORY_CLASS = "job.factory.class";
 
   @Test
-  public void testBeamStoreConfig() {
+  public void testStatefulBeamStoreConfig() {
     SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     options.setJobName("TestStoreConfig");
     options.setRunner(SamzaRunner.class);
@@ -77,7 +82,7 @@ public class ConfigGeneratorTest {
         RocksDbKeyValueStorageEngineFactory.class.getName(),
         config.get("stores.beamStore.factory"));
     assertEquals("byteArraySerde", config.get("stores.beamStore.key.serde"));
-    assertEquals("byteSerde", config.get("stores.beamStore.msg.serde"));
+    assertEquals("stateValueSerde", config.get("stores.beamStore.msg.serde"));
     assertNull(config.get("stores.beamStore.changelog"));
 
     options.setStateDurable(true);
@@ -88,6 +93,36 @@ public class ConfigGeneratorTest {
   }
 
   @Test
+  public void testStatelessBeamStoreConfig() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setJobName("TestStoreConfig");
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Impulse.create()).apply(Filter.by(Objects::nonNull));
+
+    pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config = configBuilder.build();
+
+    assertEquals(
+        InMemoryKeyValueStorageEngineFactory.class.getName(),
+        config.get("stores.beamStore.factory"));
+    assertEquals("byteArraySerde", config.get("stores.beamStore.key.serde"));
+    assertEquals("stateValueSerde", config.get("stores.beamStore.msg.serde"));
+    assertNull(config.get("stores.beamStore.changelog"));
+
+    options.setStateDurable(true);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config2 = configBuilder.build();
+    // For stateless jobs, the stateDurable pipeline option is ignored.
+    assertNull(config2.get("stores.beamStore.changelog"));
+  }
+
+  @Test
   public void testSamzaLocalExecutionEnvironmentConfig() {
     SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     options.setJobName("TestEnvConfig");
@@ -207,7 +242,7 @@ public class ConfigGeneratorTest {
         RocksDbKeyValueStorageEngineFactory.class.getName(),
         config.get("stores.testState.factory"));
     assertEquals("byteArraySerde", config.get("stores.testState.key.serde"));
-    assertEquals("byteSerde", config.get("stores.testState.msg.serde"));
+    assertEquals("stateValueSerde", config.get("stores.testState.msg.serde"));
     assertNull(config.get("stores.testState.changelog"));
 
     options.setStateDurable(true);
@@ -216,4 +251,49 @@ public class ConfigGeneratorTest {
     assertEquals(
         "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog"));
   }
+
+  @Test
+  public void testDuplicateStateIdConfig() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setJobName("TestStoreConfig");
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline
+        .apply(
+            Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<String, String>, KV<String, String>>() {
+                  private static final String testState = "testState";
+
+                  @StateId(testState)
+                  private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext context, @StateId(testState) ValueState<Integer> state) {
+                    context.output(context.element());
+                  }
+                }))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<String, String>, Void>() {
+                  private static final String testState = "testState";
+
+                  @StateId(testState)
+                  private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext context, @StateId(testState) ValueState<Integer> state) {}
+                }));
+
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+
+    assertThrows(
+        IllegalStateException.class,
+        () -> SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder));
+  }
 }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java
new file mode 100644
index 0000000..8827f1e
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/TranslationContextTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.junit.Test;
+
+@SuppressWarnings({"rawtypes"})
+public class TranslationContextTest {
+  private final GenericInputDescriptor testInputDescriptor =
+      new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
+          .getInputDescriptor("test-input-1", mock(Serde.class));
+  MapFunction<Object, String> keyFn = m -> m.toString();
+  MapFunction<Object, Object> valueFn = m -> m;
+  private final String streamName = "testStream";
+  KVSerde<Object, Object> serde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+  StreamApplicationDescriptor streamApplicationDescriptor =
+      new StreamApplicationDescriptorImpl(
+          appDesc -> {
+            MessageStream inputStream = appDesc.getInputStream(testInputDescriptor);
+            inputStream.partitionBy(keyFn, valueFn, serde, streamName);
+          },
+          getConfig());
+  Map<PValue, String> idMap = new HashMap<>();
+  TranslationContext translationContext =
+      new TranslationContext(streamApplicationDescriptor, idMap, mock(SamzaPipelineOptions.class));
+
+  @Test
+  public void testRegisterInputMessageStreams() {
+    final PCollection output = mock(PCollection.class);
+    List<String> topics = Arrays.asList("stream1", "stream2");
+    List inputDescriptors =
+        topics.stream()
+            .map(topicName -> createSamzaInputDescriptor(topicName, topicName))
+            .collect(Collectors.toList());
+
+    translationContext.registerInputMessageStreams(output, inputDescriptors);
+
+    assertNotNull(translationContext.getMessageStream(output));
+  }
+
+  public GenericInputDescriptor<KV<String, OpMessage<?>>> createSamzaInputDescriptor(
+      String systemName, String streamId) {
+    final Serde<KV<String, OpMessage<?>>> kvSerde =
+        KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    return new GenericSystemDescriptor(systemName, "factoryClass")
+        .getInputDescriptor(streamId, kvSerde);
+  }
+
+  private static Config getConfig() {
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put("job.name", "testJobName");
+    configMap.put("job.id", "testJobId");
+    return new MapConfig(configMap);
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java
new file mode 100644
index 0000000..357fc6f
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/util/FutureUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for {@linkplain FutureUtils}. */
+public final class FutureUtilsTest {
+  private static final List<String> RESULTS = ImmutableList.of("hello", "world");
+
+  @Test
+  public void testFlattenFuturesForCollection() {
+    CompletionStage<Collection<String>> resultFuture =
+        FutureUtils.flattenFutures(
+            ImmutableList.of(
+                CompletableFuture.completedFuture("hello"),
+                CompletableFuture.completedFuture("world")));
+
+    CompletionStage<Void> validationFuture =
+        resultFuture.thenAccept(
+            actualResults -> {
+              Assert.assertEquals(
+                  "Expected flattened results to contain {hello, world}", RESULTS, actualResults);
+            });
+
+    validationFuture.toCompletableFuture().join();
+  }
+
+  @Test
+  public void testFlattenFuturesForFailedFuture() {
+    CompletionStage<Collection<String>> resultFuture =
+        FutureUtils.flattenFutures(
+            ImmutableList.of(
+                CompletableFuture.completedFuture("hello"),
+                createFailedFuture(new RuntimeException())));
+
+    CompletionStage<Void> validationFuture =
+        resultFuture.handle(
+            (results, ex) -> {
+              Assert.assertTrue(
+                  "Expected exception to be of RuntimeException", ex instanceof RuntimeException);
+              return null;
+            });
+
+    validationFuture.toCompletableFuture().join();
+  }
+
+  @Test
+  public void testWaitForAllFutures() {
+    CountDownLatch latch = new CountDownLatch(1);
+    CompletionStage<Collection<String>> resultFuture =
+        FutureUtils.flattenFutures(
+            ImmutableList.of(
+                CompletableFuture.supplyAsync(
+                    () -> {
+                      try {
+                        latch.await();
+                      } catch (InterruptedException e) {
+                        return "";
+                      }
+
+                      return "hello";
+                    }),
+                CompletableFuture.supplyAsync(
+                    () -> {
+                      latch.countDown();
+                      return "world";
+                    })));
+
+    CompletionStage<Void> validationFuture =
+        resultFuture.thenAccept(
+            actualResults -> {
+              Assert.assertEquals(
+                  "Expected flattened results to contain {hello, world}", RESULTS, actualResults);
+            });
+
+    validationFuture.toCompletableFuture().join();
+  }
+
+  private static CompletionStage<String> createFailedFuture(Throwable t) {
+    CompletableFuture<String> future = new CompletableFuture<>();
+    future.completeExceptionally(t);
+    return future;
+  }
+}
diff --git a/website/www/site/content/en/documentation/runners/samza.md b/website/www/site/content/en/documentation/runners/samza.md
index 355d8c0..fc93254 100644
--- a/website/www/site/content/en/documentation/runners/samza.md
+++ b/website/www/site/content/en/documentation/runners/samza.md
@@ -162,6 +162,11 @@ When executing your pipeline with the Samza Runner, you can use the following pi
   <td><code>5000</code></td>
 </tr>
 <tr>
+  <td><code>eventTimerBufferSize</code></td>
+  <td>The maximum number of event-time timers to buffer in memory for a PTransform.</td>
+  <td><code>5000</code></td>
+</tr>
+<tr>
   <td><code>maxSourceParallelism</code></td>
   <td>The maximum parallelism allowed for any data source.</td>
   <td><code>1</code></td>


Mime
View raw message