gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From djkevincr <...@git.apache.org>
Subject [GitHub] gora pull request #134: GORA-535 Add a data store for Apache Ignite
Date Tue, 21 Aug 2018 10:15:04 GMT
Github user djkevincr commented on a diff in the pull request:

    https://github.com/apache/gora/pull/134#discussion_r211541050
  
    --- Diff: gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
    @@ -0,0 +1,565 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.ignite.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ConcurrentHashMap;
    +import javax.sql.rowset.CachedRowSet;
    +import javax.sql.rowset.RowSetFactory;
    +import javax.sql.rowset.RowSetProvider;
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.ignite.query.IgniteQuery;
    +import org.apache.gora.ignite.query.IgniteResult;
    +import org.apache.gora.ignite.utils.IgniteBackendConstants;
    +import org.apache.gora.ignite.utils.IgniteSQLBuilder;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.PartitionQueryImpl;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of a Ignite data store to be used by gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class);
    +  private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file";
    +  private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml";
    +  private IgniteParameters igniteParameters;
    +  private IgniteMapping igniteMapping;
    +  private Connection connection;
    +  private static final ConcurrentHashMap<Schema, SpecificDatumReader<?>>
readerMap = new ConcurrentHashMap<>();
    +  private static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>>
writerMap = new ConcurrentHashMap<>();
    +
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) throws GoraException {
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +      IgniteMappingBuilder<K, T> builder = new IgniteMappingBuilder<K, T>(this);
    +      builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
    +      igniteMapping = builder.getIgniteMapping();
    +      igniteParameters = IgniteParameters.load(properties);
    +      connection = acquireConnection();
    +      LOG.info("Ignite store was successfully initialized");
    +    } catch (ClassNotFoundException | SQLException ex) {
    +      LOG.error("Error while initializing Ignite store", ex);
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  private Connection acquireConnection() throws ClassNotFoundException, SQLException
{
    +    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
    +    StringBuilder urlBuilder = new StringBuilder();
    +    urlBuilder.append("jdbc:ignite:thin://");
    +    urlBuilder.append(igniteParameters.getHost());
    +    if (igniteParameters.getPort() != null) {
    +      urlBuilder.append(":" + igniteParameters.getPort());
    +    }
    +    if (igniteParameters.getSchema() != null) {
    +      urlBuilder.append("/" + igniteParameters.getSchema());
    +    }
    +    if (igniteParameters.getUser() != null) {
    +      urlBuilder.append(";" + igniteParameters.getUser());
    +    }
    +    if (igniteParameters.getPassword() != null) {
    +      urlBuilder.append(";" + igniteParameters.getPassword());
    +    }
    +    if (igniteParameters.getAdditionalConfigurations() != null) {
    +      urlBuilder.append(igniteParameters.getAdditionalConfigurations());
    +    }
    +    return DriverManager.getConnection(urlBuilder.toString());
    +  }
    +
    +  @Override
    +  public String getSchemaName() {
    +    return igniteMapping.getTableName();
    +  }
    +
    +  @Override
    +  public String getSchemaName(final String mappingSchemaName,
    +      final Class<?> persistentClass) {
    +    return super.getSchemaName(mappingSchemaName, persistentClass);
    +  }
    +
    +  @Override
    +  public void createSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to create the schema as no connection has been initiated.");
    +    }
    +    if (schemaExists()) {
    +      return;
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
    +      stmt.executeUpdate(createTableSQL);
    +      LOG.info("Table {} has been created for Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public void deleteSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to delete the schema as no connection has been initiated.");
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String dropTableSQL = IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
    +      stmt.executeUpdate(dropTableSQL);
    +      LOG.info("Table {} has been dropped from Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public boolean schemaExists() throws GoraException {
    +    try (Statement stmt = connection.createStatement()) {
    +      String tableExistsSQL = IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
    +      ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
    +      executeQuery.close();
    +      return true;
    +    } catch (SQLException ex) {
    +      if (ex.getSQLState() != null
    +          && ex.getSQLState().equals(IgniteBackendConstants.DEFAULT_IGNITE_TABLE_NOT_EXISTS_CODE))
{
    +        return false;
    +      } else {
    +        throw new GoraException(ex);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public T get(K key, String[] fields) throws GoraException {
    +    String[] avFields = getFieldsToQuery(fields);
    +    Object[] keyl = null;
    +    if (igniteMapping.getPrimaryKey().size() == 1) {
    +      keyl = new Object[]{key};
    +    } else {
    +      //Composite key pending
    +    }
    +    //Avro fields to Ignite fields
    +    List<String> dbFields = new ArrayList<>();
    +    for (String af : avFields) {
    +      dbFields.add(igniteMapping.getFields().get(af).getName());
    +    }
    +    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, dbFields);
    +    try (PreparedStatement stmt = connection.prepareStatement(selectQuery)) {
    +      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
    +      ResultSet rs = stmt.executeQuery();
    +      boolean data = rs.next();
    +      T resp = null;
    +      if (data) {
    +        resp = newInstance(rs, fields);
    +        if (rs.next()) {
    +          LOG.warn("Multiple results for primary key {} in the schema {}, ignoring additional
rows.", keyl, igniteMapping.getTableName());
    +        }
    +      }
    +      rs.close();
    +      return resp;
    +    } catch (SQLException | IOException ex) {
    +      throw new GoraException(ex);
    +    }
    +
    --- End diff --
    
    Please remove unnecessary new lines.


---

Mime
View raw message