metron-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (METRON-227) Add Time-Based Flushing to Writer Bolt
Date Wed, 13 Jul 2016 20:34:20 GMT

    [ https://issues.apache.org/jira/browse/METRON-227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15375693#comment-15375693 ]

ASF GitHub Bot commented on METRON-227:
---------------------------------------

Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/188#discussion_r70703025
  
    --- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java ---
    @@ -34,21 +34,37 @@
     import java.util.function.Function;
     
     public class BulkWriterComponent<MESSAGE_T> {
    -  public static final Logger LOG = LoggerFactory
    -            .getLogger(BulkWriterComponent.class);
    +  public static final Logger LOG = LoggerFactory.getLogger(BulkWriterComponent.class);
       private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
       private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
       private OutputCollector collector;
       private boolean handleCommit = true;
       private boolean handleError = true;
    +  private Long lastFlushTime;
    +  private Long flushIntervalInMs;
    +  private boolean flush = false;
    +
    +
       public BulkWriterComponent(OutputCollector collector) {
         this.collector = collector;
    +    this.lastFlushTime = System.currentTimeMillis();
    +    this.flush = false;
       }
     
       public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
         this(collector);
         this.handleCommit = handleCommit;
         this.handleError = handleError;
    +    this.lastFlushTime = System.currentTimeMillis();
    +    this.flush = false;
    +  }
    +
    +  public void setFlush(boolean flush) {
    --- End diff --
    
    Since you're deciding whether to flush based on variables in the global config, we don't
need member variables and setters for `flush` and `flushInterval`. I don't like setting a
member variable in the class every time we call `write`, and we don't need to: they can be
local variables in `write`.
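
    A rough sketch of the idea, not a drop-in patch. The class name, the two config keys,
and the `shouldFlush` helper are hypothetical and only illustrate reading the flush settings
into locals on each call; the only state that has to stay on the instance is the time of
the last flush.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual BulkWriterComponent. The config keys
// and the shouldFlush helper are assumptions made for illustration only.
class TimeBasedFlushSketch {
  static final String FLUSH_ENABLED_KEY = "bulk.writer.flush";                 // assumed key
  static final String FLUSH_INTERVAL_KEY = "bulk.writer.flush.interval.ms";    // assumed key

  // The only state kept between calls: when we last flushed.
  private long lastFlushTime;

  TimeBasedFlushSketch(long now) {
    this.lastFlushTime = now;
  }

  // Intended to be called from write(); 'now' is passed in so the logic is easy to test.
  boolean shouldFlush(Map<String, Object> globalConfig, int pending, int batchSize, long now) {
    // Locals, re-read from the global config on every call: no fields, no setters.
    boolean flush = Boolean.parseBoolean(
        String.valueOf(globalConfig.getOrDefault(FLUSH_ENABLED_KEY, "false")));
    long flushIntervalInMs = Long.parseLong(
        String.valueOf(globalConfig.getOrDefault(FLUSH_INTERVAL_KEY, "0")));

    boolean batchFull = pending >= batchSize;
    boolean intervalElapsed =
        flush && pending > 0 && (now - lastFlushTime) >= flushIntervalInMs;

    if (batchFull || intervalElapsed) {
      lastFlushTime = now; // record the flush so the interval starts over
      return true;
    }
    return false;
  }
}
```

    With something like this, `write` would ask `shouldFlush(...)` after adding the tuple
to the batch, and a change to the global config takes effect on the next call without anyone
having to push it into the component.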


> Add Time-Based Flushing to Writer Bolt
> --------------------------------------
>
>                 Key: METRON-227
>                 URL: https://issues.apache.org/jira/browse/METRON-227
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Domenic Puzio
>            Assignee: Ajay Yadav
>              Labels: 0.2.1BETA
>             Fix For: 0.2.1BETA
>
>
> We need to change the BulkMessageWriterBolt and BulkWriterComponent to use time-based
> flushing when writing data to Elasticsearch or Solr.
> Currently, we set a batch size, and the Writer waits for that number of tuples to build
> up; however, Storm has a timeout value that prevents it from waiting too long. If the
> Writer does not reach the batch size before the timeout, Storm recycles the tuples through
> the topology. In addition, Storm only allows so many pending messages that have not been
> acked; if too many messages are waiting for the bulk Writer, Storm will recycle them through
> the topology. This is not the desired behavior, and it directly impacts the performance of
> this Writer. We would like to be able to specify an interval after which the topology would
> flush, writing the data it is currently holding to Elasticsearch or Solr even if the batch
> size has not been met.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
