Clean resource usage warnings.
Patch by Branimir Lambov; reviewed by tjake for CASSANDRA-10385
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/127f7c58
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/127f7c58
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/127f7c58
Branch: refs/heads/trunk
Commit: 127f7c5844f938649de0f3a5dec630b3c5cda256
Parents: 4b6b24c
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Wed Sep 30 12:41:30 2015 +0300
Committer: T Jake Luciani <jake@apache.org>
Committed: Wed Oct 7 14:04:02 2015 -0400
----------------------------------------------------------------------
build.xml | 1 -
.../apache/cassandra/cache/AutoSavingCache.java | 62 ++++++++------------
.../org/apache/cassandra/cache/OHCProvider.java | 3 +-
.../org/apache/cassandra/db/Clustering.java | 11 +---
src/java/org/apache/cassandra/db/Memtable.java | 2 +
.../cassandra/db/ReadExecutionController.java | 6 +-
.../org/apache/cassandra/db/ReadResponse.java | 16 ++---
.../db/SinglePartitionReadCommand.java | 2 +-
.../db/SinglePartitionSliceCommand.java | 7 +--
.../org/apache/cassandra/db/SystemKeyspace.java | 3 +-
.../db/WindowsFailedSnapshotTracker.java | 41 ++++++-------
.../db/compaction/CompactionManager.java | 6 +-
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../writers/CompactionAwareWriter.java | 11 +++-
.../db/lifecycle/LifecycleTransaction.java | 4 ++
.../db/partitions/PurgingPartitionIterator.java | 2 +
.../cassandra/hints/ChecksummedDataInput.java | 1 +
.../org/apache/cassandra/hints/HintsBuffer.java | 6 +-
.../org/apache/cassandra/hints/HintsReader.java | 5 +-
.../cassandra/hints/HintsWriteExecutor.java | 12 ++--
.../org/apache/cassandra/hints/HintsWriter.java | 4 +-
.../cassandra/hints/LegacyHintsMigrator.java | 4 +-
.../cassandra/io/sstable/SSTableRewriter.java | 22 +++----
.../cassandra/io/sstable/SSTableTxnWriter.java | 10 ++++
.../io/sstable/SimpleSSTableMultiWriter.java | 5 +-
.../apache/cassandra/io/util/SegmentedFile.java | 1 +
.../net/IncomingStreamingConnection.java | 2 +
.../cassandra/net/IncomingTcpConnection.java | 1 +
.../cassandra/security/JKSKeyProvider.java | 9 +--
.../service/pager/MultiPartitionPager.java | 3 +-
.../cassandra/service/pager/PagingState.java | 4 +-
.../cassandra/streaming/StreamReader.java | 2 +-
.../compress/CompressedStreamReader.java | 3 +-
.../streaming/messages/PrepareMessage.java | 1 +
.../streaming/messages/ReceivedMessage.java | 1 +
.../streaming/messages/RetryMessage.java | 1 +
.../cassandra/utils/BytesReadTracker.java | 2 +-
.../utils/concurrent/Transactional.java | 3 +
.../db/lifecycle/RealTransactionsTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 28 ++++-----
41 files changed, 154 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 669ca56..87f98cc 100644
--- a/build.xml
+++ b/build.xml
@@ -2029,7 +2029,6 @@
<target name="eclipse-warnings" depends="build" description="Run eclipse compiler code analysis">
<property name="ecj.log.dir" value="${build.dir}/ecj" />
<property name="ecj.warnings.file" value="${ecj.log.dir}/eclipse_compiler_checks.txt"/>
- <delete dir="${ecj.log.dir}" />
<mkdir dir="${ecj.log.dir}" />
<property name="ecj.properties" value="${basedir}/eclipse_compiler.properties" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 8e563fd..e39dcf1 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -333,56 +333,42 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
long start = System.nanoTime();
- WrappedDataOutputStreamPlus writer = null;
Pair<File, File> cacheFilePaths = tempCacheFiles();
- try
+ try (WrappedDataOutputStreamPlus writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right)))
{
- try
- {
- writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right));
- }
- catch (FileNotFoundException e)
+
+ //Need to be able to check schema version because CF names are ambiguous
+ UUID schemaVersion = Schema.instance.getVersion();
+ if (schemaVersion == null)
{
- throw new RuntimeException(e);
+ Schema.instance.updateVersion();
+ schemaVersion = Schema.instance.getVersion();
}
+ writer.writeLong(schemaVersion.getMostSignificantBits());
+ writer.writeLong(schemaVersion.getLeastSignificantBits());
- try
+ while (keyIterator.hasNext())
{
- //Need to be able to check schema version because CF names are ambiguous
- UUID schemaVersion = Schema.instance.getVersion();
- if (schemaVersion == null)
- {
- Schema.instance.updateVersion();
- schemaVersion = Schema.instance.getVersion();
- }
- writer.writeLong(schemaVersion.getMostSignificantBits());
- writer.writeLong(schemaVersion.getLeastSignificantBits());
-
- while (keyIterator.hasNext())
- {
- K key = keyIterator.next();
+ K key = keyIterator.next();
- ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
- if (cfs == null)
- continue; // the table or 2i has been dropped.
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+ if (cfs == null)
+ continue; // the table or 2i has been dropped.
- cacheLoader.serialize(key, writer, cfs);
+ cacheLoader.serialize(key, writer, cfs);
- keysWritten++;
- if (keysWritten >= keysEstimate)
- break;
- }
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, cacheFilePaths.left);
+ keysWritten++;
+ if (keysWritten >= keysEstimate)
+ break;
}
-
}
- finally
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
{
- if (writer != null)
- FileUtils.closeQuietly(writer);
+ throw new FSWriteError(e, cacheFilePaths.left);
}
File cacheFile = getCacheDataPath(CURRENT_VERSION);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 8a7bdfc..6f75c74 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -175,8 +175,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
public void serialize(IRowCacheEntry entry, ByteBuffer buf)
{
assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
- DataOutputBufferFixed out = new DataOutputBufferFixed(buf);
- try
+ try (DataOutputBufferFixed out = new DataOutputBufferFixed(buf))
{
boolean isSentinel = entry instanceof RowCacheSentinel;
out.writeBoolean(isSentinel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 6bffd45..a40cc1f 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -24,10 +24,7 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
@@ -142,9 +139,8 @@ public class Clustering extends AbstractClusteringPrefix
public ByteBuffer serialize(Clustering clustering, int version, List<AbstractType<?>> types)
{
- try
+ try (DataOutputBuffer buffer = new DataOutputBuffer((int)serializedSize(clustering, version, types)))
{
- DataOutputBuffer buffer = new DataOutputBuffer((int)serializedSize(clustering, version, types));
serialize(clustering, buffer, version, types);
return buffer.buffer();
}
@@ -170,9 +166,8 @@ public class Clustering extends AbstractClusteringPrefix
public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)
{
- try
+ try (DataInputBuffer buffer = new DataInputBuffer(in, true))
{
- DataInputBuffer buffer = new DataInputBuffer(in, true);
return deserialize(buffer, version, types);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 81decde..014467e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -415,6 +416,7 @@ public class Memtable implements Comparable<Memtable>
}
}
+ @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
public SSTableTxnWriter createFlushWriter(String filename,
PartitionColumns columns,
EncodingStats stats)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/ReadExecutionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 0bb40f8..4a779e4 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -61,6 +61,7 @@ public class ReadExecutionController implements AutoCloseable
return new ReadExecutionController(readOp, null, null);
}
+ @SuppressWarnings("resource") // ops closed during controller close
public static ReadExecutionController forCommand(ReadCommand command)
{
ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
@@ -72,7 +73,7 @@ public class ReadExecutionController implements AutoCloseable
}
else
{
- OpOrder.Group baseOp = null, indexOp = null, writeOp;
+ OpOrder.Group baseOp = null, indexOp = null, writeOp = null;
// OpOrder.start() shouldn't fail, but better safe than sorry.
try
{
@@ -80,12 +81,13 @@ public class ReadExecutionController implements AutoCloseable
indexOp = indexCfs.readOrdering.start();
// TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
// as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
- writeOp = baseCfs.keyspace.writeOrder.start();
+ writeOp = Keyspace.writeOrder.start();
return new ReadExecutionController(baseOp, indexOp, writeOp);
}
catch (RuntimeException e)
{
// Note that must have writeOp == null since ReadOrderGroup ctor can't fail
+ assert writeOp == null;
try
{
if (baseOp != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index b8ffe25..41f0d5d 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -186,9 +186,8 @@ public abstract class ReadResponse
public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
{
- try
+ try (DataInputBuffer in = new DataInputBuffer(data, true))
{
- DataInputPlus in = new DataInputBuffer(data, true);
return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
MessagingService.current_version,
metadata,
@@ -368,18 +367,13 @@ public abstract class ReadResponse
// ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply)
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key);
- if (rowIterator == null)
- return new LegacyRemoteDataResponse(Collections.emptyList());
-
- try
+ try (UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
{
+ if (rowIterator == null)
+ return new LegacyRemoteDataResponse(Collections.emptyList());
+
return new LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator)));
}
- finally
- {
- rowIterator.close();
- }
}
ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 3a50f23..448eaa6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -254,9 +254,9 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
metric.readLatency.addNano(latencyNanos);
}
+ @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
{
- @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
? getThroughCache(cfs, executionController)
: queryMemtableAndDisk(cfs, executionController);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index 9fabdc2..e151882 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -128,6 +128,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
return oldestUnrepairedTombstone;
}
+ @SuppressWarnings("resource") // Iterators added to iterators list which is closed on exception or with the closing of the result of the method.
protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
{
Tracing.trace("Acquiring sstable references");
@@ -144,9 +145,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
if (partition == null)
continue;
- @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
- @SuppressWarnings("resource") // same as above
UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
@@ -192,7 +191,6 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
}
sstable.incrementReadCount();
- @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
@@ -212,7 +210,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
continue;
sstable.incrementReadCount();
- @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+
UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
{
@@ -239,7 +237,6 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
- @SuppressWarnings("resource") // Closed through the closing of the result of that method.
UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
if (!merged.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 5c47e1e..689a33d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -674,9 +674,8 @@ public final class SystemKeyspace
private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
{
- try
+ try (RebufferingInputStream in = new DataInputBuffer(bytes, true))
{
- RebufferingInputStream in = new DataInputBuffer(bytes, true);
return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
index 9e6bb47..7cc7893 100644
--- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
+++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
@@ -52,32 +52,33 @@ public class WindowsFailedSnapshotTracker
{
try
{
- BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE));
- String snapshotDirectory;
- while ((snapshotDirectory = reader.readLine()) != null)
+ try (BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE)))
{
- File f = new File(snapshotDirectory);
+ String snapshotDirectory;
+ while ((snapshotDirectory = reader.readLine()) != null)
+ {
+ File f = new File(snapshotDirectory);
- // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
- // delete something important by virtue of adding something invalid to the .toDelete file.
- boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
- for (String s : DatabaseDescriptor.getAllDataFileLocations())
- validFolder |= FileUtils.isSubDirectory(new File(s), f);
+ // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally
+ // delete something important by virtue of adding something invalid to the .toDelete file.
+ boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f);
+ for (String s : DatabaseDescriptor.getAllDataFileLocations())
+ validFolder |= FileUtils.isSubDirectory(new File(s), f);
- if (!validFolder)
- {
- logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
- continue;
- }
+ if (!validFolder)
+ {
+ logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f);
+ continue;
+ }
- // Could be a non-existent directory if deletion worked on previous JVM shutdown.
- if (f.exists())
- {
- logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
- FileUtils.deleteRecursive(new File(snapshotDirectory));
+ // Could be a non-existent directory if deletion worked on previous JVM shutdown.
+ if (f.exists())
+ {
+ logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory);
+ FileUtils.deleteRecursive(new File(snapshotDirectory));
+ }
}
}
- reader.close();
// Only delete the old .toDelete file if we succeed in deleting all our known bad snapshots.
Files.delete(Paths.get(TODELETEFILE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 77c0cbb..a8c6c9b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -794,7 +794,7 @@ public class CompactionManager implements CompactionManagerMBean
List<SSTableReader> finished;
int nowInSec = FBUtilities.nowInSeconds();
- try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false);
+ try (SSTableRewriter writer = new SSTableRewriter(txn, sstable.maxDataAge, false);
ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
@@ -1165,8 +1165,8 @@ public class CompactionManager implements CompactionManagerMBean
int nowInSec = FBUtilities.nowInSeconds();
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
- try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
+ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index c437832..272c2f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -151,7 +151,7 @@ public class Scrubber implements Closeable
List<SSTableReader> finished = new ArrayList<>();
boolean completed = false;
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
- try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline))
+ try (SSTableRewriter writer = new SSTableRewriter(transaction, sstable.maxDataAge, isOffline))
{
nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
if (indexAvailable())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index ebfd997..fcd1a3c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -23,7 +23,6 @@ import java.util.*;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -83,7 +82,7 @@ public class Upgrader
{
outputHandler.output("Upgrading " + sstable);
int nowInSec = FBUtilities.nowInSeconds();
- try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true).keepOriginals(keepOriginals);
+ try (SSTableRewriter writer = SSTableRewriter.constructKeepingOriginals(transaction, keepOriginals, CompactionTask.getMaxDataAge(transaction.originals()), true);
AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index abc4107..0b3b7d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.db.compaction.writers;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -28,7 +27,6 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -65,7 +63,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
this.txn = txn;
- this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals);
+ this.sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
}
@@ -112,6 +110,13 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return realAppend(partition);
}
+ @Override
+ protected Throwable doPostCleanup(Throwable accumulate)
+ {
+ sstableWriter.close();
+ return super.doPostCleanup(accumulate);
+ }
+
protected abstract boolean realAppend(UnfilteredRowIterator partition);
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 83d0f82..3705f3d 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -140,6 +140,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
/**
* construct an empty Transaction with no existing readers
*/
+ @SuppressWarnings("resource") // log closed during postCleanup
public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
{
Tracker dummy = new Tracker(null, false);
@@ -149,12 +150,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
/**
* construct an empty Transaction with no existing readers
*/
+ @SuppressWarnings("resource") // log closed during postCleanup
public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
{
Tracker dummy = new Tracker(null, false);
return new LifecycleTransaction(dummy, new LogTransaction(operationType, operationFolder, dummy), Collections.emptyList());
}
+ @SuppressWarnings("resource") // log closed during postCleanup
LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
{
this(tracker, new LogTransaction(operationType, getMetadata(tracker, readers), tracker), readers);
@@ -283,6 +286,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
+ log.close();
return unmarkCompacting(marked, accumulate);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
index d23d4a7..2093f53 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
@@ -61,6 +61,7 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
}
@Override
+ @SuppressWarnings("resource") // 'purged' closes wrapped 'iterator'
public boolean hasNext()
{
while (next == null && super.hasNext())
@@ -76,6 +77,7 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
}
onEmptyPartitionPostPurge(purged.partitionKey());
+ purged.close();
}
return next != null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 95ea256..d5b8ae0 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -57,6 +57,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
resetLimit();
}
+ @SuppressWarnings("resource") // channel owned by RandomAccessReaderWithOwnChannel
public static ChecksummedDataInput open(File file)
{
return new Builder(new ChannelProxy(file)).build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/HintsBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java b/src/java/org/apache/cassandra/hints/HintsBuffer.java
index 097abce..e86dede 100644
--- a/src/java/org/apache/cassandra/hints/HintsBuffer.java
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
@@ -142,6 +143,7 @@ final class HintsBuffer
};
}
+ @SuppressWarnings("resource")
Allocation allocate(int hintSize)
{
int totalSize = hintSize + ENTRY_OVERHEAD_SIZE;
@@ -153,7 +155,6 @@ final class HintsBuffer
slab.capacity() / 2));
}
- @SuppressWarnings("resource")
OpOrder.Group opGroup = appendOrder.start(); // will eventually be closed by the receiver of the allocation
try
{
@@ -239,10 +240,9 @@ final class HintsBuffer
private void write(Hint hint)
{
ByteBuffer buffer = (ByteBuffer) slab.duplicate().position(offset).limit(offset + totalSize);
- DataOutputPlus dop = new DataOutputBufferFixed(buffer);
CRC32 crc = new CRC32();
int hintSize = totalSize - ENTRY_OVERHEAD_SIZE;
- try
+ try (DataOutputBuffer dop = new DataOutputBufferFixed(buffer))
{
dop.writeInt(hintSize);
updateChecksumInt(crc, hintSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index bc83654..67bb4f6 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -71,6 +71,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
this.rateLimiter = rateLimiter;
}
+ @SuppressWarnings("resource") // HintsReader owns input
static HintsReader open(File file, RateLimiter rateLimiter)
{
ChecksummedDataInput reader = ChecksummedDataInput.open(file);
@@ -81,7 +82,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
}
catch (IOException e)
{
- FileUtils.closeQuietly(reader);
+ reader.close();
throw new FSReadError(e, file);
}
}
@@ -93,7 +94,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
public void close()
{
- FileUtils.closeQuietly(input);
+ input.close();
}
public HintsDescriptor descriptor()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index be52f92..932f1c7 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -68,7 +68,7 @@ final class HintsWriteExecutor
/**
* Flush the provided buffer, recycle it and offer it back to the pool.
*/
- Future flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
+ Future<?> flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
{
return executor.submit(new FlushBufferTask(buffer, bufferPool));
}
@@ -76,7 +76,7 @@ final class HintsWriteExecutor
/**
* Flush the current buffer, but without clearing/recycling it.
*/
- Future flushBufferPool(HintsBufferPool bufferPool)
+ Future<?> flushBufferPool(HintsBufferPool bufferPool)
{
return executor.submit(new FlushBufferPoolTask(bufferPool));
}
@@ -84,7 +84,7 @@ final class HintsWriteExecutor
/**
* Flush the current buffer just for the specified hints stores. Without clearing/recycling it.
*/
- Future flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
+ Future<?> flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
{
return executor.submit(new PartiallyFlushBufferPoolTask(bufferPool, stores));
}
@@ -101,12 +101,12 @@ final class HintsWriteExecutor
}
}
- Future closeWriter(HintsStore store)
+ Future<?> closeWriter(HintsStore store)
{
return executor.submit(store::closeWriter);
}
- Future closeAllWriters()
+ Future<?> closeAllWriters()
{
return executor.submit(() -> catalog.stores().forEach(HintsStore::closeWriter));
}
@@ -211,11 +211,11 @@ final class HintsWriteExecutor
}
}
+ @SuppressWarnings("resource") // writer not closed here
private void flushInternal(Iterator<ByteBuffer> iterator, HintsStore store)
{
long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();
- @SuppressWarnings("resource")
HintsWriter writer = store.getOrOpenWriter();
try (HintsWriter.Session session = writer.newSession(writeBuffer))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 5cadd35..64520b9 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -62,6 +62,7 @@ final class HintsWriter implements AutoCloseable
this.globalCRC = globalCRC;
}
+ @SuppressWarnings("resource") // HintsWriter owns channel
static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOException
{
File file = new File(directory, descriptor.fileName());
@@ -71,10 +72,9 @@ final class HintsWriter implements AutoCloseable
CRC32 crc = new CRC32();
- try
+ try (DataOutputBuffer dob = new DataOutputBuffer())
{
// write the descriptor
- DataOutputBuffer dob = new DataOutputBuffer();
descriptor.serialize(dob);
ByteBuffer descriptorBytes = dob.buffer();
updateChecksum(crc, descriptorBytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
index b0095ed..30e5fe0 100644
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -204,9 +204,9 @@ public final class LegacyHintsMigrator
private static Mutation deserializeLegacyMutation(UntypedResultSet.Row row)
{
- try
+ try (DataInputBuffer dib = new DataInputBuffer(row.getBlob("mutation"), true))
{
- Mutation mutation = Mutation.serializer.deserialize(new DataInputBuffer(row.getBlob("mutation"), true),
+ Mutation mutation = Mutation.serializer.deserialize(dib,
row.getInt("message_version"));
mutation.getPartitionUpdates().forEach(PartitionUpdate::validate);
return mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 34a657f..f3885de 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -54,7 +53,6 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
@VisibleForTesting
public static boolean disableEarlyOpeningForTests = false;
- private final ColumnFamilyStore cfs;
private final long preemptiveOpenInterval;
private final long maxAge;
private long repairedAt = -1;
@@ -67,7 +65,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
private final List<SSTableWriter> writers = new ArrayList<>();
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
- private boolean keepOriginals; // true if we do not want to obsolete the originals
+ private final boolean keepOriginals; // true if we do not want to obsolete the originals
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -75,33 +73,31 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
// for testing (TODO: remove when have byteman setup)
private boolean throwEarly, throwLate;
- public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline)
+ public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline)
{
- this(cfs, transaction, maxAge, isOffline, true);
+ this(transaction, maxAge, isOffline, true);
}
- public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+ public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
{
- this(cfs, transaction, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly));
+ this(transaction, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly), false);
}
@VisibleForTesting
- public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval)
+ public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval, boolean keepOriginals)
{
this.transaction = transaction;
for (SSTableReader sstable : this.transaction.originals())
fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
- this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
- this.keepOriginals = false;
+ this.keepOriginals = keepOriginals;
this.preemptiveOpenInterval = preemptiveOpenInterval;
}
- public SSTableRewriter keepOriginals(boolean val)
+ public static SSTableRewriter constructKeepingOriginals(LifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
{
- keepOriginals = val;
- return this;
+ return new SSTableRewriter(transaction, maxAge, isOffline, calculateOpenInterval(true), keepOriginals);
}
private static long calculateOpenInterval(boolean shouldOpenEarly)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 5d65a30..0b50901 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -78,6 +78,14 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
txn.prepareToCommit();
}
+ @Override
+ protected Throwable doPostCleanup(Throwable accumulate)
+ {
+ txn.close();
+ writer.close();
+ return super.doPostCleanup(accumulate);
+ }
+
public Collection<SSTableReader> finish(boolean openResult)
{
writer.setOpenResult(openResult);
@@ -85,6 +93,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
return writer.finished();
}
+ @SuppressWarnings("resource") // log and writer closed during postCleanup
public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
@@ -92,6 +101,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
return new SSTableTxnWriter(txn, writer);
}
+ @SuppressWarnings("resource") // log and writer closed during postCleanup
public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2112656..e034d90 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -42,7 +42,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
public boolean append(UnfilteredRowIterator partition)
{
- RowIndexEntry indexEntry = writer.append(partition);
+ RowIndexEntry<?> indexEntry = writer.append(partition);
return indexEntry != null;
}
@@ -97,11 +97,12 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
writer.prepareToCommit();
}
- public void close() throws Exception
+ public void close()
{
writer.close();
}
+ @SuppressWarnings("resource") // SimpleSSTableMultiWriter closes writer
public static SSTableMultiWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 2504ecd..c827255 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -180,6 +180,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl implements IChec
*/
protected abstract SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength);
+ @SuppressWarnings("resource") // SegmentedFile owns channel
private SegmentedFile complete(String path, int bufferSize, long overrideLength)
{
ChannelProxy channelCopy = getChannel(path);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index a275b2f..e58d227 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.streaming.StreamResultFuture;
@@ -50,6 +51,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
}
@Override
+ @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the stream needs to remain open.
public void run()
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index a5e86fa..f1e4e92 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -130,6 +130,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
}
}
+ @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the stream needs to remain open.
private void receiveMessages() throws IOException
{
// handshake (true) endpoint versions
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/security/JKSKeyProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/JKSKeyProvider.java b/src/java/org/apache/cassandra/security/JKSKeyProvider.java
index 8d7f1c6..af9d515 100644
--- a/src/java/org/apache/cassandra/security/JKSKeyProvider.java
+++ b/src/java/org/apache/cassandra/security/JKSKeyProvider.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.TransparentDataEncryptionOptions;
-import org.apache.cassandra.io.util.FileUtils;
/**
* A {@code KeyProvider} that retrieves keys from a java keystore.
@@ -47,10 +46,8 @@ public class JKSKeyProvider implements KeyProvider
{
this.options = options;
logger.info("initializing keystore from file {}", options.get(PROP_KEYSTORE));
- FileInputStream inputStream = null;
- try
+ try (FileInputStream inputStream = new FileInputStream(options.get(PROP_KEYSTORE)))
{
- inputStream = new FileInputStream(options.get(PROP_KEYSTORE));
store = KeyStore.getInstance(options.get(PROP_KEYSTORE_TYPE));
store.load(inputStream, options.get(PROP_KEYSTORE_PW).toCharArray());
isJceks = store.getType().equalsIgnoreCase("jceks");
@@ -59,10 +56,6 @@ public class JKSKeyProvider implements KeyProvider
{
throw new RuntimeException("couldn't load keystore", e);
}
- finally
- {
- FileUtils.closeQuietly(inputStream);
- }
}
public Key getSecretKey(String keyAlias) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 09eabb8..fca0165 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -120,7 +120,7 @@ public class MultiPartitionPager implements QueryPager
throw new AssertionError("Shouldn't be called on an exhausted pager");
}
- @SuppressWarnings("resource")
+ @SuppressWarnings("resource") // iter closed via countingIter
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
@@ -130,6 +130,7 @@ public class MultiPartitionPager implements QueryPager
return countingIter;
}
+ @SuppressWarnings("resource") // iter closed via countingIter
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 542b6d2..d495f9d 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service.pager;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -58,9 +57,8 @@ public class PagingState
if (bytes == null)
return null;
- try
+ try (DataInputBuffer in = new DataInputBuffer(bytes, true))
{
- DataInputBuffer in = new DataInputBuffer(bytes, true);
ByteBuffer pk;
RowMark mark;
int remaining, remainingInPartition;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index d4b7283..879491e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -83,7 +83,7 @@ public class StreamReader
* @return SSTable transferred
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
- @SuppressWarnings("resource")
+ @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 69c7b87..30cafef 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming.compress;
import java.io.DataInputStream;
-
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -61,7 +60,7 @@ public class CompressedStreamReader extends StreamReader
* @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@Override
- @SuppressWarnings("resource")
+ @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 7ce1a2a..1f53be7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -34,6 +34,7 @@ public class PrepareMessage extends StreamMessage
{
public static Serializer<PrepareMessage> serializer = new Serializer<PrepareMessage>()
{
+ @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 71c8c2a..251b9c8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -33,6 +33,7 @@ public class ReceivedMessage extends StreamMessage
{
public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>()
{
+ @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index fa9c30f..9363f0a 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -33,6 +33,7 @@ public class RetryMessage extends StreamMessage
{
public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
{
+ @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/utils/BytesReadTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BytesReadTracker.java b/src/java/org/apache/cassandra/utils/BytesReadTracker.java
index 0da51a5..5e98b25 100644
--- a/src/java/org/apache/cassandra/utils/BytesReadTracker.java
+++ b/src/java/org/apache/cassandra/utils/BytesReadTracker.java
@@ -30,7 +30,7 @@ public class BytesReadTracker implements DataInputPlus
{
private long bytesRead;
- private final DataInput source;
+ final DataInput source;
public BytesReadTracker(DataInput source)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index d142f06..abb876c 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -221,4 +221,7 @@ public interface Transactional extends AutoCloseable
Throwable abort(Throwable accumulate);
void prepareToCommit();
+
+ // close() does not throw
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index a7ad156..b6cd9a4 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -151,7 +151,7 @@ public class RealTransactionsTest extends SchemaLoader
int nowInSec = FBUtilities.nowInSeconds();
try (CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(FBUtilities.nowInSeconds())))
{
- try (SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);
+ try (SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false);
AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals());
CompactionIterator ci = new CompactionIterator(txn.opType(), scanners.scanners, controller, nowInSec, txn.opId())
)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/127f7c58/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942c7f9..093bffd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -133,7 +133,7 @@ public class SSTableRewriterTest extends SchemaLoader
int nowInSec = FBUtilities.nowInSeconds();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, false);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
@@ -165,7 +165,7 @@ public class SSTableRewriterTest extends SchemaLoader
int nowInSec = FBUtilities.nowInSeconds();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
@@ -198,7 +198,7 @@ public class SSTableRewriterTest extends SchemaLoader
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
@@ -306,7 +306,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -361,7 +361,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -512,7 +512,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
test.run(scanner, controller, s, cfs, rewriter, txn);
@@ -544,7 +544,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -590,7 +590,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -630,7 +630,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 1000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -716,7 +716,7 @@ public class SSTableRewriterTest extends SchemaLoader
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
: cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline, 10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, offline, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -806,7 +806,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 1, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -844,7 +844,7 @@ public class SSTableRewriterTest extends SchemaLoader
try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, sstables, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -887,8 +887,8 @@ public class SSTableRewriterTest extends SchemaLoader
int nowInSec = FBUtilities.nowInSeconds();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, false);
- SSTableRewriter writer2 = new SSTableRewriter(cfs, txn, 1000, false, false);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, false);
+ SSTableRewriter writer2 = new SSTableRewriter(txn, 1000, false, false);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())
)
|