gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [gora] madhawa-gunasekara commented on a change in pull request #219: [GORA-655] RethinkDB datastore Implementation
Date Mon, 24 Aug 2020 21:30:36 GMT

madhawa-gunasekara commented on a change in pull request #219:
URL: https://github.com/apache/gora/pull/219#discussion_r475904601



##########
File path: gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
##########
@@ -0,0 +1,774 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.rethinkdb.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.List;
+import java.util.ArrayList;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.ReqlExpr;
+import com.rethinkdb.model.MapObject;
+import com.rethinkdb.net.Connection;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.rethinkdb.query.RethinkDBQuery;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.rethinkdb.query.RethinkDBResult;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+
+
+/**
+ * {@inheritDoc}
+ * {@link RethinkDBStore} is the primary class
+ * responsible for facilitating GORA CRUD operations on RethinkDB documents.
+ */
+public class RethinkDBStore<K, T extends PersistentBase> extends DataStoreBase<K,
T> {
+
+  /**
+   * Name of the default RethinkDB mapping file resource.
+   * NOTE(review): presumably used when no mapping file is configured — confirm where it is read.
+   */
+  public static final String DEFAULT_MAPPING_FILE = "/gora-rethinkdb-mapping.xml";
+  /** Server, credential, database and mapping-file settings loaded in initialize(). */
+  private RethinkDBStoreParameters rethinkDBStoreParameters;
+  /** Table name and Avro-to-document field mapping built from the mapping file in initialize(). */
+  private RethinkDBMapping rethinkDBMapping;
+  /** Entry point of the RethinkDB Java driver, used to build every ReQL query in this store. */
+  public static final RethinkDB r = RethinkDB.r;
+  /**
+   * Connection opened in initialize() and used to run every query of this store.
+   * NOTE(review): public and mutable; consider narrowing its visibility.
+   */
+  public Connection connection;
+
+  /**
+   * {@inheritDoc}
+   * Initialize the RethinkDB dataStore by {@link Properties} parameters.
+   *
+   * @param keyClass        key class type for dataStore.
+   * @param persistentClass persistent class type for dataStore.
+   * @param properties      RethinkDB dataStore properties EG:- RethinkDB client credentials.
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) throws GoraException {
+    super.initialize(keyClass, persistentClass, properties);
+    try {
+      rethinkDBStoreParameters = RethinkDBStoreParameters.load(properties);
+      connection = r.connection()
+              .hostname(rethinkDBStoreParameters.getServerHost())
+              .port(Integer.valueOf(rethinkDBStoreParameters.getServerPort()))
+              .user(rethinkDBStoreParameters.getUserName(), rethinkDBStoreParameters.getUserPassword())
+              .connect();
+      String databaseIdentifier = rethinkDBStoreParameters.getDatabaseName();
+      if (!r.dbList()
+              .run(connection, ArrayList.class)
+              .first()
+              .stream()
+              .anyMatch(db -> db.equals(databaseIdentifier))) {
+        r.dbCreate(rethinkDBStoreParameters.getDatabaseName()).run(connection);
+      }
+
+      RethinkDBMappingBuilder<K, T> builder = new RethinkDBMappingBuilder<>(this);
+      rethinkDBMapping = builder.fromFile(rethinkDBStoreParameters.getMappingFile()).build();
+      if (!schemaExists()) {
+        createSchema();
+      }
+    } catch (Exception e) {
+      LOG.error("Error while initializing RethinkDB dataStore: {}",
+              new Object[]{e.getMessage()});
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getSchemaName(final String mappingSchemaName,
+                              final Class<?> persistentClass) {
+    return super.getSchemaName(mappingSchemaName, persistentClass);
+  }
+
+  /**
+   * {@inheritDoc}
+   * The schema name is the document class from the mapping, which this store uses as the
+   * RethinkDB table name.
+   */
+  @Override
+  public String getSchemaName() {
+    final String tableName = rethinkDBMapping.getDocumentClass();
+    return tableName;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Creates the mapped table in the configured database unless it is already present.
+   *
+   * @throws GoraException if the existence check or the table creation fails.
+   */
+  @Override
+  public void createSchema() throws GoraException {
+    if (!schemaExists()) {
+      try {
+        final String database = rethinkDBStoreParameters.getDatabaseName();
+        final String table = rethinkDBMapping.getDocumentClass();
+        r.db(database).tableCreate(table).run(connection);
+      } catch (Exception e) {
+        throw new GoraException(e);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Drops the mapped table together with all documents stored in it. Does nothing when the
+   * table does not exist, so repeated calls are safe.
+   *
+   * @throws GoraException if the existence check or the table drop fails.
+   */
+  @Override
+  public void deleteSchema() throws GoraException {
+    // tableDrop() errors on a missing table; treat that case as a no-op, mirroring the
+    // existence check performed by createSchema().
+    if (!schemaExists()) {
+      return;
+    }
+    try {
+      r.db(rethinkDBStoreParameters.getDatabaseName())
+              .tableDrop(rethinkDBMapping.getDocumentClass())
+              .run(connection);
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Reports whether the mapped table is listed in the configured database.
+   *
+   * @throws GoraException if the table list cannot be retrieved.
+   */
+  @Override
+  public boolean schemaExists() throws GoraException {
+    try {
+      final String tableName = rethinkDBMapping.getDocumentClass();
+      final ArrayList tables = r.db(rethinkDBStoreParameters.getDatabaseName())
+              .tableList()
+              .run(connection, ArrayList.class)
+              .first();
+      return tables.stream().anyMatch(table -> table.equals(tableName));
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Loads the document stored under {@code key} and converts it into a persistent bean
+   * restricted to the requested fields.
+   *
+   * @return the bean, or {@code null} when no document is stored under {@code key}.
+   * @throws GoraException if a query or the document conversion fails.
+   */
+  @Override
+  public T get(K key, String[] fields) throws GoraException {
+    try {
+      if (!exists(key)) {
+        return null;
+      }
+      MapObject<String, Object> document = r.db(rethinkDBStoreParameters.getDatabaseName())
+              .table(rethinkDBMapping.getDocumentClass())
+              .get(key)
+              .run(connection, MapObject.class)
+              .first();
+      // The existence check and this lookup are separate queries: the document may have been
+      // deleted in between, and converting a null document would fail with an NPE.
+      if (document == null) {
+        return null;
+      }
+      return convertRethinkDBDocToAvroBean(document, getFieldsToQuery(fields));
+    } catch (Exception e) {
+      // Avoid wrapping a GoraException (e.g. from exists()) a second time.
+      throw e instanceof GoraException ? (GoraException) e : new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Writes the dirty fields of {@code val} as the document for {@code key}, replacing any
+   * document already stored under the same id. Beans that are not dirty are skipped.
+   * NOTE(review): only dirty, non-null fields are put into the document, so a replace drops
+   * stored fields that are not dirty — confirm this is the intended update semantics.
+   *
+   * @throws GoraException if the document conversion or the write fails.
+   */
+  @Override
+  public void put(K key, T val) throws GoraException {
+    if (!val.isDirty()) {
+      LOG.debug("Ignored putting persistent bean {} in the store as it is neither "
+              + "new, neither dirty.", val);
+      return;
+    }
+    try {
+      MapObject<String, Object> document = convertAvroBeanToRethinkDBDocument(key, val);
+      // conflict=replace turns the insert into an atomic upsert: a new id is inserted and a
+      // document with an existing id is replaced wholesale. A separate existence check
+      // followed by insert()/replace() needs two round trips and can race with other writers.
+      r.db(rethinkDBStoreParameters.getDatabaseName())
+              .table(rethinkDBMapping.getDocumentClass())
+              .insert(document)
+              .optArg("conflict", "replace")
+              .run(connection);
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean delete(K key) throws GoraException {
+    try {
+      r.db(rethinkDBStoreParameters.getDatabaseName())
+              .table(rethinkDBMapping.getDocumentClass())
+              .get(key)
+              .delete()
+              .run(connection);
+      return true;
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * When the query names no fields, or as many fields as the schema has, the matching
+   * documents are deleted. Otherwise only the mapped fields named by the query are removed
+   * from the documents in the query's start/end key range.
+   *
+   * @return the number of documents deleted, or modified in the field-removal case.
+   * @throws GoraException if building or running the query fails.
+   */
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws GoraException {
+    try {
+      if (query.getFields() == null || (query.getFields().length == getFields().length)) {
+        String[] fields = getFieldsToQuery(query.getFields());
+        RethinkDBQuery dataStoreQuery = ((RethinkDBQuery) query);
+        dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields,
+                getFields());
+        ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+        MapObject<String, Object> document = reqlExpr.delete().run(connection, MapObject.class).first();
+        return writeCount(document, "deleted");
+      }
+      RethinkDBQuery<K, T> dataStoreQuery = new RethinkDBQuery<>(this);
+      dataStoreQuery.setStartKey(query.getStartKey());
+      dataStoreQuery.setEndKey(query.getEndKey());
+      dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters,
+              getFieldsToQuery(null), getFields());
+      ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+      List<String> mappedFields = new ArrayList<>();
+      for (String k : query.getFields()) {
+        String dbFieldName = rethinkDBMapping.getDocumentField(k);
+        if (dbFieldName != null && dbFieldName.length() > 0) {
+          mappedFields.add(dbFieldName);
+        }
+      }
+      // Size the projection by the mapped fields only, so unmapped query fields do not leave
+      // trailing null entries that would be passed on to without().
+      String[] projection = mappedFields.toArray(new String[0]);
+      MapObject<String, Object> document = reqlExpr.replace(row -> row.without(projection))
+              .run(connection, MapObject.class).first();
+      return writeCount(document, "replaced");
+    } catch (Exception e) {
+      // Driver errors are unchecked; surface them through the declared GoraException.
+      throw e instanceof GoraException ? (GoraException) e : new GoraException(e);
+    }
+  }
+
+  /**
+   * Reads a counter such as {@code "deleted"} or {@code "replaced"} from a ReQL write summary.
+   * A missing summary or counter counts as zero, and negative values are clamped to zero.
+   */
+  private static long writeCount(final Map<String, Object> summary, final String counter) {
+    Object value = summary == null ? null : summary.get(counter);
+    if (value == null) {
+      return 0;
+    }
+    long count = value instanceof Number
+            ? ((Number) value).longValue()
+            : Long.parseLong(value.toString());
+    return Math.max(count, 0);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Populates the underlying {@link RethinkDBQuery} with the mapping and requested fields,
+   * runs it and wraps the driver cursor in a {@link RethinkDBResult}.
+   *
+   * @throws GoraException if running the query fails.
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws GoraException {
+    final String[] queryFields = getFieldsToQuery(query.getFields());
+    // Partition queries (see getPartitions) wrap the original query; unwrap to reach it.
+    final RethinkDBQuery rethinkDBQuery = (query instanceof RethinkDBQuery)
+            ? (RethinkDBQuery) query
+            : (RethinkDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery();
+    rethinkDBQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, queryFields,
+            getFields());
+    try {
+      final ReqlExpr statement = rethinkDBQuery.getRethinkDBDbQuery();
+      final com.rethinkdb.net.Result<MapObject> rows = statement.run(connection, MapObject.class);
+      return new RethinkDBResult<>(this, query, rows);
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return a new {@link RethinkDBQuery} bound to this store.
+   */
+  @Override
+  public Query<K, T> newQuery() {
+    final RethinkDBQuery<K, T> freshQuery = new RethinkDBQuery<>(this);
+    return freshQuery;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The query is not split: the returned list holds a single partition wrapping the whole
+   * query, configured with this store's configuration.
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+    final PartitionQueryImpl<K, T> wholeQuery = new PartitionQueryImpl<>(query);
+    wholeQuery.setConf(getConf());
+    final List<PartitionQuery<K, T>> result = new ArrayList<>(1);
+    result.add(wholeQuery);
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   * No-op: this store keeps no local write buffer. put() and the delete methods run their
+   * ReQL queries against the server immediately.
+   */
+  @Override
+  public void flush() throws GoraException {
+    // Intentionally empty: there is nothing buffered to send.
+  }
+
+  /**
+   * {@inheritDoc}
+   * Releases resources which have been used dataStore.
+   */
+  @Override
+  public void close() {
+    try {
+      flush();
+    } catch (Exception ex) {
+      LOG.error("Error occurred while flushing data to RethinkDB : ", ex);
+    }
+  }
+
+  @Override
+  public boolean exists(K key) throws GoraException {
+    boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
+            .table(rethinkDBMapping.getDocumentClass())
+            .getAll(key)
+            .count()
+            .run(connection, Boolean.class)
+            .first();
+    return isExists;
+  }
+
+  private MapObject<String, Object> convertAvroBeanToRethinkDBDocument(final K key,
final T persistent) {
+    MapObject<String, Object> result = new MapObject();
+    for (Schema.Field f : persistent.getSchema().getFields()) {
+      if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) {
+        String docf = rethinkDBMapping.getDocumentField(f.name());
+        Object value = persistent.get(f.pos());
+        RethinkDBMapping.DocumentFieldType storeType = rethinkDBMapping.getDocumentFieldType(docf);
+        LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}",
+                new Object[]{docf, f.schema().getType(), storeType});
+        Object o = convertAvroFieldToRethinkDBField(docf, f.schema(), f.schema().getType(),
+                storeType, value);
+        result.put(docf, o);
+      }
+    }
+    result.put("id", key.toString());
+    return result;
+  }
+
+  /**
+   * Converts one Avro value into the form stored in a RethinkDB document. Maps, arrays,
+   * records and nullable unions are converted recursively; bytes/fixed values are
+   * Base64-encoded; strings and enums are stored via {@code toString()}; the remaining
+   * primitives are stored unchanged.
+   *
+   * @param docf        mapped document field name, used for nested store-type lookups and
+   *                    error messages.
+   * @param fieldSchema Avro schema of {@code value}.
+   * @param fieldType   Avro type that selects the conversion.
+   * @param storeType   RethinkDB store type from the mapping; may be {@code null}.
+   * @param value       the value to convert; may be {@code null}.
+   * @return the converted value; {@code null} for a null scalar or an unknown type.
+   * @throws IllegalStateException if a map or array is mapped to an incompatible store type.
+   */
+  private Object convertAvroFieldToRethinkDBField(final String docf, final Schema fieldSchema,
+                                                  final Schema.Type fieldType,
+                                                  final RethinkDBMapping.DocumentFieldType storeType,
+                                                  final Object value) {
+    Object result = null;
+    switch (fieldType) {
+      case MAP:
+        if (storeType != null && !(storeType == RethinkDBMapping.DocumentFieldType.MAP ||
+                storeType == RethinkDBMapping.DocumentFieldType.DOCUMENT)) {
+          // Name the document field: getName() of an unnamed map schema is just "map".
+          throw new IllegalStateException(
+                  "Field " + docf
+                          + ": to store a AVRO 'map', target RethinkDB mapping have to be of type 'Map'" +
+                          "| 'Document'");
+        }
+        Schema valueSchema = fieldSchema.getValueType();
+        result = convertAvroMapToDocField(docf, (Map<CharSequence, ?>) value, valueSchema,
+                valueSchema.getType(), storeType);
+        break;
+      case ARRAY:
+        if (storeType != null && !(storeType == RethinkDBMapping.DocumentFieldType.LIST)) {
+          throw new IllegalStateException("Field " + docf
+                  + ": To store a AVRO 'array', target RethinkDB mapping have to be of type 'List'");
+        }
+        Schema elementSchema = fieldSchema.getElementType();
+        result = convertAvroListToDocField(docf, (List<?>) value, elementSchema,
+                elementSchema.getType(), storeType);
+        break;
+      case BYTES:
+      case FIXED:
+        // NOTE(review): the cast assumes FIXED values also arrive as ByteBuffer; Avro usually
+        // represents fixed values as GenericFixed — confirm.
+        if (value != null) {
+          // Encode only the readable window (position..limit). array() ignores that window and
+          // throws for direct or read-only buffers; duplicate() leaves the caller's buffer as is.
+          ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+          byte[] bytes = new byte[buffer.remaining()];
+          buffer.get(bytes);
+          result = Base64.getEncoder().encodeToString(bytes);
+        }
+        break;
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+        result = value;
+        break;
+      case STRING:
+      case ENUM:
+        if (value != null) {
+          result = value.toString();
+        }
+        break;
+      case RECORD:
+        if (value != null) {
+          result = convertAvroBeanToRethinkDBDocField(docf, fieldSchema, value);
+        }
+        break;
+      case UNION:
+        result = convertAvroUnionToRethinkDBField(docf, fieldSchema, storeType, value);
+        break;
+      default:
+        LOG.error("Unknown field type: {}", fieldSchema.getType());
+        break;
+    }
+    return result;
+  }
+
+  private Object convertAvroMapToDocField(final String docf,
+                                          final Map<CharSequence, ?> value, final Schema
fieldSchema,
+                                          final Schema.Type fieldType,
+                                          final RethinkDBMapping.DocumentFieldType storeType)
{
+    if (storeType == RethinkDBMapping.DocumentFieldType.MAP) {
+      HashMap map = new HashMap<String, Object>();
+      if (value == null)
+        return map;
+
+      for (Map.Entry<CharSequence, ?> e : value.entrySet()) {
+        String mapKey = e.getKey().toString();
+        Object mapValue = e.getValue();
+
+        RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
+        Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType,
+                mapValue);
+        map.put(mapKey, result);
+      }
+      return map;
+    } else {
+      MapObject<String, Object> doc = new MapObject<String, Object>();
+      if (value == null)
+        return doc;
+      for (Map.Entry<CharSequence, ?> e : value.entrySet()) {
+        String mapKey = e.getKey().toString();
+        Object mapValue = e.getValue();
+
+        RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
+        Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType,
+                mapValue);
+        doc.put(mapKey, result);
+      }
+      return doc;
+    }
+  }
+
+  private Object convertAvroListToDocField(final String docf, final Collection<?> array,
+                                           final Schema fieldSchema, final Schema.Type fieldType,
+                                           final RethinkDBMapping.DocumentFieldType storeType)
{
+    if (storeType == RethinkDBMapping.DocumentFieldType.LIST) {
+      ArrayList list;
+      list = new ArrayList<Object>();
+      if (array == null)
+        return list;
+      for (Object item : array) {
+        RethinkDBMapping.DocumentFieldType fieldStoreType = rethinkDBMapping.getDocumentFieldType(docf);
+        Object result = convertAvroFieldToRethinkDBField(docf, fieldSchema, fieldType, fieldStoreType,
item);
+        list.add(result);
+      }
+      return list;
+    }
+    return null;
+  }
+
+  private MapObject<String, Object> convertAvroBeanToRethinkDBDocField(final String
docf,
+                                                                       final Schema fieldSchema,
+                                                                       final Object value)
{
+    MapObject<String, Object> record = new MapObject();
+    for (Schema.Field member : fieldSchema.getFields()) {
+      Object innerValue = ((PersistentBase) value).get(member.pos());
+      String innerDoc = rethinkDBMapping.getDocumentField(member.name());
+      Schema.Type innerType = member.schema().getType();
+      RethinkDBMapping.DocumentFieldType innerStoreType = rethinkDBMapping.getDocumentFieldType(innerDoc);
+      LOG.debug("Transform value to BaseDocument , docField:{}, schemaType:{}, storeType:{}",
+              new Object[]{member.name(), member.schema().getType(),
+                      innerStoreType});
+      Object fieldValue = convertAvroFieldToRethinkDBField(docf, member.schema()
+              , innerType, innerStoreType, innerValue);
+      record.put(member.name(), fieldValue);
+    }
+    return record;
+  }
+
+  /**
+   * Converts the value of a nullable Avro union ({@code [null, X]} or {@code [X, null]}) by
+   * delegating to the conversion for the non-null branch schema {@code X}.
+   *
+   * @throws IllegalStateException if the union does not have exactly two branches, one of
+   *                               which is {@code null}.
+   */
+  private Object convertAvroUnionToRethinkDBField(final String docf, final Schema fieldSchema,
+                                                  final RethinkDBMapping.DocumentFieldType storeType,
+                                                  final Object value) {
+    List<Schema> branches = fieldSchema.getTypes();
+    // Check the branch count first: a single-branch union would otherwise fail on get(1) with
+    // IndexOutOfBoundsException, and extra branches of a larger union would be ignored.
+    if (branches.size() != 2) {
+      throw new IllegalStateException("RethinkDBStore only supports Union of two types field.");
+    }
+    Schema.Type type0 = branches.get(0).getType();
+    Schema.Type type1 = branches.get(1).getType();
+    boolean nullable = !type0.equals(type1)
+            && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL));
+    if (!nullable) {
+      throw new IllegalStateException("RethinkDBStore only supports Union of two types field.");
+    }
+    Schema innerSchema = type0.equals(Schema.Type.NULL) ? branches.get(1) : branches.get(0);
+    LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}",
+            innerSchema.getType(), storeType);
+    return convertAvroFieldToRethinkDBField(docf, innerSchema, innerSchema.getType(), storeType,
+            value);
+  }
+
+  /**
+   * Converts a RethinkDB document into a new persistent bean. Only the fields resolved by
+   * {@code getFieldsToQuery(fields)} that are mapped and present in the document are
+   * populated. Dirty flags are cleared before the bean is returned.
+   *
+   * @param obj    document read from RethinkDB.
+   * @param fields Avro field names to load (resolved through getFieldsToQuery).
+   * @return the populated bean.
+   * @throws GoraException if creating the bean or converting a field fails.
+   */
+  public T convertRethinkDBDocToAvroBean(final MapObject<String, Object> obj, final String[] fields)
+          throws GoraException {
+    final T bean = newPersistent();
+    for (String avroFieldName : getFieldsToQuery(fields)) {
+      final String documentField = rethinkDBMapping.getDocumentField(avroFieldName);
+      final boolean present = documentField != null && obj.containsKey(documentField);
+      if (present) {
+        final RethinkDBMapping.DocumentFieldType storeType =
+                rethinkDBMapping.getDocumentFieldType(documentField);
+        final Schema.Field avroField = fieldMap.get(avroFieldName);
+        final Schema avroSchema = avroField.schema();
+        LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}",
+                new Object[]{avroField.name(), avroSchema.getType(), documentField, storeType});
+        final Object converted = convertDocFieldToAvroField(avroSchema, storeType, avroField,
+                documentField, obj);
+        bean.put(avroField.pos(), converted);
+      }
+    }
+    bean.clearDirty();
+    return bean;
+  }
+
+  private Object convertDocFieldToAvroField(final Schema fieldSchema,
+                                            final RethinkDBMapping.DocumentFieldType storeType,
+                                            final Schema.Field field,
+                                            final String docf,
+                                            final MapObject<String, Object> obj) throws
GoraException {
+    Object result = null;
+    switch (fieldSchema.getType()) {
+      case MAP:
+        result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType);
+        break;
+      case ARRAY:
+        result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType);
+        break;
+      case RECORD:
+        MapObject<String, Object> record = (MapObject<String, Object>)
+                decorateMapToODoc((Map<String, Object>) obj.get(docf));
+        if (record == null) {
+          result = null;
+          break;
+        }
+        result = convertAvroBeanToRethinkDBDoc(fieldSchema, record);
+        break;
+      case BOOLEAN:
+        result = Boolean.valueOf(obj.get(docf).toString());
+        break;
+      case DOUBLE:
+        result = Double.valueOf(obj.get(docf).toString());
+        break;
+      case FLOAT:
+        result = Float.valueOf(obj.get(docf).toString());
+        break;
+      case INT:
+        result = Integer.valueOf(obj.get(docf).toString());
+        break;
+      case LONG:
+        result = Long.valueOf(obj.get(docf).toString());
+        break;
+      case STRING:
+        result = new Utf8(obj.get(docf).toString());
+        ;

Review comment:
       Remove this semicolon.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message