james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adup...@apache.org
Subject [16/41] james-project git commit: JAMES-2082 proposition of a new organisation for blob
Date Mon, 10 Jul 2017 17:54:28 GMT
JAMES-2082 proposition of a new organisation for blob


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e9979b56
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e9979b56
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e9979b56

Branch: refs/heads/master
Commit: e9979b56e03f362c094c72de3ff0ee0f020b5373
Parents: bd69ab9
Author: Luc DUZAN <lduzan@linagora.com>
Authored: Thu Jul 6 17:57:03 2017 +0200
Committer: Antoine Duprat <aduprat@linagora.com>
Committed: Mon Jul 10 14:23:56 2017 +0200

----------------------------------------------------------------------
 .../james/mailbox/cassandra/ids/PartId.java     | 72 ---------------
 .../cassandra/mail/CassandraBlobsDAO.java       | 94 ++++++++++----------
 .../cassandra/modules/CassandraBlobModule.java  | 21 +++--
 .../mailbox/cassandra/table/BlobTable.java      |  7 +-
 .../table/CassandraMessageV2Table.java          | 12 ---
 .../james/mailbox/cassandra/ids/PartIdTest.java | 87 ------------------
 6 files changed, 60 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java
deleted file mode 100644
index 38715d0..0000000
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.ids;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
-public class PartId {
-    public static PartId create(BlobId blobId, int position) {
-        Preconditions.checkNotNull(blobId);
-        Preconditions.checkArgument(position >= 0, "Position needs to be positive");
-        return new PartId(blobId.getId() + "-" + position);
-    }
-
-    public static PartId from(String id) {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(id));
-        return new PartId(id);
-    }
-
-    private final String id;
-
-    @VisibleForTesting
-    PartId(String id) {
-        this.id = id;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public final boolean equals(Object obj) {
-        if (obj instanceof PartId) {
-            PartId other = (PartId) obj;
-            return Objects.equal(id, other.id);
-        }
-        return false;
-    }
-
-    @Override
-    public final int hashCode() {
-        return Objects.hashCode(id);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects
-            .toStringHelper(this)
-            .add("id", id)
-            .toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
index 55efcda..e73fd32 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -23,36 +23,31 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.ids.BlobId;
-import org.apache.james.mailbox.cassandra.ids.PartId;
 import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
-import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.mailbox.cassandra.table.BlobTable;
+import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Bytes;
 
-
 public class CassandraBlobsDAO {
 
     public static final int CHUNK_SIZE = 1024 * 100;
@@ -76,26 +71,27 @@ public class CassandraBlobsDAO {
 
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select()
-            .from(Blobs.TABLE_NAME)
-            .where(eq(Blobs.ID, bindMarker(Blobs.ID))));
+            .from(BlobTable.TABLE_NAME)
+            .where(eq(BlobTable.ID, bindMarker(BlobTable.ID))));
     }
 
     private PreparedStatement prepareSelectPart(Session session) {
         return session.prepare(select()
             .from(BlobParts.TABLE_NAME)
-            .where(eq(BlobParts.ID, bindMarker(BlobParts.ID))));
+            .where(eq(BlobTable.ID, bindMarker(BlobTable.ID)))
+            .and(eq(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER))));
     }
 
     private PreparedStatement prepareInsert(Session session) {
-        return session.prepare(insertInto(Blobs.TABLE_NAME)
-            .value(Blobs.ID, bindMarker(Blobs.ID))
-            .value(Blobs.POSITION, bindMarker(Blobs.POSITION))
-            .value(Blobs.PART, bindMarker(Blobs.PART)));
+        return session.prepare(insertInto(BlobTable.TABLE_NAME)
+            .value(BlobTable.ID, bindMarker(BlobTable.ID))
+            .value(BlobTable.NUMBER_OF_CHUNK, bindMarker(BlobTable.NUMBER_OF_CHUNK)));
     }
 
     private PreparedStatement prepareInsertPart(Session session) {
         return session.prepare(insertInto(BlobParts.TABLE_NAME)
-            .value(BlobParts.ID, bindMarker(BlobParts.ID))
+            .value(BlobTable.ID, bindMarker(BlobTable.ID))
+            .value(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER))
             .value(BlobParts.DATA, bindMarker(BlobParts.DATA)));
     }
 
