metron-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (METRON-1849) Elasticsearch Index Write Functionality Should be Shared
Date Wed, 05 Dec 2018 13:14:00 GMT

    [ https://issues.apache.org/jira/browse/METRON-1849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710050#comment-16710050 ]

ASF GitHub Bot commented on METRON-1849:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239058263
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +60,111 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    --- End diff --
    
    Agreed.
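The revised `write()` in the diff pairs each tuple with the message at the same index, rejects a batch whose sizes differ, and builds one document per message with its GUID as the document id, so that a document which cannot be written leads back to the tuple that must be failed. The following is a minimal sketch of that bookkeeping under simplifying assumptions: `FakeTuple` stands in for Storm's `Tuple`, a `Map<String, Object>` stands in for `JSONObject`, `PairedDocument` is a hypothetical analogue of `TupleBasedDocument`, and the `"guid"` key is assumed to match `Constants.GUID`. Failures are reported here as positions within the batch, which is also an assumption of the sketch. Field-name conversion and the Elasticsearch call itself are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Storm's Tuple; only an id is needed here.
final class FakeTuple {
  final String id;
  FakeTuple(String id) { this.id = id; }
}

// Hypothetical analogue of TupleBasedDocument: a document that remembers its source tuple.
final class PairedDocument {
  final String guid;                 // document id, or null if the message has no GUID
  final Map<String, Object> source;  // fields to be indexed
  final FakeTuple tuple;             // tuple to fail if this document is not written

  PairedDocument(String guid, Map<String, Object> source, FakeTuple tuple) {
    this.guid = guid;
    this.source = source;
    this.tuple = tuple;
  }
}

final class TupleDocumentPairing {
  // Assumed to match the value of Metron's Constants.GUID.
  static final String GUID = "guid";

  // Pairs the i-th tuple with the i-th message; rejects batches whose sizes differ.
  static List<PairedDocument> toDocuments(Iterable<FakeTuple> tuplesIter,
                                          List<Map<String, Object>> messages) {
    List<FakeTuple> tuples = new ArrayList<>();
    tuplesIter.forEach(tuples::add);
    if (tuples.size() != messages.size()) {
      throw new IllegalStateException(String.format(
          "Expected same number of tuples and messages; |tuples|=%d, |messages|=%d",
          tuples.size(), messages.size()));
    }
    List<PairedDocument> documents = new ArrayList<>(tuples.size());
    for (int i = 0; i < tuples.size(); i++) {
      // Copy the fields as-is; the real writer also renames them with a FieldNameConverter.
      Map<String, Object> source = new HashMap<>(messages.get(i));
      Object guid = source.get(GUID);
      documents.add(new PairedDocument(guid == null ? null : guid.toString(), source, tuples.get(i)));
    }
    return documents;
  }

  // Maps failed batch positions back to the tuples that produced those documents.
  static List<FakeTuple> tuplesToFail(List<PairedDocument> documents, Set<Integer> failedPositions) {
    List<FakeTuple> failed = new ArrayList<>();
    for (int i = 0; i < documents.size(); i++) {
      if (failedPositions.contains(i)) {
        failed.add(documents.get(i).tuple);
      }
    }
    return failed;
  }
}

class PairingDemo {
  public static void main(String[] args) {
    List<FakeTuple> tuples = List.of(new FakeTuple("t1"), new FakeTuple("t2"));
    List<Map<String, Object>> messages = List.of(
        Map.<String, Object>of("guid", "g1", "ip_src_addr", "10.0.0.1"),
        Map.<String, Object>of("guid", "g2"));
    List<PairedDocument> docs = TupleDocumentPairing.toDocuments(tuples, messages);
    // Pretend the bulk request reported a failure for the second document.
    List<FakeTuple> toFail = TupleDocumentPairing.tuplesToFail(docs, Set.of(1));
    System.out.println(toFail.get(0).id); // prints "t2"
  }
}
```

Converting the `Iterable<Tuple>` into a list up front, as the diff does with `Lists.newArrayList`, is what makes index-based pairing and the size check possible.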


> Elasticsearch Index Write Functionality Should be Shared
> --------------------------------------------------------
>
>                 Key: METRON-1849
>                 URL: https://issues.apache.org/jira/browse/METRON-1849
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Nick Allen
>            Assignee: Nick Allen
>            Priority: Major
>
> The index write functionality is currently duplicated between the ElasticsearchWriter and the ElasticsearchUpdateDao. This functionality should be de-duplicated and shared between the two.
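One way to share the write path is for both callers to depend on a single generic bulk writer that batches documents and reports each document's outcome, leaving it to the caller to decide what a failure means (fail a tuple in the Storm writer, surface an error in the update DAO). The sketch below is illustrative only: `IndexableDoc`, `BulkOutcome`, `SharedBulkWriter`, and `PlainDoc` are hypothetical names, not the interface merged in the pull request. The bulk operation is injected as a function, which mirrors the diff's "only create the document writer if one does not already exist" test seam and lets the sketch run without an Elasticsearch cluster.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical minimal document contract shared by every caller.
interface IndexableDoc {
  String id(); // assumed unique and non-null within a batch
}

// Hypothetical outcome of one bulk write, split by document.
final class BulkOutcome<D extends IndexableDoc> {
  final List<D> successes = new ArrayList<>();
  final List<D> failures = new ArrayList<>();
}

// Hypothetical shared writer. The bulk operation is injected, so tests can
// supply a fake instead of a live client.
final class SharedBulkWriter<D extends IndexableDoc> {
  private final Function<List<D>, Set<String>> bulkIndex; // returns ids that failed
  private final List<D> pending = new ArrayList<>();

  SharedBulkWriter(Function<List<D>, Set<String>> bulkIndex) {
    this.bulkIndex = bulkIndex;
  }

  void add(D doc) {
    pending.add(doc);
  }

  // Sends all pending documents as one batch and reports each document's outcome.
  BulkOutcome<D> write() {
    BulkOutcome<D> outcome = new BulkOutcome<>();
    Set<String> failedIds = bulkIndex.apply(new ArrayList<>(pending));
    for (D doc : pending) {
      if (failedIds.contains(doc.id())) {
        outcome.failures.add(doc);
      } else {
        outcome.successes.add(doc);
      }
    }
    pending.clear(); // each batch is written only once
    return outcome;
  }
}

// A plain document, e.g. for an update DAO that has no tuple to fail.
final class PlainDoc implements IndexableDoc {
  private final String id;
  PlainDoc(String id) { this.id = id; }
  @Override public String id() { return id; }
}

class SharedWriterDemo {
  public static void main(String[] args) {
    // Fake backend: rejects any document whose id starts with "bad".
    SharedBulkWriter<PlainDoc> writer = new SharedBulkWriter<>(batch -> {
      Set<String> failed = new HashSet<>();
      for (PlainDoc d : batch) {
        if (d.id().startsWith("bad")) {
          failed.add(d.id());
        }
      }
      return failed;
    });
    writer.add(new PlainDoc("a"));
    writer.add(new PlainDoc("bad-1"));
    writer.add(new PlainDoc("b"));
    BulkOutcome<PlainDoc> outcome = writer.write();
    System.out.println(outcome.successes.size() + " written, " + outcome.failures.size() + " failed"); // prints "2 written, 1 failed"
  }
}
```

A tuple-aware document type, like `TupleBasedDocument` in the diff, would implement the same contract and carry its tuple, so the Storm writer can fail exactly the tuples whose documents appear in `failures`.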



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
