metron-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (METRON-322) Global Batching and Flushing
Date Thu, 23 Mar 2017 20:56:41 GMT


ASF GitHub Bot commented on METRON-322:

Github user mattf-horton commented on a diff in the pull request:
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/
    @@ -92,22 +177,51 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector
           configurationTransformation = x -> x;
         try {
    -      bulkMessageWriter.init(stormConf
    -                            , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    -                            );
    +      WriterConfiguration writerconf = configurationTransformation.apply(
    +              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
    +      if (defaultBatchTimeout == 0) {
    +        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
    +        //probably because we are in a unit test scenario.  So calculate it here.
    +        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
    +        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
    +      }
    +      writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
    +      bulkMessageWriter.init(stormConf, writerconf);
         } catch (Exception e) {
           throw new RuntimeException(e);
    +  /**
    +   * Used only for unit testing.
    +   */
    +  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
    +    prepare(stormConf, context, collector);
    +    writerComponent.withClock(clock);
    +  }
       public void execute(Tuple tuple) {
    -    JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
    -    String sensorType = MessageUtils.getSensorType(message);
    -      WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
    +      if (isTick(tuple)) {
    +        if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
    +          //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
    +          LOG.debug("Flushing message queues older than their batchTimeouts");
    +          writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
    +                  new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    +                  , messageGetStrategy);
    +        }
    +        collector.ack(tuple);
    --- End diff ---
    Good idea; will change to separate "try" wrappers around the tick tuple vs. the message tuple.
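The `defaultBatchTimeout == 0` branch in the diff exists because `getComponentConfiguration` normally initializes that field: it is the Storm hook where a bolt both computes its timeout and asks Storm for tick tuples at that cadence. A minimal sketch of the pattern, with illustrative class and field names (not the PR's actual code) and the real Storm config key `topology.tick.tuple.freq.secs`:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: how a Storm bolt typically requests tick tuples.
// "topology.tick.tuple.freq.secs" is the real Storm config key;
// the class name and the timeout value are illustrative only.
public class TickConfigSketch {
  private int defaultBatchTimeout = 14; // e.g. half of a 28s message timeout

  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<>();
    // Ask Storm to deliver a tick tuple at the batch-timeout cadence,
    // so queued messages can be flushed before Storm replays them.
    conf.put("topology.tick.tuple.freq.secs", defaultBatchTimeout);
    return conf;
  }
}
```

In a unit test the topology runner never calls `getComponentConfiguration`, which is why the `prepare` method in the diff falls back to computing the timeout itself.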

> Global Batching and Flushing
> ----------------------------
>                 Key: METRON-322
>                 URL:
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Ajay Yadav
>            Assignee: Matt Foley
> All Writers and other bolts that maintain an internal "batch" queue need a timeout
> flush, to prevent messages from low-volume telemetries from sitting in their queues indefinitely.
> Storm has a timeout value (topology.message.timeout.secs) that prevents it from waiting
> too long. If the Writer does not process the queue before the timeout, Storm recycles
> the tuples through the topology. This has multiple undesirable consequences, including data
> duplication and wasted compute resources. We would like to be able to specify an interval
> after which the queues flush, even if the batch size has not been met.
> We will use the Storm Tick Tuple to trigger timeout flushing, following the recommendations
> of the article at
> Since every Writer processes its queue somewhat differently, every bolt that has a "batchSize"
> parameter will be given a "batchTimeout" parameter too. It will default to 1/2 the value
> of "topology.message.timeout.secs", as recommended, and will ignore settings larger than the
> default, which could cause failure to flush in time. In the Enrichment topology, where two
> Writers may be placed one after the other (enrichment and threat intel), the default timeout
> interval will be 1/4 the value of "topology.message.timeout.secs". The default value of
> "topology.message.timeout.secs" in Storm is 30 seconds.
> In addition, Storm limits the number of pending messages that have not been acked:
> if more than "topology.max.spout.pending" messages are waiting in a topology, Storm
> will recycle them through the topology. However, the default value of "topology.max.spout.pending"
> is null, and if it is set to a non-null value, the user can manage the consequences by setting
> batchSize limits appropriately. The timeout flush will also ameliorate this issue, so we do
> not need to address "topology.max.spout.pending" directly in this task.
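The defaulting rule described in the quoted issue (1/2 of the message timeout normally, 1/4 when two Writers are chained, with larger configured values ignored) can be sketched as follows. The method and parameter names are illustrative, not the actual BatchTimeoutHelper API:

```java
// Hedged sketch of the batchTimeout defaulting rule described above.
public class BatchTimeoutSketch {
  public static int effectiveBatchTimeout(int topologyMessageTimeoutSecs,
                                          int timeoutDivisor,
                                          int configuredBatchTimeout) {
    // Default is messageTimeout / divisor: divisor 2 normally, 4 when two
    // Writers sit one after the other, as in the Enrichment topology.
    int defaultTimeout = topologyMessageTimeoutSecs / timeoutDivisor;
    // A configured value larger than the default risks Storm replaying
    // tuples before the flush, so it is clamped to the default; zero or
    // negative means "unset" and also falls back to the default.
    if (configuredBatchTimeout <= 0 || configuredBatchTimeout > defaultTimeout) {
      return defaultTimeout;
    }
    return configuredBatchTimeout;
  }
}
```

With Storm's default 30-second message timeout, an unset batchTimeout yields 15 seconds in a single-Writer topology and 7 seconds in the two-Writer Enrichment case.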

This message was sent by Atlassian JIRA