@@ -105,55 +101,57 @@ public class CassandraBlobsDAO {
         }
         BlobId blobId = BlobId.forPayload(data);
         return saveBlobParts(data, blobId)
-            .thenCompose(partIds -> saveBlobPartsReferences(blobId, partIds))
+            .thenCompose(numberOfChunk-> saveBlobPartsReferences(blobId, numberOfChunk))
             .thenApply(any -> Optional.of(blobId));
     }
 
-    private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) {
+    private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) {
         return FluentFutureStream.of(
             dataChunker.chunk(data, CHUNK_SIZE)
                 .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
                     .thenApply(partId -> Pair.of(pair.getKey(), partId))))
-            .completableFuture();
+            .completableFuture()
+            .thenApply(stream ->
+                getLastOfStream(stream)
+                    .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1)
+                    .orElse(0));
+    }
+
+    private static <T> Optional<T> getLastOfStream(Stream<T> stream) {
+        return stream.reduce((first, second) -> second);
     }
 
-    private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) {
-        PartId partId = PartId.create(blobId, position);
+    private CompletableFuture<Void> writePart(ByteBuffer data, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeVoid(
             insertPart.bind()
-                .setString(BlobParts.ID, partId.getId())
-                .setBytes(BlobParts.DATA, data))
-            .thenApply(any -> partId);
+                .setString(BlobTable.ID, blobId.getId())
+                .setInt(BlobParts.CHUNK_NUMBER, position)
+                .setBytes(BlobParts.DATA, data));
     }
 
-    private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) {
-        return FluentFutureStream.of(stream.map(pair ->
-            cassandraAsyncExecutor.executeVoid(insert.bind()
-                .setString(Blobs.ID, blobId.getId())
-                .setLong(Blobs.POSITION, pair.getKey())
-                .setString(Blobs.PART, pair.getValue().getId()))))
-            .completableFuture();
+    private CompletableFuture<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+        return cassandraAsyncExecutor.executeVoid(insert.bind()
+            .setString(BlobTable.ID, blobId.getId())
+            .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk));
     }
 
     public CompletableFuture<byte[]> read(BlobId blobId) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeSingleRow(
             select.bind()
-                .setString(Blobs.ID, blobId.getId()))
-            .thenApply(this::toPartIds)
+                .setString(BlobTable.ID, blobId.getId()))
             .thenCompose(this::toDataParts)
             .thenApply(this::concatenateDataParts);
     }
 
-    private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) {
-        return CassandraUtils.convertToStream(resultSet)
-            .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART))))
-            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue));
-    }
-
-    private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) {
-        return CompletableFutureUtil.chainAll(
-            positionToIds.values().stream(),
-            this::readPart);
+    private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) {
+        return blobRowOptional.map(blobRow -> {
+            BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID));
+            int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
+            return FluentFutureStream.of(
+                IntStream.range(0, numOfChunk)
+                    .mapToObj(position -> readPart(blobId, position)))
+                .completableFuture();
+        }).orElse(CompletableFuture.completedFuture(Stream.empty()));
     }
 
     private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
@@ -170,8 +168,10 @@ public class CassandraBlobsDAO {
         return data;
     }
 
