flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9981) Tune performance of RocksDB implementation
Date Wed, 01 Aug 2018 18:34:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565785#comment-16565785 ]

ASF GitHub Bot commented on FLINK-9981:
---------------------------------------

azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206962666
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 ##########
 @@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR;
+
+/**
+ * A priority queue with set semantics, implemented on top of RocksDB. This uses a {@link TreeSet} to cache the bytes
+ * of up to the first n elements from RocksDB in memory to reduce interaction with RocksDB, in particular seek
+ * operations. Cache uses a simple write-through policy.
+ *
+ * @param <E> the type of the contained elements in the queue.
+ */
+public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
+	implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+	/** Serialized empty value to insert into RocksDB. */
+	private static final byte[] DUMMY_BYTES = new byte[] {};
+
+	/** The RocksDB instance that serves as store. */
+	@Nonnull
+	private final RocksDB db;
+
+	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
+	@Nonnull
+	private final ColumnFamilyHandle columnFamilyHandle;
+
+	/**
+	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+	 * aligned with their logical order.
+	 */
+	@Nonnull
+	private final TypeSerializer<E> byteOrderProducingSerializer;
+
+	/** Wrapper to batch all writes to RocksDB. */
+	@Nonnull
+	private final RocksDBWriteBatchWrapper batchWrapper;
+
+	/** The key-group id in serialized form. */
+	@Nonnull
+	private final byte[] groupPrefixBytes;
+
+	/** Output stream that helps to serialize elements. */
+	@Nonnull
+	private final ByteArrayOutputStreamWithPos outputStream;
+
+	/** Output view that helps to serialize elements, must wrap the output stream. */
+	@Nonnull
+	private final DataOutputViewStreamWrapper outputView;
+
+	@Nonnull
+	private final ByteArrayInputStreamWithPos inputStream;
+
+	@Nonnull
+	private final DataInputViewStreamWrapper inputView;
+
+	/** In memory cache that holds a partial view on the head of the RocksDB content. */
+	@Nonnull
+	private final OrderedByteArraySetCache orderedCache;
+
+	/** This holds the key that we use to seek to the first element in RocksDB, to improve seek/iterator performance. */
+	@Nonnull
+	private byte[] seekHint;
+
+	/** Cache for the head element in de-serialized form. */
+	@Nullable
+	private E peekCache;
+
+	/** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
+	private boolean storeOnlyElements;
 
 Review comment:
   The name of the `storeOnlyElements` field is a bit unclear without the comment; maybe `someElementsNotCached` would be more readable.
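
   To illustrate what the flag expresses, here is a minimal sketch of a bounded, write-through head cache. It is not the code from the pull request: the class `BoundedHeadCacheSketch` and its methods are hypothetical, a plain `TreeSet<Long>` stands in for the ordered RocksDB store, and elements are `long` values rather than serialized bytes. The point is only to show why a name like `someElementsNotCached` reads naturally at the places where the flag is set and checked.

```java
import java.util.TreeSet;

/**
 * Hypothetical sketch, not the PR's implementation. A TreeSet stands in for
 * the ordered RocksDB store; a second, bounded TreeSet caches its head.
 */
public class BoundedHeadCacheSketch {

    /** Stand-in for the ordered backend (assumption: RocksDB in the real code). */
    private final TreeSet<Long> store = new TreeSet<>();

    /** Holds the smallest elements of the store, at most maxCacheSize of them. */
    private final TreeSet<Long> cache = new TreeSet<>();

    private final int maxCacheSize;

    /** True if the store may hold elements that are not in the cache. */
    private boolean someElementsNotCached = false;

    public BoundedHeadCacheSketch(int maxCacheSize) {
        if (maxCacheSize <= 0) {
            throw new IllegalArgumentException("maxCacheSize must be positive");
        }
        this.maxCacheSize = maxCacheSize;
    }

    /** Adds an element; returns false if it was already present (set semantics). */
    public boolean add(long element) {
        // Write-through: the store is always updated first.
        if (!store.add(element)) {
            return false;
        }
        if (cache.size() < maxCacheSize) {
            // With free capacity we may cache the element if nothing is
            // hidden in the store, or if it sorts before the cached tail.
            if (!someElementsNotCached
                    || (!cache.isEmpty() && element < cache.last())) {
                cache.add(element);
            }
        } else {
            if (element < cache.last()) {
                // Evict the largest cached element to keep the cache a prefix.
                cache.pollLast();
                cache.add(element);
            }
            // Either the new element or the evicted one now lives only in the store.
            someElementsNotCached = true;
        }
        return true;
    }

    /** Removes and returns the smallest element, or null if the queue is empty. */
    public Long poll() {
        if (cache.isEmpty() && someElementsNotCached) {
            refillCache();
        }
        Long head = cache.pollFirst();
        if (head != null) {
            store.remove(head);
        }
        return head;
    }

    public boolean someElementsNotCached() {
        return someElementsNotCached;
    }

    /** Reloads the head of the store; in the real code this would be a seek + iterate. */
    private void refillCache() {
        for (Long element : store) {
            if (cache.size() >= maxCacheSize) {
                break;
            }
            cache.add(element);
        }
        someElementsNotCached = store.size() > cache.size();
    }
}
```

   In this sketch the expensive step (the refill, which would correspond to a seek in RocksDB) only runs when the cache is empty and the flag is set, so a false positive costs one unnecessary refill but never a wrong result.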

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Tune performance of RocksDB implementation
> ------------------------------------------
>
>                 Key: FLINK-9981
>                 URL: https://issues.apache.org/jira/browse/FLINK-9981
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> General performance tuning/polishing for the RocksDB implementation. We can figure out how caching/seeking can be improved.
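
Seeking to the head element only works if the key order in the store matches the logical order of the elements, which is why the quoted patch requires a serializer whose byte output sorts lexicographically in logical order. The following is a minimal sketch of that property for `long` values, not Flink's serializer: the class `ByteOrderSketch` and both methods are hypothetical, and unsigned byte-wise comparison is assumed as the store's key order.

```java
/**
 * Hypothetical sketch: an encoding of long values whose unsigned
 * lexicographic byte order matches the numeric order.
 */
public class ByteOrderSketch {

    /** Encodes a long as 8 big-endian bytes with the sign bit flipped. */
    public static byte[] encode(long value) {
        // Flipping the sign bit makes negative values sort before positive
        // ones when the bytes are compared as unsigned.
        long flipped = value ^ Long.MIN_VALUE;
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) flipped;
            flipped >>>= 8;
        }
        return out;
    }

    /** Compares two byte arrays lexicographically, treating bytes as unsigned. */
    public static int compareUnsigned(byte[] a, byte[] b) {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // On a shared prefix, the shorter array sorts first.
        return Integer.compare(a.length, b.length);
    }
}
```

A plain little-endian or sign-preserving encoding would not have this property: for example, the big-endian two's-complement bytes of -1 start with 0xFF and would sort after every non-negative value.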



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
