beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3702) Support system properties source for pipeline options
Date Mon, 03 Sep 2018 18:14:01 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3702?focusedWorklogId=140691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140691
]

ASF GitHub Bot logged work on BEAM-3702:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Sep/18 18:14
            Start Date: 03/Sep/18 18:14
    Worklog Time Spent: 10m 
      Work Description: stale[bot] closed pull request #4683: [BEAM-3702] adding fromJvm to
create pipelineoptions from the system properties
URL: https://github.com/apache/beam/pull/4683
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 7630fb3e167..3e724c04b18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -20,12 +20,16 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Locale.ROOT;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
@@ -38,9 +42,12 @@
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.RowSortedTable;
 import com.google.common.collect.Sets;
@@ -63,11 +70,13 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Properties;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
@@ -75,9 +84,10 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import javax.annotation.Nonnull;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Validation.Required;
@@ -174,6 +184,13 @@ public static Builder fromArgs(String... args) {
     return new Builder().fromArgs(args);
   }
 
+  /**
+   * @return a builder instance enabling you to build pipeline options.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
   /**
    * After creation we will validate that {@code <T>} conforms to all the
    * validation criteria. See
@@ -187,20 +204,20 @@ public Builder withValidation() {
   /** A fluent {@link PipelineOptions} builder. */
   public static class Builder {
     private final String defaultAppName;
-    private final String[] args;
+    private final Collection<Function<Builder, ListMultimap<String, String>>>
options;
     private final boolean validation;
     private final boolean strictParsing;
     private final boolean isCli;
 
     // Do not allow direct instantiation
     private Builder() {
-      this(null, false, true, false);
+      this(new ArrayList<>(), false, true, false);
     }
 
-    private Builder(String[] args, boolean validation,
-        boolean strictParsing, boolean isCli) {
+    private Builder(Collection<Function<Builder, ListMultimap<String, String>>>
options,
+                    boolean validation, boolean strictParsing, boolean isCli) {
       this.defaultAppName = findCallersClassName();
-      this.args = args;
+      this.options = options;
       this.validation = validation;
       this.strictParsing = strictParsing;
       this.isCli = isCli;
@@ -242,10 +259,36 @@ private Builder(String[] args, boolean validation,
      * {@code --help=PipelineOptionsClassName} will print out detailed usage information
about the
      * specifically requested PipelineOptions by invoking
      * {@link PipelineOptionsFactory#printHelp(PrintStream, Class)}.
+     *
+     * <p>Note that since release 2.5.0, calling this method more than once
+     * <b>appends</b> the arguments instead of replacing them.
      */
     public Builder fromArgs(String... args) {
       checkNotNull(args, "Arguments should not be null.");
-      return new Builder(args, validation, strictParsing, true);
+      options.add(current -> parseCommandLine(args, current.strictParsing));
+      return new Builder(options, validation, strictParsing, true);
+    }
+
+    /**
+     * <p>A key/value set of options. This reuses the same logic as
+     * {@link PipelineOptionsFactory#fromArgs(String...)} but uses
+     * the configuration parameter as input. Keys don't have {@code --}
+     * as a prefix in this case.</p>
+     *
+     * <p>Simple list style properties can be bound to {@code boolean[]}, {@code char[]},
+     * {@code short[]}, {@code int[]}, {@code long[]}, {@code float[]}, {@code double[]},
+     * {@code Class[]}, enum arrays, {@code String[]}, and {@code List<String>}.
+     *
+     * <p>For more information about the argument handling please see
+     * {@link Builder#fromArgs(String...)}.
+     *
+     * @param configuration the options to convert to PipelineOptions.
+     * @return a new builder for pipeline options.
+     */
+    public Builder fromMap(final Map<String, String> configuration) {
+      checkNotNull(configuration, "Arguments should not be null.");
+      options.add(builder -> LinkedListMultimap.create(Multimaps.forMap(configuration)));
+      return new Builder(options, validation, strictParsing, true);
     }
 
     /**
@@ -255,7 +298,7 @@ public Builder fromArgs(String... args) {
      * validation.
      */
     public Builder withValidation() {
-      return new Builder(args, true, strictParsing, isCli);
+      return new Builder(options, true, strictParsing, isCli);
     }
 
     /**
@@ -263,7 +306,7 @@ public Builder withValidation() {
      * arguments.
      */
     public Builder withoutStrictParsing() {
-      return new Builder(args, validation, false, isCli);
+      return new Builder(options, validation, false, isCli);
     }
 
     /**
@@ -289,12 +332,13 @@ public PipelineOptions create() {
     public <T extends PipelineOptions> T as(Class<T> klass) {
       Map<String, Object> initialOptions = Maps.newHashMap();
 
-      // Attempt to parse the arguments into the set of initial options to use
-      if (args != null) {
-        ListMultimap<String, String> options = parseCommandLine(args, strictParsing);
+      if (!options.isEmpty()) {
         LOG.debug("Provided Arguments: {}", options);
-        printHelpUsageAndExitIfNeeded(options, System.out, true /* exit */);
-        initialOptions = parseObjects(klass, options, strictParsing);
+        final ListMultimap<String, String> optionsInstance = options.stream()
+          .map(fn -> fn.apply(this))
+          .collect(LinkedListMultimap::create, Multimap::putAll, Multimap::putAll);
+        printHelpUsageAndExitIfNeeded(optionsInstance, System.out, true /* exit */);
+        initialOptions = parseObjects(klass, optionsInstance, strictParsing);
       }
 
       // Create our proxy
@@ -333,7 +377,7 @@ public PipelineOptions create() {
    * {@code printStream} and {@code exit} used for testing.
    */
   @SuppressWarnings("unchecked")
-  static boolean printHelpUsageAndExitIfNeeded(ListMultimap<String, String> options,
+  static boolean printHelpUsageAndExitIfNeeded(Multimap<String, String> options,
       PrintStream printStream, boolean exit) {
     if (options.containsKey("help")) {
       final String helpOption = Iterables.getOnlyElement(options.get("help"));
@@ -903,10 +947,9 @@ private static void validateReturnType(Class<? extends PipelineOptions>
iface) {
     List<MultipleDefinitions> multipleDefinitions = Lists.newArrayList();
     for (Map.Entry<Method, Collection<Method>> entry
         : methodNameToMethodMap.asMap().entrySet()) {
-      Set<Class<?>> returnTypes = FluentIterable.from(entry.getValue())
-          .transform(ReturnTypeFetchingFunction.INSTANCE).toSet();
-      SortedSet<Method> collidingMethods = FluentIterable.from(entry.getValue())
-          .toSortedSet(MethodComparator.INSTANCE);
+      Set<Class<?>> returnTypes = entry.getValue().stream()
+        .map(ReturnTypeFetchingFunction.INSTANCE::apply).collect(toSet());
+      SortedSet<Method> collidingMethods = SortedSet.class.cast(entry.getValue());
       if (returnTypes.size() > 1) {
         MultipleDefinitions defs = new MultipleDefinitions();
         defs.method = entry.getKey();
@@ -972,52 +1015,34 @@ private static void validateGettersHaveConsistentAnnotation(
       SortedSet<Method> getters = methodNameToAllMethodMap.get(descriptor.getReadMethod());
       SortedSet<Method> gettersWithTheAnnotation =
           Sets.filter(getters, annotationPredicates.forMethod);
-      Set<Annotation> distinctAnnotations = Sets.newLinkedHashSet(FluentIterable
-          .from(gettersWithTheAnnotation)
-          .transformAndConcat(new Function<Method, Iterable<? extends Annotation>>()
{
-            @Nonnull
-            @Override
-            public Iterable<? extends Annotation> apply(@Nonnull Method method) {
-              return FluentIterable.from(method.getAnnotations());
-            }
-          })
-          .filter(annotationPredicates.forAnnotation));
+      Set<Annotation> distinctAnnotations = gettersWithTheAnnotation.stream()
+        .flatMap(m -> Stream.of(m.getAnnotations()))
+        .filter(annotationPredicates.forAnnotation::apply)
+        .collect(toSet());
 
 
       if (distinctAnnotations.size() > 1) {
         throw new IllegalArgumentException(String.format(
             "Property [%s] is marked with contradictory annotations. Found [%s].",
             descriptor.getName(),
-            FluentIterable.from(gettersWithTheAnnotation)
-                .transformAndConcat(new Function<Method, Iterable<String>>()
{
-                  @Nonnull
-                  @Override
-                  public Iterable<String> apply(final @Nonnull Method method) {
-                    return FluentIterable.from(method.getAnnotations())
-                        .filter(annotationPredicates.forAnnotation)
-                        .transform(new Function<Annotation, String>() {
-                          @Nonnull
-                          @Override
-                          public String apply(@Nonnull Annotation annotation) {
-                            return String.format(
-                                "[%s on %s]",
-                                ReflectHelpers.ANNOTATION_FORMATTER.apply(annotation),
-                                ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method));
-                          }
-                        });
-
-                  }
-                })
-                .join(Joiner.on(", "))));
+            gettersWithTheAnnotation.stream()
+              .flatMap(method -> Stream.of(method.getAnnotations())
+              .filter(annotationPredicates.forAnnotation::apply)
+              .map(annotation -> String.format(
+                      "[%s on %s]",
+                      ReflectHelpers.ANNOTATION_FORMATTER.apply(annotation),
+                      ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method))))
+              .collect(joining(", "))));
       }
 
-      Iterable<String> getterClassNames = FluentIterable.from(getters)
-          .transform(MethodToDeclaringClassFunction.INSTANCE)
-          .transform(ReflectHelpers.CLASS_NAME);
-      Iterable<String> gettersWithTheAnnotationClassNames =
-          FluentIterable.from(gettersWithTheAnnotation)
-          .transform(MethodToDeclaringClassFunction.INSTANCE)
-          .transform(ReflectHelpers.CLASS_NAME);
+      Iterable<String> getterClassNames = getters.stream()
+        .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+        .map(ReflectHelpers.CLASS_NAME::apply)
+        .collect(toList());
+      Iterable<String> gettersWithTheAnnotationClassNames = gettersWithTheAnnotation.stream()
+        .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+        .map(ReflectHelpers.CLASS_NAME::apply)
+        .collect(toList());
 
       if (!(gettersWithTheAnnotation.isEmpty()
             || getters.size() == gettersWithTheAnnotation.size())) {
@@ -1049,10 +1074,10 @@ private static void validateSettersDoNotHaveAnnotation(
           methodNameToAllMethodMap.get(descriptor.getWriteMethod()),
           annotationPredicates.forMethod);
 
-      Iterable<String> settersWithTheAnnotationClassNames =
-          FluentIterable.from(settersWithTheAnnotation)
-          .transform(MethodToDeclaringClassFunction.INSTANCE)
-          .transform(ReflectHelpers.CLASS_NAME);
+      Iterable<String> settersWithTheAnnotationClassNames = settersWithTheAnnotation.stream()
+          .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+          .map(ReflectHelpers.CLASS_NAME::apply)
+          .collect(toList());
 
       if (!settersWithTheAnnotation.isEmpty()) {
         AnnotatedSetter annotated = new AnnotatedSetter();
@@ -1797,4 +1822,112 @@ private synchronized void register(Class<? extends PipelineOptions>
iface) {
       return combinedCache.get(interfaces).getPropertyDescriptors();
     }
   }
+
+  /**
+   * For more information please see {@link PipelineOptionsFactory#fromMap(Map)}.
+   *
+   * @param prefix a prefix filter on the map keys.
+   * @param options the options to convert to a pipeline options.
+   * @return a pipeline options instance based on the specified configuration.
+   */
+  @Experimental
+  public static PipelineOptions fromMap(final String prefix, final Map<String, String>
options) {
+    checkArgument(prefix != null, "prefix should not be null.");
+    checkArgument(options != null, "options should not be null.");
+    final Map<String, String> filtered = options.entrySet()
+                                        .stream()
+                                        .filter(e -> e.getKey()
+                                                      .startsWith(prefix))
+                                        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+    return fromMap(filtered);
+  }
+
+  /**
+   * For more information please see {@link PipelineOptionsFactory.Builder#fromMap(Map)}.
+   *
+   * @param options the options to convert to a pipeline options.
+   * @return a pipeline options instance based on the specified configuration.
+   */
+  public static PipelineOptions fromMap(final Map<String, String> options) {
+    checkArgument(options != null, "options should not be null.");
+    return new Builder().fromMap(options).create();
+  }
+
+  /**
+   * <p>For more information please see {@link PipelineOptionsFactory#fromMap(Map)}.</p>
+   *
+   * <p>This method uses {@link Properties#stringPropertyNames()} so that only
+   * entries whose key and value are both {@link String}s are taken into account.</p>
+   *
+   * @param properties the properties to use to create the pipeline options instance.
+   * @return a pipeline options instance based on the specified properties.
+   */
+  @Experimental
+  public static PipelineOptions fromProperties(final Properties properties) {
+    checkArgument(properties != null, "properties should not be null.");
+    return fromMap(properties.stringPropertyNames()
+                    .stream()
+                    .collect(toMap(identity(), properties::getProperty)));
+  }
+
+  /**
+   * For more information please see {@link PipelineOptionsFactory#fromMap(String, Map)}
+   * and {@link PipelineOptionsFactory#fromProperties(Properties)}.
+   *
+   * @param prefix the prefix filter applied on the keys of {@code properties};
+   *     matching keys have the prefix removed.
+   * @param properties the properties to use to create the pipeline options instance.
+   * @return a pipeline options instance based on the specified properties
+   * filtered with the specified prefix.
+   */
+  @Experimental
+  public static PipelineOptions fromProperties(final String prefix, final Properties properties)
{
+    checkArgument(prefix != null, "prefix should not be null.");
+    checkArgument(properties != null, "properties should not be null.");
+    final Properties instantiationProperties = properties.stringPropertyNames()
+      .stream()
+      .filter(k -> k.startsWith(prefix))
+      .collect(
+        Properties::new,
+        (p, k) -> p.setProperty(k.substring(prefix.length()), properties.getProperty(k)),
+        Hashtable::putAll);
+    return fromProperties(instantiationProperties);
+  }
+
+  /**
+   * <p>For more information please see
+   * {@link PipelineOptionsFactory#fromProperties(String, Properties)}.</p>
+   *
+   * <p>The intent of this method is to let you extract from system properties
+   * key/values to build {@link PipelineOptions} very quickly without duplicating
+   * the mapping each time you need to do it yourself.</p>
+   *
+   * <p>Unless you are developing a library or have a special case,
+   * it is recommended to use {@link PipelineOptionsFactory#fromSystemProperties()}
+   * as a default factory.</p>
+   *
+   * @param prefix the not null prefix filter applied on system properties keys.
+   * @return a pipeline options instance based on system properties key/values
+   * and filtered with a custom prefix.
+   */
+  @Experimental
+  public static PipelineOptions fromSystemProperties(final String prefix) {
+    checkArgument(prefix != null, "prefix should not be null.");
+    return fromProperties(prefix, System.getProperties());
+  }
+
+  /**
+   * <p>For more information please see
+   * {@link PipelineOptionsFactory#fromSystemProperties(String)}.</p>
+   *
+   * <p>This method will filter system properties based on the conventional prefix
+   * {@code beam.} to create a {@link PipelineOptions} instance from the filtered
+   * key/values.</p>
+   *
+   * @return a pipeline options instance based on system properties
+   * filtered based on <code>beam.</code> prefix.
+   */
+  @Experimental
+  public static PipelineOptions fromSystemProperties() {
+    return fromSystemProperties("beam.");
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index be92f2ac4d0..7560d7e00d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.options;
 
+import static java.util.Collections.singletonList;
 import static java.util.Locale.ROOT;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -55,6 +56,7 @@
 import java.io.PrintStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -1797,6 +1799,82 @@ public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator
jso
     }
   }
 
+  @Test
+  public void testPipelineOptionsFactoryFromProperties() {
+    assertEquals("testAppName", PipelineOptionsFactory.fromProperties(new Properties() {{
+      put("appName", "testAppName");
+    }}).as(ApplicationNameOptions.class).getAppName());
+  }
+
+  @Test
+  public void testPipelineOptionsFactoryFromSystemProperties() {
+    final String prefix = getClass().getName() + ".testPipelineOptionsFactoryFromSystemProperties.";
+    System.setProperty(prefix + "appName", "testAppName");
+    assertEquals("testAppName",
+      PipelineOptionsFactory.fromSystemProperties(prefix)
+        .as(ApplicationNameOptions.class).getAppName());
+    System.clearProperty(prefix + "appName");
+  }
+
+  @Test
+  public void testPipelineOptionsFactoryFromPropertiesWithPrefix() {
+    PipelineOptionsFactory.register(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+    { // expected case, set and not set
+      final PipelineOptions options =
+        PipelineOptionsFactory.fromProperties("prefix.", new Properties() {{
+          put("prefix.appName", "testAppName");
+          put("prefix.option", "test");
+        }});
+      final ApplicationNameOptions opts = options.as(ApplicationNameOptions.class);
+      assertEquals("testAppName", opts.getAppName());
+      assertNull(opts.getTempLocation());
+      assertEquals(
+        "test",
+        options.as(PipelineOptionsFactoryFromPropertiesTestOptions.class).getOption());
+    }
+    { // non string value - nobody should care about that case except us as a testing coverage
+      final PipelineOptionsFactoryFromPropertiesTestOptions opts =
+              PipelineOptionsFactory.fromProperties("prefix.", new Properties() {{
+                put("prefix.appName", singletonList("test"));
+              }}).as(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+      assertNull(opts.getOption());
+    }
+    { // non string key - nobody should care about that case except us as a testing coverage
+      final PipelineOptionsFactoryFromPropertiesTestOptions opts =
+              PipelineOptionsFactory.fromProperties("prefix.", new Properties() {{
+                put(singletonList("prefix.appName"), "test");
+              }}).as(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+      assertNull(opts.getOption());
+    }
+  }
+
+  /** Used to test properties typing handling. */
+  public interface PipelineOptionsFactoryFromPropertiesTestOptions extends PipelineOptions
{
+    @Description("A test option.")
+    String getOption();
+    void setOption(String b);
+  }
+
+  @Test
+  public void testPipelineOptionsFactoryFromPropertiesWithLongPrefix() {
+    assertEquals(
+      "testAppName",
+      PipelineOptionsFactory.fromProperties("org.apache.beam.", new Properties() {{
+        put("org.apache.beam.appName", "testAppName");
+      }}).as(ApplicationNameOptions.class).getAppName());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPipelineOptionsFactoryFromNullProperties() {
+    PipelineOptionsFactory.fromProperties(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPipelineOptionsFactoryFromPropertiesAndNullPrefix() {
+    PipelineOptionsFactory.fromProperties(null, new Properties() {{
+      put("appName", "testAppName");
+    }}).as(ApplicationNameOptions.class).getAppName();
+  }
   /** Used to test that the thread context class loader is used when creating proxies. */
   public interface ClassLoaderTestOptions extends PipelineOptions {
     @Default.Boolean(true)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 140691)
    Time Spent: 11.5h  (was: 11h 20m)

> Support system properties source for pipeline options
> -----------------------------------------------------
>
>                 Key: BEAM-3702
>                 URL: https://issues.apache.org/jira/browse/BEAM-3702
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-java-core
>            Reporter: Romain Manni-Bucau
>            Assignee: Romain Manni-Bucau
>            Priority: Major
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message