gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kamaci <...@git.apache.org>
Subject [GitHub] gora pull request #111: [WIP] GORA-502 Implement Aerospike Data Store
Date Tue, 27 Jun 2017 02:22:38 GMT
GitHub user kamaci commented on a diff in the pull request:

    https://github.com/apache/gora/pull/111#discussion_r124165139
  
    --- Diff: gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---
    @@ -0,0 +1,473 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.aerospike.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import com.aerospike.client.Key;
    +import com.aerospike.client.Value;
    +import com.aerospike.client.Bin;
    +import com.aerospike.client.Record;
    +import com.aerospike.client.AerospikeClient;
    +import com.aerospike.client.policy.ClientPolicy;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.DirtyListWrapper;
    +import org.apache.gora.persistency.impl.DirtyMapWrapper;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of a Aerospike data store to be used by gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(AerospikeStore.class);
    +
    +  private static final String PARSE_MAPPING_FILE_KEY = "gora.aerospike.mapping.file";
    +
    +  private static final String DEFAULT_MAPPING_FILE = "gora-aerospike-mapping.xml";
    +
    +  private AerospikeClient aerospikeClient;
    +
    +  private AerospikeParameters aerospikeParameters;
    +
    +  /**
    +   * {@inheritDoc}
    +   * In initializing the aerospike datastore, read the mapping file, sets the basic
    +   * aerospike specific parameters and creates the client with the user defined policies
    +   *
    +   * @param keyClass        key class
    +   * @param persistentClass persistent class
    +   * @param properties      properties
    +   */
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
    +    super.initialize(keyClass, persistentClass, properties);
    +
    +    AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder();
    +    aerospikeMappingBuilder
    +            .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass,
    +                    persistentClass);
    +    aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(),
    +            properties);
    +    ClientPolicy policy = new ClientPolicy();
    +    policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy();
    +    policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy();
    +
    +    aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(),
    +            aerospikeParameters.getPort());
    +    aerospikeParameters.setServerSpecificParameters(aerospikeClient);
    +    aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields());
    +    LOG.info("Aerospike Gora datastore initialized successfully.");
    +  }
    +
    +  /**
    +   * Aerospike, being a schemaless database does not support explicit schema creation through the
    +   * provided java client. When the records are added to the database, the schema is created on
    +   * the fly. Thus, schema related functionality is unavailable in gora-aerospike module.
    +   *
    +   * @return null
    +   */
    +  @Override
    +  public String getSchemaName() {
    +    return null;
    +  }
    +
    +  /**
    +   * Aerospike, being a schemaless database does not support explicit schema creation through the
    +   * provided java client. When the records are added to the database, the schema is created on
    +   * the fly. Thus, schema creation functionality is unavailable in gora-aerospike module.
    +   */
    +  @Override
    +  public void createSchema() {
    +  }
    +
    +  /**
    +   * Aerospike, being a schemaless database does not support explicit schema creation through the
    +   * provided java client. When the records are added to the database, the schema is created on
    +   * the fly. Thus, schema deletion functionality is unavailable in gora-aerospike module.
    +   */
    +  @Override
    +  public void deleteSchema() {
    +  }
    +
    +  /**
    +   * Aerospike, being a schemaless database does not support explicit schema creation through the
    +   * provided java client. When the records are added to the database, the schema is created on
    +   * the fly. Thus, schema exists functionality is unavailable in gora-aerospike module.
    +   */
    +  @Override
    +  public boolean schemaExists() {
    +    return true;
    +  }
    +
    +  /**
    +   * {@inheritDoc}
    +   *
    +   * @param key    the key of the object
    +   * @param fields the fields required in the object. Pass null, to retrieve all fields
    +   * @return the Object corresponding to the key or null if it cannot be found
    +   */
    +  @Override
    +  public T get(K key, String[] fields) {
    +
    +    Key recordKey = getAerospikeKey(key);
    +    fields = getFieldsToQuery(fields);
    +
    +    Record record = aerospikeClient
    +            .get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey, fields);
    +    if (record == null) {
    +      return null;
    +    }
    +    return createPersistentInstance(record, fields);
    +  }
    +
    +  /**
    +   * Method to insert the persistent objects with the given key to the aerospike database server.
    +   * In writing the records, the policy defined in the mapping file is used to decide on the
    +   * behaviour of transaction handling.
    +   *
    +   * @param key        key of the object
    +   * @param persistent object to be persisted
    +   */
    +  @Override
    +  public void put(K key, T persistent) {
    +
    +    Key recordKey = getAerospikeKey(key);
    +
    +    List<Field> fields = persistent.getSchema().getFields();
    +    for (int i = 0; i < fields.size(); i++) {
    +      Object persistentValue = persistent.get(i);
    +      if (persistentValue != null) {
    +        String mappingBinName = aerospikeParameters.getAerospikeMapping().getBinMapping()
    +                .get(fields.get(i).name());
    +        if (mappingBinName == null) {
    +          LOG.error(
    +                  "Aerospike mapping for field {}#{} not found. Wrong gora-aerospike-mapping.xml?",
    +                  persistent.getClass().getName(), fields.get(i).name());
    +          throw new RuntimeException(
    +                  "Aerospike mapping for field [" + persistent.getClass().getName() + "#" + fields
    +                          .get(i).name() + "] not found. Wrong gora-aerospike-mapping.xml?");
    +        }
    +        Bin bin = new Bin(mappingBinName,
    +                getSerializableValue(persistentValue, fields.get(i).schema()));
    +        aerospikeClient
    +                .put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * {@inheritDoc}
    +   *
    +   * @param key the key of the object
    +   * @return whether the object was successfully deleted
    +   */
    +  @Override
    +  public boolean delete(K key) {
    +    Key recordKey = getAerospikeKey(key);
    +    return aerospikeClient
    +            .delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey);
    +  }
    +
    +  @Override
    +  public long deleteByQuery(Query<K, T> query) {
    +    return 0;
    +  }
    +
    +  @Override
    +  public Result<K, T> execute(Query<K, T> query) {
    +    return null;
    +  }
    +
    +  @Override
    +  public Query<K, T> newQuery() {
    +    return null;
    +  }
    +
    +  @Override
    +  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
    +    return null;
    +  }
    +
    +  @Override
    +  public void flush() {
    +  }
    +
    +  /**
    +   * Method to close aerospike client connections to database server nodes
    +   */
    +  @Override
    +  public void close() {
    +    aerospikeClient.close();
    +    LOG.info("Aerospike Gora datastore destroyed successfully.");
    +  }
    +
    +  /**
    +   * Method to get the aerospike key from the provided K
    +   *
    +   * @param key persistent key
    +   * @return aerospike key for the record
    +   */
    +  public Key getAerospikeKey(K key) {
    +    Value keyValue;
    +    if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
    +      keyValue = Value.get(key.toString());
    +    } else {
    +      keyValue = Value.get(key);
    +    }
    +
    +    return new Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
    +            aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
    +  }
    +
    +  /**
    +   * Method to get the value serializable in database from the Avro persistent object
    +   *
    +   * @param object persistent object
    +   * @param schema schema of the persistent object
    +   * @return serializable value
    +   */
    +  private Value getSerializableValue(Object object, Schema schema) {
    +
    +    Value value = null;
    +    switch (schema.getType()) {
    +      case UNION:
    +        if (object != null) {
    +          int schemaPos = getUnionSchema(object, schema);
    +          Schema unionSchema = schema.getTypes().get(schemaPos);
    +          value = getSerializableValue(object, unionSchema);
    +        }
    +        break;
    +      case STRING:
    +        value = Value.get(object.toString());
    +        break;
    +      case BYTES:
    +        value = Value.get(((ByteBuffer) object).array());
    +        break;
    +      case MAP:
    +        Map<Object, Object> newMap = new HashMap<>();
    +        Map<?, ?> fieldMap = (Map<?, ?>) object;
    +        for (Object key : fieldMap.keySet()) {
    +          newMap.put(key.toString(),
    +                  getSerializableValue(fieldMap.get(key), schema.getValueType()));
    +        }
    +        value = Value.get(newMap);
    +        break;
    +      case ARRAY:
    +        List<Object> objectList = new ArrayList<>();
    +        for (Object obj : (List<Object>) object) {
    +          objectList.add(getSerializableValue(obj, schema.getElementType()));
    +        }
    +        value = Value.get(objectList);
    +        break;
    +      default:
    +        value = Value.get(object);
    +        break;
    +    }
    +    return value;
    +  }
    +
    +  /**
    +   * Method to create a persistent object given the retrieved record
    +   * from Aerospike database
    +   *
    +   * @param record record retrieved from database
    +   * @param fields fields
    +   * @return persistent object created
    +   */
    +  private T createPersistentInstance(Record record, String[] fields) {
    +
    +    T persistent = newPersistent();
    +    for (String field : fields) {
    +      setPersistentField(field, record, persistent);
    +    }
    +    return persistent;
    +  }
    +
    +  /**
    +   * Method to set a field in the persistent object
    +   *
    +   * @param fieldName  field name
    +   * @param record     record retrieved from database
    +   * @param persistent persistent object for the field to be set
    +   */
    +  private void setPersistentField(String fieldName, Record record, T persistent) {
    +
    +    String binName = aerospikeParameters.getAerospikeMapping().getBinName(fieldName);
    +    if (binName == null) {
    +      LOG.error("Aerospike mapping for field {} not found. Wrong gora-aerospike-mapping.xml",
    +              fieldName);
    +      throw new RuntimeException("Aerospike mapping for field [" + fieldName + "] not found. "
    +              + "Wrong gora-aerospike-mapping.xml?");
    +    }
    +    if (record.bins.get(fieldName) == null) {
    +      return;
    +    }
    +    String binDataType = record.bins.get(fieldName).getClass().getSimpleName();
    +    Object binValue = record.bins.get(binName);
    +
    +    persistent.put(fieldName,
    +            getDeserializedObject(binValue, binDataType, fieldMap.get(fieldName).schema()));
    +  }
    +
    +  /**
    +   * Method to get Avro mapped persistent object from the record retrieved from the database
    +   *
    +   * @param binValue    value retrieved from database
    +   * @param binDataType data type of the database value
    +   * @param schema      corresponding schema in the persistent class
    +   * @return persistent object
    +   */
    +  private Object getDeserializedObject(Object binValue, String binDataType, Schema schema) {
    +
    +    Object result;
    +    switch (schema.getType()) {
    +
    +      case MAP:
    +        Map<String, Object> rawMap = (Map<String, Object>) binValue;
    +        Map<Utf8, Object> deserializableMap = new HashMap<>();
    +        if (rawMap == null) {
    +          result = new DirtyMapWrapper(deserializableMap);
    +          break;
    +        }
    +        for (Map.Entry<?, ?> e : rawMap.entrySet()) {
    +          Schema innerSchema = schema.getValueType();
    +          Object obj = getDeserializedObject(e.getValue(), e.getValue().getClass().getSimpleName(),
    +                  innerSchema);
    +          if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) {
    +            deserializableMap.put((Utf8) e.getKey(), obj);
    +          } else {
    +            deserializableMap.put(new Utf8((String) e.getKey()), obj);
    +          }
    +        }
    +        result = new DirtyMapWrapper<>(deserializableMap);
    +        break;
    +
    +      case ARRAY:
    +        List<Object> rawList = (List<Object>) binValue;
    +        List<Object> deserializableList = new ArrayList<>();
    +        if (rawList == null) {
    +          return new DirtyListWrapper(deserializableList);
    +        }
    +        for (Object item : rawList) {
    +          Object obj = getDeserializedObject(item, item.getClass().getSimpleName(),
    +                  schema.getElementType());
    +          deserializableList.add(obj);
    +        }
    +        result = new DirtyListWrapper<>(deserializableList);
    +        break;
    +
    +      case RECORD:
    +        result = (PersistentBase) binValue;
    +        break;
    +
    +      case UNION:
    +        int index = getUnionSchema(binValue, schema);
    +        Schema resolvedSchema = schema.getTypes().get(index);
    +        result = getDeserializedObject(binValue, binDataType, resolvedSchema);
    +        break;
    +
    +      case ENUM:
    +        result = AvroUtils.getEnumValue(schema, (String) binValue);
    +        break;
    +
    +      case BYTES:
    +        result = ByteBuffer.wrap((byte[]) binValue);
    +        break;
    +
    +      case STRING:
    +        if (binValue instanceof org.apache.avro.util.Utf8)
    +          result = binValue;
    +        else
    +          result = new Utf8((String) binValue);
    +        break;
    +
    +      case INT:
    +        if (binDataType.equalsIgnoreCase("long")) {
    +          result = Math.toIntExact((Long) binValue);
    +        } else {
    +          result = binValue;
    +        }
    +        break;
    +
    +      default:
    +        result = binValue;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Method to retrieve the corresponding schema type index of a particular object having UNION
    +   * schema. As UNION type can have one or more types and at a given instance, it holds an object
    +   * of only one type of the defined types, this method is used to figure out the corresponding
    +   * instance's
    +   * schema type index.
    +   *
    +   * @param instanceValue value that the object holds
    +   * @param unionSchema   union schema containing all of the data types
    +   * @return the unionSchemaPosition corresponding schema position
    +   */
    +  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
    +    int unionSchemaPos = 0;
    +    for (Schema currentSchema : unionSchema.getTypes()) {
    +      Schema.Type schemaType = currentSchema.getType();
    +      if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING))
    +        return unionSchemaPos;
    +      else if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES))
    --- End diff --
    
    Is this `else` needed? The preceding `if` branch ends with a `return` statement, so a plain `if` would do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Mime
View raw message