beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [01/11] beam git commit: NonNull by default in sdk/io
Date Fri, 10 Nov 2017 01:15:12 GMT
Repository: beam
Updated Branches:
  refs/heads/master b203af7fd -> e40e88252


NonNull by default in sdk/io


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/80ae9303
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/80ae9303
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/80ae9303

Branch: refs/heads/master
Commit: 80ae9303bd4c7326e18cd372da6abaf40435034a
Parents: e43edcd
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Oct 23 19:44:59 2017 -0700
Committer: Kenneth Knowles <kenn@apache.org>
Committed: Thu Nov 9 15:01:54 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/utils/ValuesSource.java    |  4 +++-
 .../UnboundedReadFromBoundedSource.java         |  4 ----
 .../streaming/io/UnboundedSocketSource.java     |  3 ++-
 .../gearpump/translators/io/ValuesSource.java   |  2 +-
 .../spark/stateful/StateSpecFunctions.java      |  9 +++-----
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  7 +++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  6 ++++-
 .../java/org/apache/beam/sdk/io/AvroSource.java | 20 ++++++++++------
 .../sdk/io/BoundedReadFromUnboundedSource.java  | 11 +++++----
 .../apache/beam/sdk/io/CompressedSource.java    |  4 ++++
 .../beam/sdk/io/ConstantAvroDestination.java    |  2 ++
 .../org/apache/beam/sdk/io/CountingSource.java  |  9 ++++++--
 .../beam/sdk/io/DefaultFilenamePolicy.java      |  6 ++---
 .../beam/sdk/io/DynamicFileDestinations.java    |  3 +++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 24 ++++++++++++--------
 .../org/apache/beam/sdk/io/FileBasedSource.java |  9 ++++++--
 .../org/apache/beam/sdk/io/LocalResourceId.java |  4 +++-
 .../sdk/io/SerializableAvroCodecFactory.java    |  5 ++--
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 14 ++++++------
 .../java/org/apache/beam/sdk/io/TextIO.java     |  6 ++---
 .../java/org/apache/beam/sdk/io/TextSink.java   |  4 +++-
 .../java/org/apache/beam/sdk/io/TextSource.java |  7 +++---
 .../org/apache/beam/sdk/io/UnboundedSource.java | 12 ++++++++++
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 18 +++++++--------
 .../org/apache/beam/sdk/io/package-info.java    |  4 ++++
 25 files changed, 126 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 193da74..94a8cc1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.apex.translation.utils;
 
+import static org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -132,7 +134,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
 
     @Override
     public CheckpointMark getCheckpointMark() {
-      return null;
+      return NOOP_CHECKPOINT_MARK;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 24eb384..f5b1184 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -138,10 +138,6 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
         }
         List<? extends BoundedSource<T>> splits =
             boundedSource.split(desiredBundleSize, options);
