metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (METRON-1460) Create a complementary non-split-join enrichment topology
Date Mon, 05 Mar 2018 23:40:00 GMT

    [ https://issues.apache.org/jira/browse/METRON-1460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386974#comment-16386974
] 

ASF GitHub Bot commented on METRON-1460:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/940#discussion_r172353404
  
    --- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
---
    @@ -0,0 +1,415 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.enrichment.bolt;
    +
    +import org.apache.metron.common.Constants;
    +import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
    +import org.apache.metron.common.configuration.ConfigurationType;
    +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
    +import org.apache.metron.common.error.MetronError;
    +import org.apache.metron.common.performance.PerformanceLogger;
    +import org.apache.metron.common.utils.ErrorUtils;
    +import org.apache.metron.common.utils.MessageUtils;
    +import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
    +import org.apache.metron.enrichment.configuration.Enrichment;
    +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
    +import org.apache.metron.enrichment.parallel.EnrichmentContext;
    +import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
    +import org.apache.metron.enrichment.parallel.ParallelEnricher;
    +import org.apache.metron.enrichment.parallel.WorkerPoolStrategy;
    +import org.apache.metron.stellar.dsl.Context;
    +import org.apache.metron.stellar.dsl.StellarFunction;
    +import org.apache.metron.stellar.dsl.StellarFunctions;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.UnsupportedEncodingException;
    +import java.lang.invoke.MethodHandles;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +
    +/**
    + * This bolt is a unified enrichment/threat intel bolt.  In contrast to the split/enrich/join
    + * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool
to
    + * enrich in parallel.
    + *
    + * From an architectural perspective, this is a divergence from the polymorphism based
strategy we have
    + * used in the split/join bolts.  Rather, this bolt is provided a strategy to use, either
enrichment or threat intel,
    + * through composition.  This allows us to move most of the implementation into components
independent
    + * from Storm.  This will greatly facilitate reuse.
    + */
    +public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
    +
    +  public static class Perf {} // used for performance logging
    +  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same
worker
    +
    +  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +
    +  public static final String STELLAR_CONTEXT_CONF = "stellarContext";
    +
    +  /**
    +   * The number of threads in the threadpool.  One threadpool is created per process.
    +   * This is a topology-level configuration
    +   */
    +  public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
    +  /**
    +   * The type of threadpool to create. This is a topology-level configuration.
    +   */
    +  public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
    +
    +  /**
    +   * The enricher implementation to use.  This will do the parallel enrichment via a
thread pool.
    +   */
    +  protected ParallelEnricher enricher;
    +
    +  /**
    +   * The strategy to use for this enrichment bolt.  Practically speaking this is either
    +   * enrichment or threat intel.  It is configured in the topology itself.
    +   */
    +  protected EnrichmentStrategies strategy;
    +  private JSONParser parser;
    +  protected OutputCollector collector;
    +  private Context stellarContext;
    +  /**
    +   * An enrichment type to adapter map.  This is configured externally.
    +   */
    +  protected Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType =
new HashMap<>();
    +
    +  /**
    +   * The total number of elements in a LRU cache.  This cache is used for the enrichments;
if an
    +   * element is in the cache, then the result is returned instead of computed.
    +   */
    +  protected Long maxCacheSize;
    +  /**
    +   * The total amount of time in minutes since write to keep an element in the cache.
    +   */
    +  protected Long maxTimeRetain;
    +  /**
    +   * If the bolt is reloaded, invalidate the cache?
    +   */
    +  protected boolean invalidateCacheOnReload = false;
    +
    +  /**
    +   * The message field to use.  If this is set, then this indicates the field to use
to retrieve the message object.
    +   * If this is unset, then we presume that the message is coming in as a string version
of a JSON blob on the first
    +   * element of the tuple.
    +   */
    +  protected String messageFieldName;
    +  protected EnrichmentContext enrichmentContext;
    +  protected boolean captureCacheStats = true;
    +
    +  public UnifiedEnrichmentBolt(String zookeeperUrl) {
    +    super(zookeeperUrl);
    +  }
    +
    +  /**
    +   * Specify the enrichments to support.
    +   * @param enrichments enrichment
    +   * @return Instance of this class
    +   */
    +  public UnifiedEnrichmentBolt withEnrichments(List<Enrichment> enrichments) {
    +    for(Enrichment e : enrichments) {
    +      enrichmentsByType.put(e.getType(), e.getAdapter());
    +    }
    +    return this;
    +  }
    +
    +  public UnifiedEnrichmentBolt withCaptureCacheStats(boolean captureCacheStats) {
    +    this.captureCacheStats = captureCacheStats;
    +    return this;
    +  }
    +
    +  /**
    +   * Figure out how many threads to use in the thread pool.  The user can pass an arbitrary
object, so parse it
    +   * according to some rules.  If it's a number, then cast to an int.  If it's a string
and ends with "C", then strip
    +   * the C and treat it as an integral multiple of the number of cores.  If it's a string
and does not end with a C, then treat
    +   * it as a number in string form.
    +   * @param numThreads
    +   * @return
    +   */
    +  private static int getNumThreads(Object numThreads) {
    +    if(numThreads instanceof Number) {
    +      return ((Number)numThreads).intValue();
    +    }
    +    else if(numThreads instanceof String) {
    +      String numThreadsStr = ((String)numThreads).trim().toUpperCase();
    +      if(numThreadsStr.endsWith("C")) {
    +        Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
    +        return factor*Runtime.getRuntime().availableProcessors();
    +      }
    +      else {
    +        return Integer.parseInt(numThreadsStr);
    +      }
    +    }
    +    return 2*Runtime.getRuntime().availableProcessors();
    +  }
    +
    +  /**
    +   * The strategy to use.  This indicates which part of the config that this bolt uses
    +   * to enrich, threat intel or enrichment.  This must conform to one of the EnrichmentStrategies
    +   * enum.
    +   * @param strategy
    +   * @return
    +   */
    +  public UnifiedEnrichmentBolt withStrategy(String strategy) {
    +    this.strategy = EnrichmentStrategies.valueOf(strategy);
    +    return this;
    +  }
    +
    +  /**
    +   * @param maxCacheSize Maximum size of cache before flushing
    +   * @return Instance of this class
    +   */
    +  public UnifiedEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
    +    this.maxCacheSize = maxCacheSize;
    +    return this;
    +  }
    +
    +  /**
    +   * @param maxTimeRetain Maximum time to retain cached entry before expiring
    +   * @return Instance of this class
    +   */
    +
    +  public UnifiedEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
    +    this.maxTimeRetain = maxTimeRetain;
    +    return this;
    +  }
    +
    +  /**
    +   * Invalidate the cache on reload of bolt.  By default, we do not.
    +   * @param cacheInvalidationOnReload
    +   * @return
    +   */
    +  public UnifiedEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload)
{
    +    this.invalidateCacheOnReload= cacheInvalidationOnReload;
    +    return this;
    +  }
    +
    +
    +  @Override
    +  public void reloadCallback(String name, ConfigurationType type) {
    +    if(invalidateCacheOnReload) {
    +      if(strategy.getCache() != null) {
    +        strategy.getCache().invalidateAll();
    +      }
    +    }
    +    if(type == ConfigurationType.GLOBAL && enrichmentsByType != null) {
    +      for(EnrichmentAdapter adapter : enrichmentsByType.values()) {
    +        adapter.updateAdapter(getConfigurations().getGlobalConfig());
    +      }
    +    }
    +  }
    +
    +
    +  /**
    +   * Fully enrich a message based on the strategy which was used to configure the bolt.
    +   * Each enrichment is done in parallel and the results are joined together.  Each enrichment
    +   * will use a cache so computation is avoided if the result has been computed before.
    +   *
    +   * Errors in the enrichment result in an error message being sent on the "error" stream.
    +   * The successful enrichments will be joined with the original message and the message
will
    +   * be sent along the "message" stream.
    +   *
    +   * @param input The input tuple to be processed.
    +   */
    +  @Override
    +  public void execute(Tuple input) {
    +    JSONObject message = generateMessage(input);
    +    try {
    +      String sourceType = MessageUtils.getSensorType(message);
    +      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
    +      if(config == null) {
    +        LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
    +        config = new SensorEnrichmentConfig();
    +      }
    +      //This is an existing kludge for the stellar adapter to pass information along.
    +      //We should figure out if this can be rearchitected a bit.  This smells.
    +      config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
    +      String guid = getGUID(input, message);
    +
    +      // enrich the message
    +      ParallelEnricher.EnrichmentResult result = enricher.apply(message, strategy, config,
perfLog);
    +      JSONObject enriched = result.getResult();
    +      enriched = strategy.postProcess(enriched, config, enrichmentContext);
    +
    +      //we can emit the message now
    +      collector.emit("message",
    +              input,
    +              new Values(guid, enriched));
    +      //and handle each of the errors in turn.  If any adapter errored out, we will have
one message per.
    +      for(Map.Entry<Object, Throwable> t : result.getEnrichmentErrors()) {
    +        LOG.error("[Metron] Unable to enrich message: {}", message, t);
    +        MetronError error = new MetronError()
    +                .withErrorType(strategy.getErrorType())
    +                .withMessage(t.getValue().getMessage())
    +                .withThrowable(t.getValue())
    +                .addRawMessage(t.getKey());
    +        ErrorUtils.handleError(collector, error);
    +      }
    +    } catch (Exception e) {
    +      //If something terrible and unexpected happens then we want to send an error along,
but this
    +      //really shouldn't be happening.
    +      LOG.error("[Metron] Unable to enrich message: {}", message, e);
    +      MetronError error = new MetronError()
    +              .withErrorType(strategy.getErrorType())
    +              .withMessage(e.getMessage())
    +              .withThrowable(e)
    +              .addRawMessage(message);
    +      ErrorUtils.handleError(collector, error);
    +    }
    +    finally {
    +      collector.ack(input);
    +    }
    +  }
    +
    +  /**
    +   * The message field name.  If this is set, then use this field to retrieve the message.
    +   * @param messageFieldName
    +   * @return
    +   */
    +  public UnifiedEnrichmentBolt withMessageFieldName(String messageFieldName) {
    +    this.messageFieldName = messageFieldName;
    +    return this;
    +  }
    +
    +  /**
    +   * Take the tuple and construct the message.
    +   * @param tuple
    +   * @return
    +   */
    +  public JSONObject generateMessage(Tuple tuple) {
    --- End diff --
    
    Isn't this what the `MessageGetStrategy` was intended to solve?  Can we reuse that logic
here instead?


> Create a complementary non-split-join enrichment topology
> ---------------------------------------------------------
>
>                 Key: METRON-1460
>                 URL: https://issues.apache.org/jira/browse/METRON-1460
>             Project: Metron
>          Issue Type: New Feature
>            Reporter: Casey Stella
>            Priority: Major
>
> There are some deficiencies to the split/join topology.
>  * It's hard to reason about
>  * Understanding the latency of enriching a message requires looking at multiple bolts
that each give summary statistics
>  * The join bolt's cache is really hard to reason about when performance tuning
>  * During spikes in traffic, you can overload the join bolt's cache and drop messages
if you aren't careful
>  * In general, it's hard to associate a cache size and a duration kept in cache with
throughput and latency
>  * There are a lot of network hops per message
>  * Right now we are stuck at 2 stages of transformations being done (enrichment and threat
intel).  It's very possible that you might want stellar enrichments to depend on the output
of other stellar enrichments.  In order to implement this in split/join you'd have to create
a cycle in the storm topology
>  
> I propose that we move to a model where we do enrichments in a single bolt in parallel
using a static threadpool (e.g. multiple workers in the same process would share the threadpool). 
In all other ways, this would be backwards compatible: a transparent drop-in for the existing
enrichment topology.
> There are some pros/cons about this too:
>  * Pro
>  * Easier to reason about from an individual message perspective
>  * Architecturally decoupled from Storm
>  * This sets us up if we want to consider other streaming technologies
>  * Fewer bolts
>  * spout -> enrichment bolt -> threatintel bolt -> output bolt
>  * Way fewer network hops per message
>  * currently 2n+1 where n is the number of enrichments used (if using stellar subgroups,
each subgroup is a hop)
>  * Easier to reason about from a performance perspective
>  * We trade cache size and eviction timeout for threadpool size
>  * We set ourselves up to have stellar subgroups with dependencies
>  * i.e. stellar subgroups that depend on the output of other subgroups
>  * If we do this, we can shrink the topology to just spout -> enrichment/threat intel
-> output
>  * Con
>  * We can no longer tune stellar enrichments independent from HBase enrichments
>  * To be fair, with enrichments moving to stellar, this is the case in the split/join
approach too
>  * No idea about performance
> What I propose is to submit a PR that will deliver an alternative, completely backwards
compatible topology for enrichment that you can use by adjusting the start_enrichment_topology.sh
script to use remote-unified.yaml instead of remote.yaml.  If we live with it for a while
and have some good experiences with it, maybe we can consider retiring the old enrichment
topology.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message