beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/4] incubator-beam git commit: [BEAM-616] Update Flink Runner to Flink 1.1.2
Date Thu, 08 Sep 2016 08:12:55 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f33296c7f -> fb322cc73


[BEAM-616] Update Flink Runner to Flink 1.1.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c66caf33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c66caf33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c66caf33

Branch: refs/heads/master
Commit: c66caf3364ae5c20bdf0fbf41a8ef61d4e53c495
Parents: f33296c
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Sep 5 18:17:11 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Sep 8 10:10:50 2016 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  2 +-
 runners/flink/runner/pom.xml                    |  8 +++++++
 .../wrappers/streaming/DoFnOperator.java        | 24 ++++++++++++--------
 .../wrappers/streaming/FlinkStateInternals.java |  8 ++++---
 .../wrappers/streaming/WindowDoFnOperator.java  |  4 ++--
 5 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index b2f3aaa..68e82d2 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -39,7 +39,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <flink.version>1.0.3</flink.version>
+    <flink.version>1.1.2</flink.version>
   </properties>
 
   <repositories>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 7c32280..8759591 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -127,6 +127,14 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime_2.10</artifactId>
+      <version>${flink.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
 
     <!-- Beam -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b917e2..79aab9c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,7 +199,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       if (restoredSideInputState != null) {
         @SuppressWarnings("unchecked,rawtypes")
         HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
-        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored, 0L);
+        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored);
         restoredSideInputState = null;
       }
 
@@ -306,15 +306,19 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
             pushedBackDescriptor);
 
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-    for (WindowedValue<InputT> elem: pushedBack.get()) {
 
-      // we need to set the correct key in case the operator is
-      // a (keyed) window operator
-      setKeyContextElement1(new StreamRecord<>(elem));
+    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.get();
+    if (pushedBackContents != null) {
+      for (WindowedValue<InputT> elem : pushedBackContents) {
 
-      Iterable<WindowedValue<InputT>> justPushedBack =
-          pushbackDoFnRunner.processElementInReadyWindows(elem);
-      Iterables.addAll(newPushedBack, justPushedBack);
+        // we need to set the correct key in case the operator is
+        // a (keyed) window operator
+        setKeyContextElement1(new StreamRecord<>(elem));
+
+        Iterable<WindowedValue<InputT>> justPushedBack =
+            pushbackDoFnRunner.processElementInReadyWindows(elem);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
     }
 
 
@@ -385,8 +389,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   }
 
   @Override
-  public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-    super.restoreState(state, recoveryTimestamp);
+  public void restoreState(StreamTaskState state) throws Exception {
+    super.restoreState(state);
 
     @SuppressWarnings("unchecked,rawtypes")
     StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
index 2e10400..27e3dc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -307,10 +307,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
     @Override
     public Iterable<T> read() {
       try {
-        return flinkStateBackend.getPartitionedState(
+        Iterable<T> result = flinkStateBackend.getPartitionedState(
             namespace.stringKey(),
             StringSerializer.INSTANCE,
             flinkStateDescriptor).get();
+
+        return result != null ? result : Collections.<T>emptyList();
       } catch (Exception e) {
         throw new RuntimeException("Error reading state.", e);
       }
@@ -326,7 +328,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).get();
-            return Iterables.isEmpty(result);
+            return result == null;
           } catch (Exception e) {
             throw new RuntimeException("Error reading state.", e);
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c66caf33/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..8b3365d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -248,8 +248,8 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-    super.restoreState(state, recoveryTimestamp);
+  public void restoreState(StreamTaskState state) throws Exception {
+    super.restoreState(state);
 
     @SuppressWarnings("unchecked")
     StateHandle<DataInputView> operatorState =


Mime
View raw message