crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject [crunch] branch master updated: CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties (#25)
Date Fri, 12 Jul 2019 21:37:02 GMT
This is an automated email from the ASF dual-hosted git repository.

mkwhit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new ef0c7e8  CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties (#25)
ef0c7e8 is described below

commit ef0c7e882921c2c99fcd0feb9a50d8438c327822
Author: Ben Roling <ben.roling@gmail.com>
AuthorDate: Fri Jul 12 16:36:57 2019 -0500

    CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties (#25)
    
    * CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties
    
    * CRUNCH-685 fix noisy logging
    
    * CRUNCH-686 Fix FormatBundle to hide redacted properties
---
 .../org/apache/crunch/ExternalFilesystemIT.java    |   2 +-
 .../java/org/apache/crunch/io/FormatBundle.java    | 166 +++++++++++++++++++--
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |  13 +-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |  12 +-
 .../org/apache/crunch/io/FormatBundleTest.java     | 138 +++++++++++++++++
 5 files changed, 298 insertions(+), 33 deletions(-)

diff --git a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
index 75a2837..0ca396c 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -186,7 +186,7 @@ public class ExternalFilesystemIT {
     }
 
     private static Configuration getDfsConf(String nsName, MiniDFSCluster cluster) {
-        Configuration conf = new Configuration();
+        Configuration conf = new Configuration(false);
         conf.set("dfs.nameservices", nsName);
         conf.set("dfs.client.failover.proxy.provider." + nsName,
             "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 0b50080..1a89f8d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -25,25 +27,31 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
-
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.regex.Pattern;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.google.common.collect.Maps;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra 
@@ -55,9 +63,30 @@ import com.google.common.collect.Maps;
  */
 public class FormatBundle<K> implements Serializable, Writable, Configurable {
 
+  private final Logger LOG = LoggerFactory.getLogger(FormatBundle.class);
+  /**
+   * A comma-separated list of properties whose value will be redacted.
+   * MR config to redact job conf properties: https://issues.apache.org/jira/browse/MAPREDUCE-6741
+   */
+  private static final String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
+  private static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)";
+
+  private final String FILESYSTEM_BLACKLIST_PATTERNS_KEY = "crunch.fs.props.blacklist.patterns";
+  private final String[] FILESYSTEM_BLACKLIST_PATTERNS_DEFAULT =
+      new String[] {
+          "^fs\\.defaultFS$",
+          "^fs\\.default\\.name$"};
+
+  private final String FILESYSTEM_WHITELIST_PATTERNS_KEY = "crunch.fs.props.whitelist.patterns";
+  private final String[] FILESYSTEM_WHITELIST_PATTERNS_DEFAULT =
+      new String[] {
+          "^fs\\..*",
+          "^dfs\\..*"};
+
   private Class<K> formatClass;
   private Map<String, String> extraConf;
   private Configuration conf;
+  private FileSystem fileSystem;
   
   public static <T> FormatBundle<T> fromSerialized(String serialized, Configuration conf) {
     ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
@@ -82,8 +111,9 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
   public FormatBundle() {
     // For Writable support
   }
-  
-  private FormatBundle(Class<K> formatClass) {
+
+  @VisibleForTesting
+  FormatBundle(Class<K> formatClass) {
     this.formatClass = formatClass;
     this.extraConf = Maps.newHashMap();
   }
@@ -93,26 +123,121 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
     return this;
   }
 
+  public FormatBundle<K> setFileSystem(FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+    return this;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
   public Class<K> getFormatClass() {
     return formatClass;
   }
 
   public Configuration configure(Configuration conf) {
+    // first configure fileystem properties
+    Map<String, String> appliedFsProperties = configureFileSystem(conf);
+
+    // then apply extraConf properties
     for (Map.Entry<String, String> e : extraConf.entrySet()) {
       String key = e.getKey();
       String value = e.getValue();
-      // merge the value if it is DFS_NAMESERVICES to support additional filesystems
+      conf.set(key, value);
+      if (appliedFsProperties.get(key) != null) {
+        LOG.info("{}={} from extraConf overrode {}={} from filesystem conf",
+            new Object[] {key, value, key, appliedFsProperties.get(key)});
+      }
+    }
+    return conf;
+  }
+
+  private Map<String,String> configureFileSystem(Configuration conf) {
+    if (fileSystem == null) {
+      return Collections.emptyMap();
+    }
+
+    Collection<Pattern> blacklistPatterns =
+        compilePatterns(
+            conf.getStrings(FILESYSTEM_BLACKLIST_PATTERNS_KEY,
+                FILESYSTEM_BLACKLIST_PATTERNS_DEFAULT));
+    Collection<Pattern> whitelistPatterns =
+        compilePatterns(
+            conf.getStrings(FILESYSTEM_WHITELIST_PATTERNS_KEY,
+                FILESYSTEM_WHITELIST_PATTERNS_DEFAULT));
+
+    Configuration fileSystemConf = fileSystem.getConf();
+    Map<String, String> appliedProperties = new HashMap<>();
+    Collection<String> redactedProperties = conf.getTrimmedStringCollection(MR_JOB_REDACTED_PROPERTIES);
+
+    for (Entry<String, String> e : fileSystemConf) {
+      String key = e.getKey();
+      String value = fileSystemConf.get(key);
+      String originalValue = conf.get(key);
+
+      if (value.equals(originalValue)) {
+        continue;
+      }
+
+      Pattern matchingBlacklistPattern = matchingPattern(key, blacklistPatterns);
+      if (matchingBlacklistPattern != null) {
+        LOG.info("{}={} matches blacklist pattern '{}', omitted",
+            new Object[] {key, value, matchingBlacklistPattern});
+        continue;
+      }
+      Pattern matchingWhitelistPattern = matchingPattern(key, whitelistPatterns);
+      if (matchingWhitelistPattern == null) {
+        LOG.info("{}={} matches no whitelist pattern from {}, omitted",
+            new Object[] {key, value, whitelistPatterns});
+        continue;
+      }
+
       if (key.equals(DFSConfigKeys.DFS_NAMESERVICES)) {
-        String[] originalValue = conf.getStrings(key);
+        String[] originalArrayValue = conf.getStrings(key);
         if (originalValue != null) {
           String[] newValue = value != null ? value.split(",") : new String[0];
-          conf.setStrings(key, mergeValues(originalValue, newValue));
+          String[] merged = mergeValues(originalArrayValue, newValue);
+          LOG.info("Merged '{}' into '{}' with result '{}'",
+              new Object[] {newValue, DFSConfigKeys.DFS_NAMESERVICES, merged});
+          conf.setStrings(key, merged);
+          appliedProperties.put(key, StringUtils.arrayToString(merged));
           continue;
         }
       }
+
+      String message = "Applied {}={} from FS '{}'";
+      if (originalValue != null) {
+        message += ", overriding '{}'";
+      }
+      if (redactedProperties.contains(key)) {
+        LOG.info(message,
+            new Object[]{key, REDACTION_REPLACEMENT_VAL, fileSystem.getUri(), REDACTION_REPLACEMENT_VAL});
+      } else {
+        LOG.info(message,
+            new Object[]{key, value, fileSystem.getUri(), originalValue});
+      }
       conf.set(key, value);
+      appliedProperties.put(key, value);
     }
-    return conf;
+    return appliedProperties;
+  }
+
+  private static Pattern matchingPattern(String s, Collection<Pattern> patterns) {
+    for (Pattern pattern : patterns) {
+      if (pattern.matcher(s).find()) {
+        return pattern;
+      }
+    }
+    return null;
+  }
+
+  private static Collection<Pattern> compilePatterns(String[] patterns) {
+    Collection<Pattern> compiledPatterns = new ArrayList<>(patterns.length);
+    for (String pattern : patterns) {
+      compiledPatterns.add(Pattern.compile(pattern));
+    }
+    return compiledPatterns;
   }
 
   private static String[] mergeValues(String[] value1, String[] value2) {
@@ -139,7 +264,9 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
 
   @Override
   public int hashCode() {
-    return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
+    return new HashCodeBuilder().append(formatClass)
+        .append(fileSystem)
+        .append(extraConf).toHashCode();
   }
 
   @Override
@@ -148,7 +275,9 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
       return false;
     }
     FormatBundle<K> oib = (FormatBundle<K>) other;
-    return Objects.equals(formatClass, oib.formatClass) && Objects.equals(extraConf, oib.extraConf);
+    return Objects.equals(formatClass, oib.formatClass)
+        && Objects.equals(fileSystem, oib.fileSystem)
+        && Objects.equals(extraConf, oib.extraConf);
   }
 
   @Override
@@ -161,6 +290,12 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
       String value = Text.readString(in);
       extraConf.put(key, value);
     }
+    if (in.readBoolean()) {
+      String fileSystemUri = Text.readString(in);
+      Configuration filesystemConf = new Configuration(false);
+      filesystemConf.readFields(in);
+      this.fileSystem = FileSystem.get(URI.create(fileSystemUri), filesystemConf);
+    }
   }
 
   @Override
@@ -171,6 +306,11 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable {
       Text.writeString(out, e.getKey());
       Text.writeString(out, e.getValue());
     }
+    out.writeBoolean(fileSystem != null);
+    if (fileSystem != null) {
+      Text.writeString(out, fileSystem.getUri().toString());
+      fileSystem.getConf().write(out);
+    }
   }
   
   private Class readClass(DataInput in) throws IOException {
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 98c0fb8..7788da5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -55,7 +55,6 @@ public class FileSourceImpl<T> implements ReadableSource<T> {
   protected List<Path> paths;
   protected final PType<T> ptype;
   protected final FormatBundle<? extends InputFormat> inputBundle;
-  private FileSystem fileSystem;
 
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
     this(path, ptype, FormatBundle.forInput(inputFormatClass));
@@ -91,7 +90,7 @@ public class FileSourceImpl<T> implements ReadableSource<T> {
 
   @Override
   public FileSystem getFileSystem() {
-    return fileSystem;
+    return inputBundle.getFileSystem();
   }
 
   @Override
@@ -102,23 +101,17 @@ public class FileSourceImpl<T> implements ReadableSource<T> {
 
   @Override
   public Source<T> fileSystem(FileSystem fileSystem) {
-    if (this.fileSystem != null) {
+    if (inputBundle.getFileSystem() != null) {
       throw new IllegalStateException("Filesystem already set. Change is not supported.");
     }
 
-    this.fileSystem = fileSystem;
-
     if (fileSystem != null) {
       List<Path> qualifiedPaths = new ArrayList<>(paths.size());
       for (Path path : paths) {
         qualifiedPaths.add(fileSystem.makeQualified(path));
       }
       paths = qualifiedPaths;
-
-      Configuration fsConf = fileSystem.getConf();
-      for (Entry<String, String> entry : fsConf) {
-        inputBundle.set(entry.getKey(), entry.getValue());
-      }
+      inputBundle.setFileSystem(fileSystem);
     }
     return this;
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index d48ac31..fc3d2a8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -69,7 +69,6 @@ public class FileTargetImpl implements PathTarget {
   protected Path path;
   private final FormatBundle<? extends FileOutputFormat> formatBundle;
   private final FileNamingScheme fileNamingScheme;
-  private FileSystem fileSystem;
 
   public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
                         FileNamingScheme fileNamingScheme) {
@@ -96,26 +95,21 @@ public class FileTargetImpl implements PathTarget {
 
   @Override
   public Target fileSystem(FileSystem fileSystem) {
-    if (this.fileSystem != null) {
+    if (formatBundle.getFileSystem() != null) {
       throw new IllegalStateException("Filesystem already set. Change is not supported.");
     }
 
     if (fileSystem != null) {
       path = fileSystem.makeQualified(path);
 
-      this.fileSystem = fileSystem;
-
-      Configuration fsConf = fileSystem.getConf();
-      for (Entry<String, String> entry : fsConf) {
-        formatBundle.set(entry.getKey(), entry.getValue());
-      }
+      formatBundle.setFileSystem(fileSystem);
     }
     return this;
   }
 
   @Override
   public FileSystem getFileSystem() {
-    return fileSystem;
+    return formatBundle.getFileSystem();
   }
 
   @Override
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java b/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java
new file mode 100644
index 0000000..7949a52
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FormatBundleTest {
+  @Test
+  public void testFileSystemConfs() throws Exception {
+    Configuration fsConf = new Configuration(false);
+    fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///tmp/foo");
+    fsConf.set("foo", "bar");
+    fsConf.set("fs.fake.impl", "FakeFileSystem");
+    fsConf.set("dfs.overridden", "fsValue");
+    fsConf.set("dfs.extraOverridden", "fsExtra");
+    fsConf.set(DFSConfigKeys.DFS_NAMESERVICES, "fs-cluster");
+
+    FileSystem fs = FileSystem.newInstance(fsConf);
+
+    FormatBundle<TextInputFormat> formatBundle = new FormatBundle<>(TextInputFormat.class);
+    formatBundle.setFileSystem(fs);
+    formatBundle.set("dfs.extraOverridden", "extraExtra");
+
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "pipeline-cluster");
+    conf.set("dfs.overridden", "pipelineValue");
+    formatBundle.configure(conf);
+
+    // should be filtered by blacklist
+    Assert.assertFalse(conf.get(FileSystem.FS_DEFAULT_NAME_KEY).equals("hdfs://my-hdfs"));
+
+    // shouldn't be on whitelist
+    Assert.assertFalse(conf.get("foo") != null);
+
+    // should get through both blacklist and whitelist
+    Assert.assertEquals("FakeFileSystem", conf.get("fs.fake.impl"));
+
+    // should use value from fsConf
+    Assert.assertEquals("fsValue", conf.get("dfs.overridden"));
+
+    // should use value from 'extraConf'
+    Assert.assertEquals("extraExtra", conf.get("dfs.extraOverridden"));
+
+    // dfs.nameservices should be merged
+    Assert.assertArrayEquals(new String [] {"pipeline-cluster", "fs-cluster"},
+        conf.getStrings(DFSConfigKeys.DFS_NAMESERVICES));
+  }
+  @Test
+  public void testRedactedFileSystemConfs() throws Exception {
+    Configuration fsConf = new Configuration(false);
+    fsConf.set("fs.s3a.access.key", "accessKey");
+    fsConf.set("fs.s3a.secret.key", "secretKey");
+    fsConf.set("fs.fake.impl", "FakeFileSystem");
+    FileSystem fs = FileSystem.newInstance(fsConf);
+
+    FormatBundle<TextInputFormat> formatBundle = new FormatBundle<>(TextInputFormat.class);
+    formatBundle.setFileSystem(fs);
+
+    Configuration conf = new Configuration();
+    conf.set("mapreduce.job.redacted-properties", "fs.s3a.access.key,fs.s3a.secret.key");
+
+    final FormatBundleTestAppender appender = new FormatBundleTestAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    try {
+      Logger.getLogger(FormatBundleTest.class);
+      formatBundle.configure(conf);
+    } finally {
+      logger.removeAppender(appender);
+    }
+
+    final List<LoggingEvent> log = appender.getLog();
+
+    // redacted value: accesskey
+    Assert.assertThat(log.get(0).getMessage().toString(),
+        is("Applied fs.s3a.access.key=*********(redacted) from FS 'file:///'"));
+
+    // fake non redacted value: fs.fake.impl
+    Assert.assertThat(log.get(1).getMessage().toString(),
+        is("Applied fs.fake.impl=FakeFileSystem from FS 'file:///'"));
+
+    // redacted value: secretKey
+    Assert.assertThat(log.get(2).getMessage().toString(),
+        is("Applied fs.s3a.secret.key=*********(redacted) from FS 'file:///'"));
+  }
+
+
+  class FormatBundleTestAppender extends AppenderSkeleton {
+
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+      log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+      return new ArrayList<>(log);
+    }
+  }
+
+}
\ No newline at end of file


Mime
View raw message