-        if (splits == null) {
-          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
-          return ImmutableList.of(this);
-        }
         return Lists.transform(
             splits,
             new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>()
{

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 49e4ddc..0368172 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -238,7 +239,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
 
     @Override
     public CheckpointMark getCheckpointMark() {
-      return null;
+      return NOOP_CHECKPOINT_MARK;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index c018037..06047b6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -146,7 +146,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
 
     @Override
     public CheckpointMark getCheckpointMark() {
-      return null;
+      return CheckpointMark.NOOP_CHECKPOINT_MARK;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ca54715..2768654 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -191,12 +191,9 @@ public class StateSpecFunctions {
           @SuppressWarnings("unchecked")
           final CheckpointMarkT finishedReadCheckpointMark =
               (CheckpointMarkT) microbatchReader.getCheckpointMark();
-          byte[] codedCheckpoint = new byte[0];
-          if (finishedReadCheckpointMark != null) {
-            codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
-          } else {
-            LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
-          }
+          byte[] codedCheckpoint = CoderHelpers.toByteArray(
+              finishedReadCheckpointMark, checkpointCoder);
+
           // persist the end-of-read (high) watermark for following read, where it will become
           // the next low watermark.
           state.update(new Tuple2<>(codedCheckpoint, highWatermark));

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2cc0f52..eb13f3b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -877,7 +877,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<UserT, DestinationT, OutputT> {
       abstract Builder<UserT, DestinationT, OutputT> setFormatFunction(
-          SerializableFunction<UserT, OutputT> formatFunction);
+          @Nullable SerializableFunction<UserT, OutputT> formatFunction);
 
       abstract Builder<UserT, DestinationT, OutputT> setFilenamePrefix(
           ValueProvider<ResourceId> filenamePrefix);
@@ -889,7 +889,8 @@ public class AvroIO {
 
       abstract Builder<UserT, DestinationT, OutputT> setNumShards(int numShards);
 
-      abstract Builder<UserT, DestinationT, OutputT> setShardTemplate(String shardTemplate);
+      abstract Builder<UserT, DestinationT, OutputT> setShardTemplate(
+          @Nullable String shardTemplate);
 
       abstract Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRecords);
 
@@ -1010,7 +1011,7 @@ public class AvroIO {
      * used instead.
      */
     public TypedWrite<UserT, DestinationT, OutputT> withFormatFunction(
-        SerializableFunction<UserT, OutputT> formatFunction) {
+        @Nullable SerializableFunction<UserT, OutputT> formatFunction) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 888db85..0ef4423 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -75,7 +76,10 @@ class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, Destin
 
   /** A {@link Writer Writer} for Avro files. */
   private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT,
OutputT> {
-    private DataFileWriter<OutputT> dataFileWriter;
+
+    // Initialized in prepareWrite
+    @Nullable private DataFileWriter<OutputT> dataFileWriter;
+
     private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
     private final boolean genericRecords;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index a2610df..b8449fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -498,8 +498,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // The number of records in the block.
     private final long numRecords;
 
-    // The current record in the block.
-    private T currentRecord;
+    // The current record in the block. Initialized in readNextRecord.
+    @Nullable private T currentRecord;
 
     // The index of the current record in the block.
     private long currentRecordIndex = 0;
@@ -601,10 +601,12 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   public static class AvroReader<T> extends BlockBasedReader<T> {
-    private AvroMetadata metadata;
+    // Initialized in startReading.
+    @Nullable private AvroMetadata metadata;
 
     // The current block.
-    private AvroBlock<T> currentBlock;
+    // Initialized in readNextRecord.
+    @Nullable private AvroBlock<T> currentBlock;
 
     // A lock used to synchronize block offsets for getRemainingParallelism
     private final Object progressLock = new Object();
@@ -619,13 +621,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
     // Stream used to read from the underlying file.
     // A pushback stream is used to restore bytes buffered during seeking.
-    private PushbackInputStream stream;
+    // Initialized in startReading.
+    @Nullable private PushbackInputStream stream;
+
     // Counts the number of bytes read. Used only to tell how many bytes are taken up in
     // a block's variable-length header.
-    private CountingInputStream countStream;
+    // Initialized in startReading.
+    @Nullable private CountingInputStream countStream;
 
     // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the
buffer.
-    private BinaryDecoder decoder;
+    // Initialized in readNextBlock.
+    @Nullable private BinaryDecoder decoder;
 
     /**
      * Reads Avro records of type {@code T} from the specified source.

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 80a03eb..f4d088e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -51,7 +51,8 @@ import org.joda.time.Instant;
 public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
{
   private final UnboundedSource<T, ?> source;
   private final long maxNumRecords;
-  private final Duration maxReadTime;
+  private final @Nullable Duration maxReadTime;
+
   private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
   private static final FluentBackoff BACKOFF_FACTORY =
       FluentBackoff.DEFAULT
@@ -80,7 +81,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   }
 
   BoundedReadFromUnboundedSource(
-      UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) {
+      UnboundedSource<T, ?> source, long maxNumRecords, @Nullable Duration maxReadTime)
{
     this.source = source;
     this.maxNumRecords = maxNumRecords;
     this.maxReadTime = maxReadTime;
@@ -153,7 +154,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     abstract static class Builder<T> {
       abstract Builder<T> setSource(UnboundedSource<T, ?> source);
       abstract Builder<T> setMaxNumRecords(long maxNumRecords);
-      abstract Builder<T> setMaxReadTime(Duration maxReadTime);
+      abstract Builder<T> setMaxReadTime(@Nullable Duration maxReadTime);
       abstract UnboundedToBoundedSourceAdapter<T> build();
     }
 
@@ -229,7 +230,9 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
     private class Reader extends BoundedReader<ValueWithRecordId<T>> {
       private long recordsRead = 0L;
-      private Instant endTime = Instant.now().plus(getMaxReadTime());
+
+      @Nullable private Instant endTime;
+
       private UnboundedSource.UnboundedReader<T> reader;
 
       private Reader(UnboundedSource.UnboundedReader<T> reader) {

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ae55d80..937677c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -306,8 +307,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     private final Object progressLock = new Object();
     @GuardedBy("progressLock")
     private int numRecordsRead;
+
+    @Nullable // Initialized in startReading
     @GuardedBy("progressLock")
     private CountingChannel channel;
+
     private DecompressingChannelFactory channelFactory;
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
index b006e26..4f8677e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
@@ -89,11 +89,13 @@ class ConstantAvroDestination<UserT, OutputT>
   }
 
   @Override
+  @Nullable
   public Void getDestination(UserT element) {
     return (Void) null;
   }
 
   @Override
+  @Nullable
   public Void getDefaultDestination() {
     return (Void) null;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index d56bb5a..ed1c86f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
@@ -390,8 +391,12 @@ public class CountingSource {
   private static class UnboundedCountingReader extends UnboundedReader<Long> {
     private UnboundedCountingSource source;
     private long current;
-    private Instant currentTimestamp;
-    private Instant firstStarted;
+
+    // Initialized on first advance()
+    @Nullable private Instant currentTimestamp;
+
+    // Initialized in start()
+    @Nullable private Instant firstStarted;
 
     private final Counter elementsRead = SourceMetrics.elementsRead();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 2f22e82..38ed733 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -83,7 +83,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    * objects to be dynamically generated.
    */
   public static class Params implements Serializable {
-    private final ValueProvider<ResourceId> baseFilename;
+    @Nullable private final ValueProvider<ResourceId> baseFilename;
     private final String shardTemplate;
     private final boolean explicitTemplate;
     private final String suffix;
@@ -271,8 +271,8 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
       String suffix,
       int shardNum,
       int numShards,
-      String paneStr,
-      String windowStr) {
+      @Nullable String paneStr,
+      @Nullable String windowStr) {
     String prefix = extractFilename(baseFilename);
     // Matcher API works with StringBuffer, rather than StringBuilder.
     StringBuffer sb = new StringBuffer();

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index b087bc5..779a928 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -50,16 +50,19 @@ public class DynamicFileDestinations {
     }
 
     @Override
+    @Nullable
     public Void getDestination(UserT element) {
       return (Void) null;
     }
 
     @Override
+    @Nullable
     public Coder<Void> getDestinationCoder() {
       return null;
     }
 
     @Override
+    @Nullable
     public Void getDefaultDestination() {
       return (Void) null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 78ba071..830e16b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -231,7 +231,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
     }
 
-    private SideInputAccessor sideInputAccessor;
+    @Nullable private SideInputAccessor sideInputAccessor;
 
     static class SideInputAccessorViaProcessContext implements SideInputAccessor {
       private DoFn<?, ?>.ProcessContext processContext;
@@ -259,6 +259,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * #getSideInputs()}.
      */
     protected final <SideInputT> SideInputT sideInput(PCollectionView<SideInputT>
view) {
+      checkState(
+          sideInputAccessor != null,
+          "sideInput called on %s but side inputs have not been initialized",
+          getClass().getName());
       return sideInputAccessor.sideInput(view);
     }
 
@@ -804,18 +808,18 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     private final WriteOperation<DestinationT, OutputT> writeOperation;
 
     /** Unique id for this output bundle. */
-    private String id;
+    private @Nullable String id;
 
-    private BoundedWindow window;
-    private PaneInfo paneInfo;
+    private @Nullable BoundedWindow window;
+    private @Nullable PaneInfo paneInfo;
     private int shard = -1;
-    private DestinationT destination;
+    private @Nullable DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
 
     /** The channel to write to. */
-    private WritableByteChannel channel;
+    private @Nullable WritableByteChannel channel;
 
     /**
      * The MIME type used in the creation of the output channel (if the file system supports
it).
@@ -825,7 +829,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * default but if {@link Compression#BZIP2} is set then the MIME type will be overridden
to
      * {@link MimeTypes#BINARY}.
      */
-    private final String mimeType;
+    @Nullable private final String mimeType;
 
     /** Construct a new {@link Writer} that will produce files of the given MIME type. */
     public Writer(WriteOperation<DestinationT, OutputT> writeOperation, String mimeType)
{
@@ -1027,7 +1031,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
   public static final class FileResult<DestinationT> {
     private final ResourceId tempFilename;
     private final int shard;
-    private final BoundedWindow window;
+    private final @Nullable BoundedWindow window;
     private final PaneInfo paneInfo;
     private final DestinationT destination;
 
@@ -1035,7 +1039,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     public FileResult(
         ResourceId tempFilename,
         int shard,
-        BoundedWindow window,
+        @Nullable BoundedWindow window,
         PaneInfo paneInfo,
         DestinationT destination) {
       this.tempFilename = tempFilename;
@@ -1058,7 +1062,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       return new FileResult<>(tempFilename, shard, window, paneInfo, destination);
     }
 
-    public BoundedWindow getWindow() {
+    public @Nullable BoundedWindow getWindow() {
       return window;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index dabda84..1d98c1e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -124,7 +124,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T>
{
     this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
 
     // This field will be unused in this mode.
-    this.emptyMatchTreatment = null;
+    this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
   }
 
   /**
@@ -425,6 +425,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    * methods defined here will not be accessed by more than one thread concurrently.
    */
   public abstract static class FileBasedReader<T> extends OffsetBasedReader<T>
{
+
+    // Initialized in startImpl
+    @Nullable
     private ReadableByteChannel channel = null;
 
     /**
@@ -532,7 +535,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     private final FileBasedSource<T> source;
     private final List<FileBasedReader<T>> fileReaders;
     final ListIterator<FileBasedReader<T>> fileReadersIterator;
-    FileBasedReader<T> currentReader = null;
+
+    // Initialized in start
+    @Nullable FileBasedReader<T> currentReader = null;
 
     public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>>
fileReaders) {
       this.source = source;

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 4e61473..c21408a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -38,7 +38,9 @@ import org.apache.commons.lang3.SystemUtils;
 class LocalResourceId implements ResourceId {
 
   private final String pathString;
-  private transient volatile Path cachedPath;
+
+  @Nullable private transient volatile Path cachedPath;
+
   private final boolean isDirectory;
 
   static LocalResourceId fromPath(Path path, boolean isDirectory) {

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
index 92c2a64..af22071 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.avro.file.CodecFactory;
 
 /**
@@ -48,9 +49,9 @@ class SerializableAvroCodecFactory implements Externalizable {
   private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?<level>-?\\d)");
   private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?<level>\\d)");
 
-  private CodecFactory codecFactory;
+  private @Nullable CodecFactory codecFactory;
 
-  // For java.io.Serializable only
+  // For java.io.Externalizable
   public SerializableAvroCodecFactory() {}
 
   public SerializableAvroCodecFactory(CodecFactory codecFactory) {

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 55287ca..1d33646 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -222,7 +222,7 @@ public class TFRecordIO {
 
       abstract Builder setShardTemplate(String shardTemplate);
 
-      abstract Builder setFilenameSuffix(String filenameSuffix);
+      abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
 
       abstract Builder setNumShards(int numShards);
 
@@ -448,9 +448,9 @@ public class TFRecordIO {
       private long startOfRecord;
       private volatile long startOfNextRecord;
       private volatile boolean elementIsPresent;
-      private byte[] currentValue;
-      private ReadableByteChannel inChannel;
-      private TFRecordCodec codec;
+      private @Nullable byte[] currentValue;
+      private @Nullable ReadableByteChannel inChannel;
+      private @Nullable TFRecordCodec codec;
 
       private TFRecordReader(TFRecordSource source) {
         super(source);
@@ -536,8 +536,8 @@ public class TFRecordIO {
 
     /** A {@link Writer Writer} for TFRecord files. */
     private static class TFRecordWriter extends Writer<Void, byte[]> {
-      private WritableByteChannel outChannel;
-      private TFRecordCodec codec;
+      private @Nullable WritableByteChannel outChannel;
+      private @Nullable TFRecordCodec codec;
 
       private TFRecordWriter(WriteOperation<Void, byte[]> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
@@ -586,7 +586,7 @@ public class TFRecordIO {
       return HEADER_LEN + data.length + FOOTER_LEN;
     }
 
-    public byte[] read(ReadableByteChannel inChannel) throws IOException {
+    public @Nullable byte[] read(ReadableByteChannel inChannel) throws IOException {
       header.clear();
       int headerBytes = inChannel.read(header);
       if (headerBytes <= 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index fb01634..865875d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -638,7 +638,7 @@ public class TextIO {
           ValueProvider<ResourceId> filenamePrefix);
 
       abstract Builder<UserT, DestinationT> setTempDirectory(
-          ValueProvider<ResourceId> tempDirectory);
+          @Nullable ValueProvider<ResourceId> tempDirectory);
 
       abstract Builder<UserT, DestinationT> setShardTemplate(@Nullable String shardTemplate);
 
@@ -660,7 +660,7 @@ public class TextIO {
       abstract Builder<UserT, DestinationT> setEmptyDestination(Params emptyDestination);
 
       abstract Builder<UserT, DestinationT> setFormatFunction(
-          SerializableFunction<UserT, String> formatFunction);
+          @Nullable SerializableFunction<UserT, String> formatFunction);
 
       abstract Builder<UserT, DestinationT> setNumShards(int numShards);
 
@@ -755,7 +755,7 @@ public class TextIO {
      * used instead.
      */
     public TypedWrite<UserT, DestinationT> withFormatFunction(
-        SerializableFunction<UserT, String> formatFunction) {
+        @Nullable SerializableFunction<UserT, String> formatFunction) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 387e0ac..24f63a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -77,7 +77,9 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, S
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
-    private OutputStreamWriter out;
+
+    // Initialized in prepareWrite
+    @Nullable private OutputStreamWriter out;
 
     public TextWriter(
         WriteOperation<DestinationT, String> writeOperation,

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index f3e4f77..cb238ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
@@ -97,9 +98,9 @@ class TextSource extends FileBasedSource<String> {
     private volatile long startOfNextRecord;
     private volatile boolean eof;
     private volatile boolean elementIsPresent;
-    private String currentValue;
-    private ReadableByteChannel inChannel;
-    private byte[] delimiter;
+    private @Nullable String currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private @Nullable byte[] delimiter;
 
     private TextBasedReader(TextSource source, byte[] delimiter) {
       super(source);

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index b99d5ee..c3507fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -127,6 +127,18 @@ public abstract class UnboundedSource<
      * </ul>
      */
     void finalizeCheckpoint() throws IOException;
+
+    NoopCheckpointMark NOOP_CHECKPOINT_MARK = new NoopCheckpointMark();
+
+    /**
+     * A checkpoint mark that does nothing when finalized.
+     */
+    final class NoopCheckpointMark implements UnboundedSource.CheckpointMark {
+      @Override
+      public void finalizeCheckpoint() throws IOException {
+        // nothing to do
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 72ce5d0..7e37e78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -125,14 +125,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
   static final int UNKNOWN_SHARDNUM = -1;
   private FileBasedSink<UserT, DestinationT, OutputT> sink;
-  private WriteOperation<DestinationT, OutputT> writeOperation;
+  private @Nullable WriteOperation<DestinationT, OutputT> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
-  @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
+  private final @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
   // We don't use a side input for static sharding, as we want this value to be updatable
   // when a pipeline is updated.
-  @Nullable
-  private final ValueProvider<Integer> numShardsProvider;
+  private final @Nullable ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
   private int maxNumWritersPerBundle;
   // This is the set of side inputs used by this transform. This is usually populated by the users's
@@ -231,12 +230,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * {@link #withSharding(PTransform)}), or runner-determined (by {@link
    * #withRunnerDeterminedSharding()}.
    */
-  @Nullable
-  public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
+  public @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
     return computeNumShards;
   }
 
-  public ValueProvider<Integer> getNumShards() {
+  public @Nullable ValueProvider<Integer> getNumShards() {
     return numShardsProvider;
   }
 
@@ -386,7 +384,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     private final Coder<DestinationT> destinationCoder;
     private final boolean windowedWrites;
 
-    private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
+    // Initialized in startBundle()
+    private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
+
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
     WriteBundles(
@@ -562,7 +562,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   }
 
   private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
-    private final PCollectionView<Integer> numShardsView;
+    private final @Nullable PCollectionView<Integer> numShardsView;
     private final ValueProvider<Integer> numShardsProvider;
     private final Coder<DestinationT> destinationCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/80ae9303/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index dd6d009..6f57b98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -32,4 +32,8 @@
  * numbers.apply(TextIO.write().to("gs://my_bucket/path/to/numbers"));
  * } </pre>
  */
+@DefaultAnnotation(NonNull.class)
 package org.apache.beam.sdk.io;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;


Mime
View raw message