metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (METRON-1051) Enable the ability to update indexed messages
Date Thu, 03 Aug 2017 15:11:00 GMT

    [ https://issues.apache.org/jira/browse/METRON-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112897#comment-16112897
] 

ASF GitHub Bot commented on METRON-1051:
----------------------------------------

Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/666#discussion_r131162332
  
    --- Diff: metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
---
    @@ -0,0 +1,226 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.integration;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.google.common.collect.Iterables;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.metron.common.Constants;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
    +import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
    +import org.apache.metron.hbase.mock.MockHTable;
    +import org.apache.metron.hbase.mock.MockHBaseTableProvider;
    +import org.apache.metron.indexing.dao.*;
    +import org.apache.metron.indexing.dao.update.Document;
    +import org.apache.metron.indexing.dao.update.ReplaceRequest;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.File;
    +import java.text.SimpleDateFormat;
    +import java.util.*;
    +
    +
    +public class ElasticsearchUpdateIntegrationTest {
    +  private static final int MAX_RETRIES = 10;
    +  private static final int SLEEP_MS = 500;
    +  private static final String SENSOR_NAME= "test";
    +  private static final String TABLE_NAME = "modifications";
    +  private static final String CF = "p";
    +  private static String indexDir = "target/elasticsearch_mutation";
    +  private static String dateFormat = "yyyy.MM.dd.HH";
    +  private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new
Date());
    +  private static MockHTable table;
    +  private static IndexDao esDao;
    +  private static IndexDao hbaseDao;
    +  private static MultiIndexDao dao;
    +  private static ElasticSearchComponent es;
    +
    +  @BeforeClass
    +  public static void setup() throws Exception {
    +    Configuration config = HBaseConfiguration.create();
    +    MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
    +    tableProvider.addToCache(TABLE_NAME, CF);
    +    table = (MockHTable)tableProvider.getTable(config, TABLE_NAME);
    +    // setup the client
    +    es = new ElasticSearchComponent.Builder()
    +            .withHttpPort(9211)
    +            .withIndexDir(new File(indexDir))
    +            .build();
    +    es.start();
    +
    +    hbaseDao = new HBaseDao();
    +    AccessConfig accessConfig = new AccessConfig();
    +    accessConfig.setTableProvider(tableProvider);
    +    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
    +      put("es.clustername", "metron");
    +      put("es.port", "9300");
    +      put("es.ip", "localhost");
    +      put("es.date.format", dateFormat);
    +      put(HBaseDao.HBASE_TABLE, TABLE_NAME);
    +      put(HBaseDao.HBASE_CF, CF);
    +    }};
    +    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
    +
    +    esDao = new ElasticsearchDao();
    +
    +    dao = new MultiIndexDao(hbaseDao, esDao);
    +    dao.init(accessConfig);
    +
    +  }
    +
    +  @AfterClass
    +  public static void teardown() {
    +    if(es != null) {
    +      es.stop();
    +    }
    +  }
    +
    +
    +
    +  @Test
    +  public void test() throws Exception {
    +    List<Map<String, Object>> inputData = new ArrayList<>();
    +    for(int i = 0; i < 10;++i) {
    +      final String name = "message" + i;
    +      inputData.add(
    +              new HashMap<String, Object>() {{
    +                put("source:type", SENSOR_NAME);
    +                put("name" , name);
    +                put("timestamp", System.currentTimeMillis());
    +                put(Constants.GUID, name);
    +              }}
    +                             );
    +    }
    +    es.add(index, SENSOR_NAME
    +          , Iterables.transform(inputData,
    +                    m -> {
    +                      try {
    +                        return JSONUtils.INSTANCE.toJSON(m, true);
    +                      } catch (JsonProcessingException e) {
    +                        throw new IllegalStateException(e.getMessage(), e);
    +                      }
    +                    }
    +                    )
    +    );
    +    List<Map<String,Object>> docs = null;
    +    for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
    +      docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
    +      if(docs.size() >= 10) {
    +        break;
    +      }
    +    }
    +    Assert.assertEquals(10, docs.size());
    +    //modify the first message and add a new field
    +    {
    +      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0))
{{
    +        put("new-field", "metron");
    +      }};
    +      String guid = "" + message0.get(Constants.GUID);
    +      dao.replace(new ReplaceRequest(){{
    +        setReplacement(message0);
    +        setGuid(guid);
    +        setSensorType(SENSOR_NAME);
    +      }}, Optional.empty());
    +      Assert.assertEquals(1, table.size());
    +      Document doc = dao.getLatest(guid, SENSOR_NAME);
    +      Assert.assertEquals(message0, doc.getDocument());
    +      {
    +        //ensure hbase is up to date
    +        Get g = new Get(guid.getBytes());
    +        Result r = table.get(g);
    +        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
    +        Assert.assertEquals(1, columns.size());
    +        Assert.assertEquals(message0
    +                , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue())
    +                        , new TypeReference<Map<String, Object>>() {})
    +        );
    +      }
    +      {
    +        //ensure ES is up-to-date
    +        long cnt = 0;
    +        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS))
{
    +          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
    +          cnt = docs
    +                  .stream()
    +                  .filter(d -> {
    +                    Object newfield = d.get("new-field");
    +                    return newfield != null && newfield.equals(message0.get("new-field"));
    +                  }).count();
    +        }
    +        if (cnt == 0) {
    +          Assert.fail("Elasticsearch is not updated!");
    +        }
    +      }
    +    }
    +    //modify the same message and modify the new field
    +    {
    +      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0))
{{
    +        put("new-field", "metron2");
    +      }};
    +      String guid = "" + message0.get(Constants.GUID);
    +      dao.replace(new ReplaceRequest(){{
    +        setReplacement(message0);
    +        setGuid(guid);
    +        setSensorType(SENSOR_NAME);
    +      }}, Optional.empty());
    +      Assert.assertEquals(1, table.size());
    +      Document doc = dao.getLatest(guid, SENSOR_NAME);
    +      Assert.assertEquals(message0, doc.getDocument());
    +      {
    +        //ensure hbase is up to date
    +        Get g = new Get(guid.getBytes());
    +        Result r = table.get(g);
    +        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
    +        Assert.assertEquals(2, columns.size());
    +        Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue())
    +                        , new TypeReference<Map<String, Object>>() {})
    +        );
    +        Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue())
    +                        , new TypeReference<Map<String, Object>>() {})
    +        );
    +      }
    +      {
    +        //ensure ES is up-to-date
    +        long cnt = 0;
    +        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS))
{
    +          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
    +          cnt = docs
    +                  .stream()
    +                  .filter(d -> {
    +                    Object newfield = d.get("new-field");
    +                    return newfield != null && newfield.equals(message0.get("new-field"));
    +                  }).count();
    +        }
    +        if (cnt == 0) {
    --- End diff --
    
    Nitpick, can we simplify this to
    ```
            Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
    ```


> Enable the ability to update indexed messages
> ---------------------------------------------
>
>                 Key: METRON-1051
>                 URL: https://issues.apache.org/jira/browse/METRON-1051
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Casey Stella
>
> Currently our indexing layer is immutable.  It would be useful to be able to:
> * Update data in the indices
> * Track lineage for updates
> Current version of data should be able to be exposed via a batch interface. 
> Updates should be available via a Java DAO API as well as via the REST API.
> Implementation should ensure that the ability to read the current data in batch is possible.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message