-    private CompletableFuture<Optional<Row>> readPart(PartId partId) {
-        return cassandraAsyncExecutor.executeSingleRow(selectPart.bind()
-            .setString(BlobParts.ID, partId.getId()));
+    private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            selectPart.bind()
+                .setString(BlobTable.ID, blobId.getId())
+                .setInt(BlobParts.CHUNK_NUMBER, position));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
index 4eea870..a8a382b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
@@ -24,12 +24,11 @@ import java.util.List;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.components.CassandraTable;
 import org.apache.james.backends.cassandra.components.CassandraType;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
 
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 import com.google.common.collect.ImmutableList;
+import org.apache.james.mailbox.cassandra.table.BlobTable;
 
 public class CassandraBlobModule implements CassandraModule {
 
@@ -38,17 +37,17 @@ public class CassandraBlobModule implements CassandraModule {
 
     public CassandraBlobModule() {
         tables = ImmutableList.of(
-                new CassandraTable(Blobs.TABLE_NAME,
-                        SchemaBuilder.createTable(Blobs.TABLE_NAME)
+                new CassandraTable(BlobTable.TABLE_NAME,
+                        SchemaBuilder.createTable(BlobTable.TABLE_NAME)
                                 .ifNotExists()
-                                .addPartitionKey(Blobs.ID, DataType.text())
-                                .addClusteringColumn(Blobs.POSITION, DataType.bigint())
-                                .addColumn(Blobs.PART, DataType.text())),
-                new CassandraTable(BlobParts.TABLE_NAME,
-                        SchemaBuilder.createTable(BlobParts.TABLE_NAME)
+                                .addPartitionKey(BlobTable.ID, DataType.text())
+                                .addClusteringColumn(BlobTable.NUMBER_OF_CHUNK, DataType.cint())),
+                new CassandraTable(BlobTable.BlobParts.TABLE_NAME,
+                        SchemaBuilder.createTable(BlobTable.BlobParts.TABLE_NAME)
                                 .ifNotExists()
-                                .addPartitionKey(BlobParts.ID, DataType.text())
-                                .addColumn(BlobParts.DATA, DataType.blob())));
+                                .addPartitionKey(BlobTable.ID, DataType.text())
+                                .addClusteringColumn(BlobTable.BlobParts.CHUNK_NUMBER, DataType.cint())
+                                .addColumn(BlobTable.BlobParts.DATA, DataType.blob())));
         types = ImmutableList.of();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
index be097a5..1c0b0e9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
@@ -19,15 +19,14 @@
 
 package org.apache.james.mailbox.cassandra.table;
 
-public interface BlobsTable {
+public interface BlobTable {
     String TABLE_NAME = "blobs";
     String ID = "id";
-    String POSITION = "position";
-    String PART = "part";
+    String NUMBER_OF_CHUNK = "position";
 
     interface BlobParts {
         String TABLE_NAME = "blobParts";
-        String ID = "id";
+        String CHUNK_NUMBER = "chunkNumber";
         String DATA = "data";
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
index f7bc698..c2421f3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
@@ -52,16 +52,4 @@ public interface CassandraMessageV2Table {
         String IS_INLINE = "isInline";
     }
 
-    interface Blobs {
-        String TABLE_NAME = "blobs";
-        String ID = "id";
-        String POSITION = "position";
-        String PART = "part";
-    }
-
-    interface BlobParts {
-        String TABLE_NAME = "blobParts";
-        String ID = "id";
-        String DATA = "data";
-    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
deleted file mode 100644
index b236c55..0000000
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.ids;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import nl.jqno.equalsverifier.EqualsVerifier;
-
-public class PartIdTest {
-    private static final BlobId BLOB_ID = BlobId.from("abc");
-
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-
-    @Test
-    public void shouldRespectBeanContract() {
-        EqualsVerifier.forClass(PartId.class).verify();
-    }
-
-    @Test
-    public void test () {
-        String id = "111";
-        assertThat(PartId.from(id))
-            .isEqualTo(new PartId(id));
-    }
-
-    @Test
-    public void fromShouldThrowOnNull() {
-        expectedException.expect(IllegalArgumentException.class);
-
-        PartId.from(null);
-    }
-
-    @Test
-    public void fromShouldThrowOnEmpty() {
-        expectedException.expect(IllegalArgumentException.class);
-
-        PartId.from("");
-    }
-
-    @Test
-    public void createShouldThrowOnNullBlobId() {
-        expectedException.expect(NullPointerException.class);
-
-        PartId.create(null, 1);
-    }
-
-    @Test
-    public void createShouldThrowOnNegativePosition() {
-        expectedException.expect(IllegalArgumentException.class);
-
-        PartId.create(BLOB_ID, -1);
-    }
-
-    @Test
-    public void createShouldAcceptPositionZero() {
-        assertThat(PartId.create(BLOB_ID, 0).getId())
-            .isEqualTo(BLOB_ID.getId() + "-0");
-    }
-
-    @Test
-    public void createShouldConcatenateBlobIdAndPosition() {
-        assertThat(PartId.create(BLOB_ID, 36).getId())
-            .isEqualTo(BLOB_ID.getId() + "-36");
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message