gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewismc <...@git.apache.org>
Subject [GitHub] gora pull request #134: GORA-535 Add a data store for Apache Ignite
Date Sat, 14 Jul 2018 22:31:27 GMT
Github user lewismc commented on a diff in the pull request:

    https://github.com/apache/gora/pull/134#discussion_r202526802
  
    --- Diff: gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
    @@ -0,0 +1,578 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.ignite.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ConcurrentHashMap;
    +import javax.sql.rowset.CachedRowSet;
    +import javax.sql.rowset.RowSetFactory;
    +import javax.sql.rowset.RowSetProvider;
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.ignite.query.IgniteQuery;
    +import org.apache.gora.ignite.query.IgniteResult;
    +import org.apache.gora.ignite.utils.IgniteSQLBuilder;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.PartitionQueryImpl;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of a Ignite data store to be used by gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
    +
    +  public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
    +  private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file";
    +  private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml";
    +  private IgniteParameters igniteParameters;
    +  private IgniteMapping igniteMapping;
    +  private Connection connection;
    +
    +  /*
    +   * Create a threadlocal map for the datum readers and writers, because they
    +   * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650). When
    +   * they are thread safe, it is possible to maintain a single reader and writer
    +   * pair for every schema, instead of one for every thread.
    +   */
    +  public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap
= new ConcurrentHashMap<>();
    +
    +  public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap
= new ConcurrentHashMap<>();
    +
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) throws GoraException {
    +
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +      IgniteMappingBuilder builder = new IgniteMappingBuilder(this);
    +      builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
    +      igniteMapping = builder.getIgniteMapping();
    +      igniteParameters = IgniteParameters.load(properties, conf);
    +      connection = acquiereConnection();
    +      LOG.info("Ignite store was successfully initialized");
    +    } catch (ClassNotFoundException | SQLException ex) {
    +      LOG.error("Error while initializing Ignite store", ex);
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  private Connection acquiereConnection() throws ClassNotFoundException, SQLException
{
    +    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
    +    StringBuilder urlBuilder = new StringBuilder();
    +    urlBuilder.append("jdbc:ignite:thin://");
    +    urlBuilder.append(igniteParameters.getHost());
    +    if (igniteParameters.getPort() != null) {
    +      urlBuilder.append(":" + igniteParameters.getPort());
    +    }
    +    if (igniteParameters.getSchema() != null) {
    +      urlBuilder.append("/" + igniteParameters.getSchema());
    +    }
    +    if (igniteParameters.getUser() != null) {
    +      urlBuilder.append(";" + igniteParameters.getUser());
    +    }
    +    if (igniteParameters.getPassword() != null) {
    +      urlBuilder.append(";" + igniteParameters.getPassword());
    +    }
    +    if (igniteParameters.getAdditionalConfigurations() != null) {
    +      urlBuilder.append(igniteParameters.getAdditionalConfigurations());
    +    }
    +    Connection conn = DriverManager.getConnection(urlBuilder.toString());
    +    return conn;
    +  }
    +
    +  @Override
    +  public String getSchemaName() {
    +    return igniteMapping.getTableName();
    +  }
    +
    +  @Override
    +  public String getSchemaName(final String mappingSchemaName,
    +      final Class<?> persistentClass) {
    +    return super.getSchemaName(mappingSchemaName, persistentClass);
    +  }
    +
    +  @Override
    +  public void createSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to create the schema as no connection has been initiated.");
    +    }
    +    if (schemaExists()) {
    +      return;
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
    +      stmt.executeUpdate(createTableSQL);
    +      LOG.info("Table {} has been created for Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public void deleteSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to delete the schema as no connection has been initiated.");
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String dropTableSQL = IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
    +      stmt.executeUpdate(dropTableSQL);
    +      LOG.info("Table {} has been dropped from Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public boolean schemaExists() throws GoraException {
    +    boolean exists = false;
    +    try (Statement stmt = connection.createStatement()) {
    +      String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
    +      ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
    +      executeQuery.close();
    +      exists = true;
    +    } catch (SQLException ex) {
    +      /**
    +       * a 42000 error code is thrown by Ignite when a non-existent table
    +       * queried. More details:
    +       * https://apacheignite-sql.readme.io/docs/jdbc-error-codes
    +       */
    +      if (ex.getSQLState() != null && ex.getSQLState().equals("42000")) {
    +        exists = false;
    +      } else {
    +        throw new GoraException(ex);
    +      }
    +    }
    +    return exists;
    +  }
    +
    +  @Override
    +  public T get(K key, String[] fields) throws GoraException {
    +    String[] avFields = getFieldsToQuery(fields);
    +    Object[] keyl = null;
    +    if (igniteMapping.getPrimaryKey().size() == 1) {
    +      keyl = new Object[]{key};
    +    } else {
    +      //Composite key pending
    +    }
    +    //Avro fields to Ignite fields
    +    List<String> dbFields = new ArrayList<>();
    +    for (String af : avFields) {
    +      dbFields.add(igniteMapping.getFields().get(af).getName());
    +    }
    +    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, dbFields);
    +    try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
    +      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
    +      ResultSet rs = stmt.executeQuery();
    +      boolean data = rs.next();
    +      T resp = null;
    +      if (data) {
    +        resp = newInstance(rs, fields);
    +        if (rs.next()) {
    +          LOG.warn("Multiple results for primary key {} in the schema {}, ignoring additional
rows.", keyl, igniteMapping.getTableName());
    +        }
    +      }
    +      rs.close();
    +      return resp;
    +    } catch (SQLException | IOException ex) {
    +      throw new GoraException(ex);
    +    }
    +
    +  }
    +
    +  public T newInstance(ResultSet rs, String[] fields) throws GoraException, SQLException,
