metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (METRON-322) Global Batching and Flushing
Date Thu, 16 Mar 2017 20:27:41 GMT


ASF GitHub Bot commented on METRON-322:

GitHub user mattf-horton opened a pull request:

    METRON-322 Global Batching and Flushing

    This patch starts by adding "batchTimeout" essentially every place "batchSize" is used.
 Since the configuration-related classes are many-layered, this addition is trivial in many
    The flush-on-timeout logic is fairly straightforward. It was implemented by a refactoring
of `BulkWriterComponent` to extract an explicit "flush" from the "write" logic, and then call
flush from Tick Tuple processing in `BulkMessageWriterBolt`.
    The tricky part was figuring out the appropriate setting for `topology.tick.tuple.freq.secs`
if the administrator configures non-default batchTimeouts. It is necessary to enumerate the
batchTimeout settings for all configured sensorNames, which is implemented in `IndexingConfigurations::getAllConfiguredTimeouts()`.
Then multiple other factors must be taken into account to determine the allowed and recommended
settings, which is implemented in `BatchTimeoutHelper`. If there are better ways to accomplish
these things, please share your ideas.
    For test and review, it may be useful to know that the default value of `batchTimeout`,
and hence for `topology.tick.tuple.freq.secs`, is 14 seconds (which is the default value of
`topology.message.timeout.secs` (30 sec) / 2 - 1).
    To unit test this feature, we added `Clock` logic and implemented a writable `FakeClock`.
 This may be generally useful for testing other time-based behavior as we add such functionality.
    All configuration issues were done in such a way as to default to "good" behavior, with
no changes needed in current configs of previously installed systems.  In fact, recommended
configuration is to leave batchTimeout unconfigured, or set it to zero, thereby accepting
the system calculated default.
    After this patch is reviewed and accepted, similar work needs to be done for the `ParserWriter`,
and possibly other sub-components.  That will be in a separate PR.
    It is possible to split this patch into sections related to each of sub-tasks METRON-516,
METRON-577, METRON-329 and METRON-330.  Let me know if you would like me to do so.  In the
past, people seem to prefer to see a whole feature development in one PR, even though it gets
a little long, so that's the way I'm submitting this one, for now.
    ## Pull Request Checklist
    Thank you for submitting a contribution to Apache Metron (Incubating).  
    Please refer to our [Development Guidelines](
for the complete guide to follow for contributions.  
    Please refer also to our [Build Verification Guidelines](
for complete smoke testing guides.  
    In order to streamline the review of the contribution we ask you follow these guidelines
and ask you to double check the following:
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at
[Metron Jira](

    - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are
trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically
    ### For code changes:
    - [NA] Have you included steps to reproduce the behavior or problem that is being changed
or addressed?
    - [x] Have you included steps or a guide to how the change may be verified and tested
        Yes, see the two modified .md files, and the above information.
    - [x] Have you ensured that the full suite of tests and checks have been executed in the
root incubating-metron folder via:
      mvn -q clean integration-test install && build_utils/ 
    - [x] Have you written or updated unit tests and or integration tests to verify your changes?
    - [NA] If adding new dependencies to the code, are these dependencies licensed in a way
that is compatible for inclusion under [ASF 2.0](

    - [IN PROCESS] Have you verified the basic functionality of the build by building and
running locally with Vagrant full-dev environment or the equivalent?
    ### For documentation related changes:
    - [x] Have you ensured that format looks appropriate for the output in which it is rendered
by building and verifying the site-book? 

You can merge this pull request into a Git repository by running:

    $ git pull METRON-322

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #481
commit c2a700e78143b4c7a143a2b061ffb210cdb2d36f
Author: mattf-horton <>
Date:   2017-03-07T01:29:13Z

    METRON-322 Global Batching and Flushing
    changes for sub-tasks METRON-516 and METRON-577, METRON-329 and METRON-330.


> Global Batching and Flushing
> ----------------------------
>                 Key: METRON-322
>                 URL:
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Ajay Yadav
>            Assignee: Matt Foley
> All Writers and other bolts that maintain an internal "batch" queue, need to have a timeout
flush, to prevent messages from low-volume telemetries from sitting in their queues indefinitely.
 Storm has a timeout value (topology.message.timeout.secs) that prevents it from waiting for
too long. If the Writer does not process the queue before the timeout, then Storm recycles
the tuples through the topology. This has multiple undesirable consequences, including data
duplication and waste of compute resources. We would like to be able to specify an interval
after which the queues would flush, even if the batch size is not met.
> We will utilize the Storm Tick Tuple to trigger timeout flushing, following the recommendations
of the article at 
> Since every Writer processes its queue somewhat differently, every bolt that has a "batchSize"
parameter will be given a "batchTimeout" parameter too.  It will default to 1/2 the value
of "topology.message.timeout.secs", as recommended, and will ignore settings larger than the
default, which could cause failure to flush in time.  In the Enrichment topology, where two
Writers may be placed one after the other (enrichment and threat intel), the default timeout
interval will be 1/4 the value of "topology.message.timeout.secs".  The default value of "topology.message.timeout.secs"
in Storm is 30 seconds.
> In addition, Storm provides a limit on the number of pending messages that have not been
acked. If more than "topology.max.spout.pending" messages are waiting in a topology, then
Storm will recycle them through the topology. However, the default value of "topology.max.spout.pending"
is null, and if set to non-null value, the user can manage the consequences by setting batchSize
limits appropriately.  Having the timeout flush will also ameliorate this issue.  So we do
not need to address "topology.max.spout.pending" directly in this task.

This message was sent by Atlassian JIRA

View raw message