gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewismc <...@git.apache.org>
Subject [GitHub] gora pull request #134: GORA-535 Add a data store for Apache Ignite
Date Sat, 14 Jul 2018 22:31:11 GMT
Github user lewismc commented on a diff in the pull request:

    https://github.com/apache/gora/pull/134#discussion_r202526790
  
    --- Diff: gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
    @@ -0,0 +1,578 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.ignite.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ConcurrentHashMap;
    +import javax.sql.rowset.CachedRowSet;
    +import javax.sql.rowset.RowSetFactory;
    +import javax.sql.rowset.RowSetProvider;
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.ignite.query.IgniteQuery;
    +import org.apache.gora.ignite.query.IgniteResult;
    +import org.apache.gora.ignite.utils.IgniteSQLBuilder;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.PartitionQueryImpl;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of a Ignite data store to be used by gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
    +
    +  public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
    +  private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file";
    +  private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml";
    +  private IgniteParameters igniteParameters;
    +  private IgniteMapping igniteMapping;
    +  private Connection connection;
    +
    +  /*
    +   * Create a threadlocal map for the datum readers and writers, because they
    +   * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650). When
    +   * they are thread safe, it is possible to maintain a single reader and writer
    +   * pair for every schema, instead of one for every thread.
    +   */
    +  public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap
= new ConcurrentHashMap<>();
    +
    +  public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap
= new ConcurrentHashMap<>();
    +
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) throws GoraException {
    +
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +      IgniteMappingBuilder builder = new IgniteMappingBuilder(this);
    +      builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
    +      igniteMapping = builder.getIgniteMapping();
    +      igniteParameters = IgniteParameters.load(properties, conf);
    +      connection = acquiereConnection();
    +      LOG.info("Ignite store was successfully initialized");
    +    } catch (ClassNotFoundException | SQLException ex) {
    +      LOG.error("Error while initializing Ignite store", ex);
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  private Connection acquiereConnection() throws ClassNotFoundException, SQLException
{
    +    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
    +    StringBuilder urlBuilder = new StringBuilder();
    +    urlBuilder.append("jdbc:ignite:thin://");
    +    urlBuilder.append(igniteParameters.getHost());
    +    if (igniteParameters.getPort() != null) {
    +      urlBuilder.append(":" + igniteParameters.getPort());
    +    }
    +    if (igniteParameters.getSchema() != null) {
    +      urlBuilder.append("/" + igniteParameters.getSchema());
    +    }
    +    if (igniteParameters.getUser() != null) {
    +      urlBuilder.append(";" + igniteParameters.getUser());
    +    }
    +    if (igniteParameters.getPassword() != null) {
    +      urlBuilder.append(";" + igniteParameters.getPassword());
    +    }
    +    if (igniteParameters.getAdditionalConfigurations() != null) {
    +      urlBuilder.append(igniteParameters.getAdditionalConfigurations());
    +    }
    +    Connection conn = DriverManager.getConnection(urlBuilder.toString());
    +    return conn;
    +  }
    +
    +  @Override
    +  public String getSchemaName() {
    +    return igniteMapping.getTableName();
    +  }
    +
    +  @Override
    +  public String getSchemaName(final String mappingSchemaName,
    +      final Class<?> persistentClass) {
    +    return super.getSchemaName(mappingSchemaName, persistentClass);
    +  }
    +
    +  @Override
    +  public void createSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to create the schema as no connection has been initiated.");
    +    }
    +    if (schemaExists()) {
    +      return;
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
    +      stmt.executeUpdate(createTableSQL);
    +      LOG.info("Table {} has been created for Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public void deleteSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to delete the schema as no connection has been initiated.");
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String dropTableSQL = IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
    +      stmt.executeUpdate(dropTableSQL);
    +      LOG.info("Table {} has been dropped from Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public boolean schemaExists() throws GoraException {
    +    boolean exists = false;
    +    try (Statement stmt = connection.createStatement()) {
    +      String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
    +      ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
    +      executeQuery.close();
    +      exists = true;
    +    } catch (SQLException ex) {
    +      /**
    +       * a 42000 error code is thrown by Ignite when a non-existent table
    +       * queried. More details:
    +       * https://apacheignite-sql.readme.io/docs/jdbc-error-codes
    +       */
    +      if (ex.getSQLState() != null && ex.getSQLState().equals("42000")) {
    +        exists = false;
    +      } else {
    +        throw new GoraException(ex);
    +      }
    +    }
    +    return exists;
    +  }
    +
    +  @Override
    +  public T get(K key, String[] fields) throws GoraException {
    +    String[] avFields = getFieldsToQuery(fields);
    +    Object[] keyl = null;
    +    if (igniteMapping.getPrimaryKey().size() == 1) {
    +      keyl = new Object[]{key};
    +    } else {
    +      //Composite key pending
    +    }
    +    //Avro fields to Ignite fields
    +    List<String> dbFields = new ArrayList<>();
    +    for (String af : avFields) {
    +      dbFields.add(igniteMapping.getFields().get(af).getName());
    +    }
    +    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, dbFields);
    +    try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
    +      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
    +      ResultSet rs = stmt.executeQuery();
    +      boolean data = rs.next();
    +      T resp = null;
    +      if (data) {
    +        resp = newInstance(rs, fields);
    +        if (rs.next()) {
    +          LOG.warn("Multiple results for primary key {} in the schema {}, ignoring additional
rows.", keyl, igniteMapping.getTableName());
    +        }
    +      }
    +      rs.close();
    +      return resp;
    +    } catch (SQLException | IOException ex) {
    +      throw new GoraException(ex);
    +    }
    +
    +  }
    +
    +  public T newInstance(ResultSet rs, String[] fields) throws GoraException, SQLException,
IOException {
    +    fields = getFieldsToQuery(fields);
    +    T persistent = newPersistent();
    +    for (String f : fields) {
    +      Schema.Field field = fieldMap.get(f);
    +      Schema fieldSchema = field.schema();
    +      String dbField = igniteMapping.getFields().get(f).getName();
    +      Object sv = rs.getObject(dbField);
    +      if (sv == null) {
    +        continue;
    +      }
    +      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
    +      persistent.put(field.pos(), v);
    +      persistent.setDirty(field.pos());
    +    }
    +    return persistent;
    +  }
    +
    +  private Object deserializeFieldValue(Schema.Field field, Schema fieldSchema,
    +      Object igniteValue, T persistent) throws IOException {
    +    Object fieldValue = null;
    +    switch (fieldSchema.getType()) {
    +      case MAP:
    +      case ARRAY:
    +      case RECORD:
    +        @SuppressWarnings("rawtypes") SpecificDatumReader reader = getDatumReader(fieldSchema);
    +        fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
    --- End diff --
    
    Type safety: Unchecked invocation deserialize(byte[], SpecificDatumReader, Object) of
the generic method deserialize(byte[], SpecificDatumReader< T >, T) of type IOUtils


---

Mime
View raw message