IOException {
    +    fields = getFieldsToQuery(fields);
    +    T persistent = newPersistent();
    +    for (String f : fields) {
    +      Schema.Field field = fieldMap.get(f);
    +      Schema fieldSchema = field.schema();
    +      String dbField = igniteMapping.getFields().get(f).getName();
    +      Object sv = rs.getObject(dbField);
    +      if (sv == null) {
    +        continue;
    +      }
    +      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
    +      persistent.put(field.pos(), v);
    +      persistent.setDirty(field.pos());
    +    }
    +    return persistent;
    +  }
    +
    +  private Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
    +      Object igniteValue, T persistent) throws IOException {
    +    Object fieldValue = null;
    +    switch (fieldSchema.getType()) {
    +      case MAP:
    +      case ARRAY:
    +      case RECORD:
    +        @SuppressWarnings("rawtypes") SpecificDatumReader reader = getDatumReader(fieldSchema);
    +        fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
    +            persistent.get(field.pos()));
    +        break;
    +      case ENUM:
    +        fieldValue = AvroUtils.getEnumValue(fieldSchema, igniteValue.toString());
    +        break;
    +      case FIXED:
    +        break;
    +      case BYTES:
    +        fieldValue = ByteBuffer.wrap((byte[]) igniteValue);
    +        break;
    +      case STRING:
    +        fieldValue = new Utf8(igniteValue.toString());
    +        break;
    +      case UNION:
    +        if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
    +          int schemaPos = getUnionSchema(igniteValue, fieldSchema);
    +          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
    +          fieldValue = deserializeFieldValue(field, unionSchema, igniteValue, persistent);
    +        } else {
    +          reader = getDatumReader(fieldSchema);
    +          fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
    +              persistent.get(field.pos()));
    +        }
    +        break;
    +      default:
    +        fieldValue = igniteValue;
    +    }
    +    return fieldValue;
    +
    +  }
    +
    +  @Override
    +  public void put(K key, T obj) throws GoraException {
    +    try {
    +      if (obj.isDirty()) {
    +        Schema schema = obj.getSchema();
    +        List<Schema.Field> fields = schema.getFields();
    +        Map<Column, Object> data = new HashMap<>();
    +        if (igniteMapping.getPrimaryKey().size() == 1) {
    +          Column getKey = igniteMapping.getPrimaryKey().get(0);
    +          data.put(getKey, key);
    +        } else {
    +          //Composite keys pending..
    +        }
    +        for (Schema.Field field : fields) {
    +          Column get = igniteMapping.getFields().get(field.name());
    +          Object v = obj.get(field.pos());
    +          if (get != null && v != null) {
    +            Schema fieldSchema = field.schema();
    +            Object serializedObj = serializeFieldValue(get, fieldSchema, v);
    +            data.put(get, serializedObj);
    +          }
    +        }
    +        String baseInsertStatement = IgniteSQLBuilder.baseInsertStatement(igniteMapping,
data);
    +        try (PreparedStatement stmt = connection.prepareStatement(baseInsertStatement))
{
    +          IgniteSQLBuilder.fillInsertStatement(stmt, data);
    +          stmt.executeUpdate();
    +        } catch (SQLException ex) {
    +          throw new GoraException(ex);
    +        }
    +      } else {
    +        LOG.info("Ignored putting object {} in the store as it is neither "
    +            + "new, neither dirty.", new Object[]{obj});
    +      }
    +    } catch (Exception e) {
    +      throw new GoraException(e);
    +    }
    +  }
    +
    +  @Override
    +  public boolean delete(K key) throws GoraException {
    +    String deleteQuery = null;
    +    Object[] keyl = null;
    +    if (igniteMapping.getPrimaryKey().size() == 1) {
    +      deleteQuery = IgniteSQLBuilder.delete(igniteMapping);
    +      keyl = new Object[]{key};
    +    } else {
    +      //Composite key pending
    +    }
    +    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery)) {
    +      IgniteSQLBuilder.fillDeleteStatement(stmt, igniteMapping, keyl);
    +      stmt.executeUpdate();
    +      return true;
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public long deleteByQuery(Query<K, T> query) throws GoraException {
    +
    +    String deleteQuery;
    +    if (query.getFields() != null && query.getFields().length < igniteMapping.getFields().size())
{
    +      List<String> dbFields = new ArrayList<>();
    +      for (String af : query.getFields()) {
    +        dbFields.add(igniteMapping.getFields().get(af).getName());
    +      }
    +      deleteQuery = IgniteSQLBuilder.deleteQueryFields(igniteMapping, dbFields);
    +    } else {
    +      deleteQuery = IgniteSQLBuilder.deleteQuery(igniteMapping);
    +    }
    +    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, query.getStartKey(),
query.getEndKey(), query.getLimit());
    +    try (PreparedStatement stmt = connection.prepareStatement(deleteQuery + selectQueryWhere))
{
    +      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey());
    +      stmt.executeUpdate();
    +      return 0;
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public Result<K, T> execute(Query<K, T> query) throws GoraException {
    +    String[] fields = getFieldsToQuery(query.getFields());
    +    //Avro fields to Ignite fields
    +    List<String> dbFields = new ArrayList<>();
    +    for (String af : fields) {
    +      dbFields.add(igniteMapping.getFields().get(af).getName());
    +    }
    +    String selectQuery = IgniteSQLBuilder.selectQuery(igniteMapping, dbFields);
    +    String selectQueryWhere = IgniteSQLBuilder.selectQueryWhere(igniteMapping, query.getStartKey(),
query.getEndKey(), query.getLimit());
    +    try {
    +      PreparedStatement stmt = connection.prepareStatement(selectQuery + selectQueryWhere);
    +      RowSetFactory factory = RowSetProvider.newFactory();
    +      CachedRowSet rowset = factory.createCachedRowSet();
    +      IgniteSQLBuilder.fillSelectQuery(stmt, query.getStartKey(), query.getEndKey());
    +      ResultSet executeQuery = stmt.executeQuery();
    +      rowset.populate(executeQuery);
    +      IgniteResult<K, T> igniteResult = new IgniteResult<>(this, query);
    +      igniteResult.setResultSet(rowset);
    +      return igniteResult;
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  public K extractKey(ResultSet r) throws SQLException {
    +    assert igniteMapping.getPrimaryKey().size() == 1;
    +    return (K) r.getObject(igniteMapping.getPrimaryKey().get(0).getName());
    +  }
    +
    +  @Override
    +  public Query<K, T> newQuery() {
    +    IgniteQuery<K, T> query = new IgniteQuery<>(this);
    +    query.setFields(getFieldsToQuery(null));
    +    return query;
    +  }
    +
    +  @Override
    +  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
    +    List<PartitionQuery<K, T>> partitions = new ArrayList<>();
    +    PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
    +        query);
    +    partitionQuery.setConf(getConf());
    +    partitions.add(partitionQuery);
    +    return partitions;
    +  }
    +
    +  @Override
    +  public void flush() throws GoraException {
    +    //Auto-commit mode by default
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      connection.close();
    +      LOG.info("Ignite datastore destroyed successfully.");
    +    } catch (Exception ex) {
    +      LOG.error(ex.getMessage(), ex);
    +    }
    +  }
    +
    +  private Object serializeFieldValue(Column get, Schema fieldSchema, Object fieldValue)
{
    +    Object output = fieldValue;
    +    switch (fieldSchema.getType()) {
    +      case ARRAY:
    +      case MAP:
    +      case RECORD:
    +        byte[] data = null;
    +        try {
    +          @SuppressWarnings("rawtypes")
    +          SpecificDatumWriter writer = getDatumWriter(fieldSchema);
    +          data = IOUtils.serialize(writer, fieldValue);
    --- End diff --
    
    Type safety: Unchecked invocation serialize(SpecificDatumWriter, Object) of the generic
method serialize(SpecificDatumWriter< T >, T) of type IOUtils


---

Mime
View raw message