crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [crunch] branch master updated: CRUNCH-678: Avoid unnecessary last modified time retrieval
Date Wed, 20 Feb 2019 00:25:37 GMT
This is an automated email from the ASF dual-hosted git repository.

jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new 571b90c  CRUNCH-678: Avoid unnecessary last modified time retrieval
571b90c is described below

commit 571b90c03e3010e7bb9badf4e6e441ab2164be56
Author: Andrew Olson <aolson1@cerner.com>
AuthorDate: Tue Feb 19 16:46:20 2019 -0600

    CRUNCH-678: Avoid unnecessary last modified time retrieval
    
    Signed-off-by: Josh Wills <jwills@apache.org>
---
 crunch-core/src/main/java/org/apache/crunch/Target.java       |  2 +-
 .../java/org/apache/crunch/impl/dist/DistributedPipeline.java |  6 ++++--
 .../apache/crunch/impl/dist/collect/BaseUnionCollection.java  | 11 ++++++++---
 .../org/apache/crunch/impl/dist/collect/BaseUnionTable.java   | 11 ++++++++---
 .../src/main/java/org/apache/crunch/impl/mem/MemPipeline.java |  2 ++
 .../main/java/org/apache/crunch/io/impl/FileTargetImpl.java   |  6 +++++-
 6 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 03b6eef..4dec831 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -72,7 +72,7 @@ public interface Target {
    * 
    * @param writeMode The strategy for handling existing outputs
    * @param lastModifiedAt the time of the most recent modification to one of the source inputs for handling based
-   *                       on the provided {@code writeMode}.
+   *                       on the provided {@code writeMode}, or -1 if not relevant for the provided {@code writeMode}
    * @param conf The ever-useful {@code Configuration} instance
    * @return true if the target did exist
    */
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 0afa766..abd318c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -238,8 +238,10 @@ public abstract class DistributedPipeline implements Pipeline {
       pcollection = pcollection.parallelDo("UnionCollectionWrapper",
           (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     }
-    boolean exists = target.handleExisting(writeMode, ((PCollectionImpl) pcollection).getLastModifiedAt(),
-        getConfiguration());
+    // Last modified time is only relevant when write mode is checkpoint
+    long lastModifiedAt = (writeMode == Target.WriteMode.CHECKPOINT)
+        ? ((PCollectionImpl) pcollection).getLastModifiedAt() : -1;
+    boolean exists = target.handleExisting(writeMode, lastModifiedAt, getConfiguration());
     if (exists && writeMode == Target.WriteMode.CHECKPOINT) {
       SourceTarget<?> st = target.asSourceTarget(pcollection.getPType());
       if (st == null) {
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
index ef10ee7..855bd18 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
@@ -50,9 +50,6 @@ public class BaseUnionCollection<S> extends PCollectionImpl<S> {
         throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
       }
       size += parent.getSize();
-      if (parent.getLastModifiedAt() > lastModifiedAt) {
-        this.lastModifiedAt = parent.getLastModifiedAt();
-      }
     }
   }
 
@@ -72,6 +69,14 @@ public class BaseUnionCollection<S> extends PCollectionImpl<S> {
 
   @Override
   public long getLastModifiedAt() {
+    if (lastModifiedAt == -1) {
+      for (PCollectionImpl<S> parent : parents) {
+        long parentLastModifiedAt = parent.getLastModifiedAt();
+        if (parentLastModifiedAt > lastModifiedAt) {
+          lastModifiedAt = parentLastModifiedAt;
+        }
+      }
+    }
     return lastModifiedAt;
   }
   
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
index 4d688c3..58617fa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
@@ -56,9 +56,6 @@ public class BaseUnionTable<K, V> extends PTableBase<K, V> {
       }
       this.parents.add(parent);
       size += parent.getSize();
-      if (parent.getLastModifiedAt() > lastModifiedAt) {
-        this.lastModifiedAt = parent.getLastModifiedAt();
-      }
     }
   }
 
@@ -77,6 +74,14 @@ public class BaseUnionTable<K, V> extends PTableBase<K, V> {
 
   @Override
   public long getLastModifiedAt() {
+    if (lastModifiedAt == -1) {
+      for (PCollectionImpl<Pair<K, V>> parent : parents) {
+        long parentLastModifiedAt = parent.getLastModifiedAt();
+        if (parentLastModifiedAt > lastModifiedAt) {
+          lastModifiedAt = parentLastModifiedAt;
+        }
+      }
+    }
     return lastModifiedAt;
   }
   
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 4d4d5dd..9b1345c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -200,6 +200,8 @@ public class MemPipeline implements Pipeline {
 
   @Override
   public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode) {
+    // Last modified time does not need to be retrieved for this
+    // pipeline implementation
     target.handleExisting(writeMode, -1, getConfiguration());
     if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) {
       throw new CrunchRuntimeException("Target " + target
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index e8b1dfe..17efabb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -373,7 +373,11 @@ public class FileTargetImpl implements PathTarget {
       exists = fs.exists(path);
       if (exists) {
         successful = fs.exists(getSuccessIndicator());
-        lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
+        // Last modified time is only relevant when the path exists and the
+        // write mode is checkpoint
+        if (successful && strategy == WriteMode.CHECKPOINT) {
+          lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
+        }
       }
     } catch (IOException e) {
       LOG.error("Exception checking existence of path: {}", path, e);


Mime
View raw message