metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmiklavc <...@git.apache.org>
Subject [GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...
Date Tue, 04 Dec 2018 23:50:12 GMT
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238881060
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
---
    @@ -56,90 +60,111 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration
configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages;
|tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    --- End diff --
    
    This `String.class.cast(...)` should instead use ConversionUtils for conversions; see
    https://github.com/apache/metron/blob/master/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java#L39


---

Mime
View raw message