APEX-184 #resolve 1. In autoFlushExecutor don't exit run() while at least one listener still has more data to send. 2. Do not enable read in resumeReadIfSuspended when not able to switch to a new buffer. 3. Fix possible race condition in Block acquire. 4. Fix for incorrect counting of in-memory block permits. 5. Fix checkstyle violations. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5b6e4281 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5b6e4281 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5b6e4281 Branch: refs/heads/feature-module Commit: 5b6e4281865dd64baf8b12f87fd6b4225dcd0216 Parents: 48cd2e8 Author: Vlad Rozov Authored: Fri Oct 9 17:35:30 2015 -0700 Committer: Vlad Rozov Committed: Tue Oct 20 19:22:02 2015 -0700 ---------------------------------------------------------------------- bufferserver/pom.xml | 2 +- .../bufferserver/internal/DataList.java | 248 ++++++++++++------- .../bufferserver/internal/DataListener.java | 2 +- .../bufferserver/internal/FastDataList.java | 11 +- .../bufferserver/internal/LogicalNode.java | 37 ++- .../datatorrent/bufferserver/server/Server.java | 108 ++++---- 6 files changed, 232 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/pom.xml ---------------------------------------------------------------------- diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml index 939da4f..126e715 100644 --- a/bufferserver/pom.xml +++ b/bufferserver/pom.xml @@ -51,7 +51,7 @@ org.apache.maven.plugins maven-checkstyle-plugin - 124 + 60 http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java 
---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index f5af2e5..1f6c273 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -19,8 +19,13 @@ package com.datatorrent.bufferserver.internal; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -40,7 +45,6 @@ import com.datatorrent.bufferserver.util.VarInt; import com.datatorrent.netlet.AbstractClient; import com.datatorrent.netlet.util.VarInt.MutableInt; -import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Sets.newHashSet; @@ -66,9 +70,10 @@ public class DataList protected int size; protected int processingOffset; protected long baseSeconds; - private final List suspendedClients = newArrayList(); + private final Set suspendedClients = newHashSet(); private final AtomicInteger numberOfInMemBlockPermits; private MutableInt nextOffset = new MutableInt(); + private Future future; public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks) { @@ -106,13 +111,20 @@ public class DataList for (Block temp = first; temp != null; temp = temp.next) { if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) { if (temp != last) { + last.refCount.decrementAndGet(); last = temp; do { temp = temp.next; temp.discard(false); - if (temp.data != null) { - 
temp.data = null; - numberOfInMemBlockRewound++; + synchronized (temp) { + if (temp.refCount.get() != 0) { + logger.debug("Discarded block {} has positive reference count. Listeners: {}", temp, all_listeners); + throw new IllegalStateException("Discarded block " + temp + " has positive reference count!"); + } + if (temp.data != null) { + temp.data = null; + numberOfInMemBlockRewound++; + } } } while (temp.next != null); last.next = null; @@ -148,8 +160,13 @@ public class DataList Block temp = first; while (temp != last) { temp.discard(false); - temp.data = null; - temp = temp.next; + synchronized (temp) { + if (temp.refCount.get() != 0) { + throw new IllegalStateException("Discarded block " + temp + " not zero reference count!"); + } + temp.data = null; + temp = temp.next; + } } } first = last; @@ -173,9 +190,15 @@ public class DataList break; } temp.discard(false); - if (temp.data != null) { - temp.data = null; - numberOfInMemBlockPurged++; + synchronized (temp) { + if (temp.refCount.get() != 0) { + logger.debug("Discarded block {} has positive reference count. 
Listeners: {}", temp, all_listeners); + throw new IllegalStateException("Discarded block " + temp + " has positive reference count!"); + } + if (temp.data != null) { + temp.data = null; + numberOfInMemBlockPurged++; + } } } } @@ -202,21 +225,15 @@ public class DataList do { while (size == 0) { size = VarInt.read(last.data, processingOffset, writeOffset, nextOffset); - switch (nextOffset.integer) { - case -5: - throw new RuntimeException("problemo!"); - - case -4: - case -3: - case -2: - case -1: - case 0: - if (writeOffset == last.data.length) { - nextOffset.integer = 0; - processingOffset = 0; - size = 0; - } - break flush; + if (nextOffset.integer > -5 && nextOffset.integer < 1) { + if (writeOffset == last.data.length) { + nextOffset.integer = 0; + processingOffset = 0; + size = 0; + } + break flush; + } else if (nextOffset.integer == -5) { + throw new RuntimeException("problemo!"); } } @@ -240,6 +257,9 @@ public class DataList Tuple rwt = Tuple.getTuple(last.data, processingOffset, size); baseSeconds = (long)rwt.getBaseSeconds() << 32; break; + + default: + break; } processingOffset += size; size = 0; @@ -255,17 +275,28 @@ public class DataList last.writingOffset = writeOffset; - autoFlushExecutor.submit(new Runnable() - { - @Override - public void run() + notifyListeners(); + + } + + public void notifyListeners() + { + if (future == null || future.isDone() || future.isCancelled()) { + future = autoFlushExecutor.submit(new Runnable() { - for (DataListener dl : all_listeners) { - dl.addedData(); + @Override + public void run() + { + boolean atLeastOneListenerHasDataToSend; + do { + atLeastOneListenerHasDataToSend = false; + for (DataListener dl : all_listeners) { + atLeastOneListenerHasDataToSend |= dl.addedData(); + } + } while (atLeastOneListenerHasDataToSend); } - } - - }); + }); + } } public void setAutoFlushExecutor(final ExecutorService es) @@ -381,7 +412,7 @@ public class DataList public boolean suspendRead(final AbstractClient client) { synchronized 
(suspendedClients) { - return client.suspendReadIfResumed() && suspendedClients.add(client); + return suspendedClients.add(client) && client.suspendReadIfResumed(); } } @@ -395,6 +426,8 @@ public class DataList } suspendedClients.clear(); } + } else { + logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners); } return resumedSuspendedClients; } @@ -409,7 +442,7 @@ public class DataList return new byte[blockSize]; } - public void addBuffer(byte[] array) + public synchronized void addBuffer(byte[] array) { final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet(); if (numberOfInMemBlockPermits < 0) { @@ -468,8 +501,7 @@ public class DataList oldestBlockIndex = index; oldestReadOffset = entry.getValue().getReadOffset(); status.slowestConsumer = entry.getKey(); - } - else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) { + } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) { oldestReadOffset = entry.getValue().getReadOffset(); status.slowestConsumer = entry.getKey(); } @@ -481,8 +513,7 @@ public class DataList status.numBytesAllocated += b.data.length; if (oldestBlockIndex == i) { status.numBytesWaiting += b.writingOffset - oldestReadOffset; - } - else if (oldestBlockIndex < i) { + } else if (oldestBlockIndex < i) { status.numBytesWaiting += b.writingOffset - b.readingOffset; } b = b.next; @@ -508,7 +539,7 @@ public class DataList /** * actual data - stored as length followed by actual data. */ - byte data[]; + byte[] data; /** * readingOffset is the offset of the first valid byte in the array. */ @@ -536,8 +567,8 @@ public class DataList /** * how count of references to this block. 
*/ - AtomicInteger refCount; - Future future; + private final AtomicInteger refCount; + private Future future; public Block(String id, int size) { @@ -566,8 +597,7 @@ public class DataList if (current.offset + current.length > writingOffset) { current.length = 0; } - } - else { + } else { current.length = 0; } } @@ -581,7 +611,7 @@ public class DataList SerializedData sd = dli.next(); switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); bs = (long)rwt.getBaseSeconds() << 32; if (bs > windowId) { writingOffset = sd.offset; @@ -590,12 +620,15 @@ public class DataList break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); if ((bs | bwt.getWindowId()) >= windowId) { writingOffset = sd.offset; break done; } break; + + default: + break; } } } @@ -628,19 +661,19 @@ public class DataList SerializedData sd = dli.next(); switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); bs = (long)rwt.getBaseSeconds() << 32; lastReset = sd; break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); 
if ((bs | bwt.getWindowId()) > longWindowId) { found = true; if (lastReset != null) { - /* - * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. - */ + /* + * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. + */ if (sd.offset >= lastReset.length) { sd.offset -= lastReset.length; if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) { @@ -655,6 +688,10 @@ public class DataList break done; } + break; + + default: + break; } } } @@ -671,8 +708,7 @@ public class DataList System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length); this.starting_window = this.ending_window = bs; //logger.debug("=20140220= reassign the windowids {}", this); - } - else { + } else { this.readingOffset = this.writingOffset; this.starting_window = this.ending_window = longWindowId; //logger.debug("=20140220= avoid the windowids {}", this); @@ -692,8 +728,7 @@ public class DataList sd.offset = 0; sd.dataOffset = VarInt.write(sd.length - i, sd.buffer, sd.offset, i); sd.buffer[sd.dataOffset] = MessageType.NO_MESSAGE_VALUE; - } - else { + } else { logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset); } @@ -710,15 +745,17 @@ public class DataList { byte[] data = storage.retrieve(identifier, uniqueIdentifier); synchronized (Block.this) { - Block.this.data = data; - readingOffset = 0; - writingOffset = data.length; - if (refCount.get() > 1) { + if (Block.this.data == null) { + Block.this.data = data; + readingOffset = 0; + writingOffset = data.length; Block.this.notifyAll(); - } - int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet(); - if (numberOfInMemBlockPermits < 0) { - logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); + int numberOfInMemBlockPermits = 
DataList.this.numberOfInMemBlockPermits.decrementAndGet(); + if (numberOfInMemBlockPermits < 0) { + logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); + } + } else { + logger.debug("Block {} was already loaded into memory", Block.this); } } } @@ -727,20 +764,34 @@ public class DataList protected void acquire(boolean wait) { - if (refCount.getAndIncrement() == 0 && storage != null && data == null) { + int refCount = this.refCount.getAndIncrement(); + synchronized (Block.this) { + if (data != null) { + return; + } + } + if (refCount == 0 && storage != null) { final Runnable retriever = getRetriever(); + if (future != null && future.cancel(false)) { + logger.debug("Block {} future is cancelled", this); + } if (wait) { + future = null; retriever.run(); } else { future = storageExecutor.submit(retriever); } - } else if (wait && data == null) { + } else if (wait) { try { synchronized (Block.this) { - wait(); + if (future == null) { + throw new IllegalStateException("No task is scheduled to retrieve block " + Block.this); + } + while (data == null) { + wait(); + } } - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { throw new RuntimeException("Interrupted while waiting for data to be loaded!", ex); } } @@ -758,15 +809,16 @@ public class DataList } if (uniqueIdentifier == 0) { logger.warn("Storage returned unexpectedly, please check the status of the spool directory!"); - } - else { - //logger.debug("Spooled {} to disk", Block.this); + } else { + int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get(); synchronized (Block.this) { - if (refCount.get() == 0) { + if (refCount.get() == 0 && Block.this.data != null) { Block.this.data = null; + numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet(); + } else { + logger.debug("Keeping Block {} unchanged", Block.this); } } - int numberOfInMemBlockPermits = 
DataList.this.numberOfInMemBlockPermits.incrementAndGet(); assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); } @@ -780,10 +832,17 @@ public class DataList if (refCount == 0 && storage != null) { assert (next != null); final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); - if (wait && numberOfInMemBlockPermits.get() == 0) { + if (future != null && future.cancel(false)) { + logger.debug("Block {} future is cancelled", this); + } + final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get(); + if (wait && numberOfInMemBlockPermits == 0) { + future = null; storer.run(); - } else if (numberOfInMemBlockPermits.get() < MAX_COUNT_OF_INMEM_BLOCKS/2) { + } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) { future = storageExecutor.submit(storer); + } else { + future = null; } } else { logger.debug("Holding {} in memory due to {} references.", this, refCount); @@ -809,11 +868,12 @@ public class DataList protected void discard(final boolean wait) { if (storage != null) { - if (future != null) { - future.cancel(false); - } final Runnable discarder = getDiscarder(); + if (future != null && future.cancel(false)) { + logger.debug("Block {} future is cancelled", this); + } if (wait) { + future = null; discarder.run(); } else { future = storageExecutor.submit(discarder); @@ -828,7 +888,7 @@ public class DataList + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) - + '}'; + + ", future=" + (future == null ? "null" : future.isDone() ? 
"Done" : future.isCancelled() ? "Cancelled" : future) + '}'; } } @@ -895,19 +955,13 @@ public class DataList { while (size == 0) { size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset); - switch (nextOffset.integer) { - case -5: - throw new RuntimeException("problemo!"); - - case -4: - case -3: - case -2: - case -1: - case 0: - if (da.writingOffset == buffer.length && switchToNextBlock()) { - continue; - } - return false; + if (nextOffset.integer > -5 && nextOffset.integer < 1) { + if (da.writingOffset == buffer.length && switchToNextBlock()) { + continue; + } + return false; + } else if (size == -5) { + throw new RuntimeException("problemo!"); } } @@ -965,6 +1019,12 @@ public class DataList size = 0; } + @Override + public String toString() + { + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{da=" + da + '}'; + } + } private static final Logger logger = LoggerFactory.getLogger(DataList.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java index fd9cebc..4add008 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java @@ -36,7 +36,7 @@ public interface DataListener /** */ - public void addedData(); + public boolean addedData(); /** * http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java ---------------------------------------------------------------------- diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index 939d0c1..6ba7b64 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -99,17 +99,8 @@ public class FastDataList extends DataList last.writingOffset = writeOffset; - autoFlushExecutor.submit(new Runnable() - { - @Override - public void run() - { - for (DataListener dl : all_listeners) { - dl.addedData(); - } - } + notifyListeners(); - }); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 40a8207..f867d69 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -73,8 +73,7 @@ public class LogicalNode implements DataListener if (iterator instanceof DataListIterator) { this.iterator = (DataListIterator)iterator; - } - else { + } else { throw new IllegalArgumentException("iterator does not belong to DataListIterator class"); } @@ -195,12 +194,12 @@ public class LogicalNode implements DataListener case MessageType.BEGIN_WINDOW_VALUE: tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); logger.debug("{}->{} condition {} =? 
{}", - new Object[] { - upstream, - group, - Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), - Codec.getStringWindowId(skipWindowId) - }); + new Object[] { + upstream, + group, + Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), + Codec.getStringWindowId(skipWindowId) + }); if ((baseSeconds | tuple.getWindowId()) > skipWindowId) { logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples); ready = GiveAll.getInstance().distribute(physicalNodes, data); @@ -219,8 +218,7 @@ public class LogicalNode implements DataListener logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes); } } - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { throw new RuntimeException(ie); } @@ -232,9 +230,8 @@ public class LogicalNode implements DataListener logger.debug("Exiting catch up because caughtup = {}", caughtup); } - @SuppressWarnings("fallthrough") @Override - public void addedData() + public boolean addedData() { if (isReady()) { if (caughtup) { @@ -257,6 +254,8 @@ public class LogicalNode implements DataListener case MessageType.RESET_WINDOW_VALUE: Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); baseSeconds = (long)resetWindow.getBaseSeconds() << 32; + ready = GiveAll.getInstance().distribute(physicalNodes, data); + break; default: //logger.debug("sending data of type {}", MessageType.valueOf(data.buffer[data.dataOffset])); @@ -264,8 +263,7 @@ public class LogicalNode implements DataListener break; } } - } - else { + } else { while (ready && iterator.hasNext()) { SerializedData data = iterator.next(); switch (data.buffer[data.dataOffset]) { @@ -287,6 +285,8 @@ public class LogicalNode implements DataListener case MessageType.RESET_WINDOW_VALUE: tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); baseSeconds = 
(long)tuple.getBaseSeconds() << 32; + ready = GiveAll.getInstance().distribute(physicalNodes, data); + break; default: ready = GiveAll.getInstance().distribute(physicalNodes, data); @@ -294,15 +294,14 @@ public class LogicalNode implements DataListener } } } - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { throw new RuntimeException(ie); } - } - else { + } else { catchUp(); } } + return !ready; } /** @@ -345,7 +344,7 @@ public class LogicalNode implements DataListener @Override public String toString() { - return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + '}'; + return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}'; } private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/5b6e4281/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 9f31e02..cd45738 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -29,7 +29,13 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +43,12 
@@ import org.slf4j.LoggerFactory; import com.datatorrent.bufferserver.internal.DataList; import com.datatorrent.bufferserver.internal.FastDataList; import com.datatorrent.bufferserver.internal.LogicalNode; -import com.datatorrent.bufferserver.packet.*; +import com.datatorrent.bufferserver.packet.PayloadTuple; +import com.datatorrent.bufferserver.packet.PublishRequestTuple; +import com.datatorrent.bufferserver.packet.PurgeRequestTuple; +import com.datatorrent.bufferserver.packet.ResetRequestTuple; +import com.datatorrent.bufferserver.packet.SubscribeRequestTuple; +import com.datatorrent.bufferserver.packet.Tuple; import com.datatorrent.bufferserver.storage.Storage; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.AbstractLengthPrependerClient; @@ -100,16 +111,15 @@ public class Server implements ServerListener @Override public void unregistered(SelectionKey key) { - serverHelperExecutor.shutdown(); - storageHelperExecutor.shutdown(); - try { - serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) { - logger.debug("Executor Termination", ex); - } - logger.info("Server stopped listening at {}", address); - } + serverHelperExecutor.shutdown(); + storageHelperExecutor.shutdown(); + try { + serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + logger.debug("Executor Termination", ex); + } + logger.info("Server stopped listening at {}", address); + } public synchronized InetSocketAddress run(EventLoop eventloop) { @@ -117,8 +127,7 @@ public class Server implements ServerListener while (address == null) { try { wait(20); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { throw new RuntimeException(ex); } } @@ -142,8 +151,7 @@ public class Server implements ServerListener int port; if (args.length > 0) { port = Integer.parseInt(args[0]); - } - else { + } else { port = 0; } @@ -173,8 +181,7 @@ public 
class Server implements ServerListener byte[] message; if (dl == null) { message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); - } - else { + } else { dl.purge(request.getBaseSeconds(), request.getWindowId()); message = ("Request sent for processing: " + request).getBytes(); } @@ -201,8 +208,7 @@ public class Server implements ServerListener byte[] message; if (dl == null) { message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); - } - else { + } else { AbstractLengthPrependerClient channel = publisherChannels.remove(request.getIdentifier()); if (channel != null) { eventloop.disconnect(channel); @@ -252,8 +258,7 @@ public class Server implements ServerListener ln = subscriberGroups.get(type); ln.boot(eventloop); ln.addConnection(connection); - } - else { + } else { /* * if there is already a datalist registered for the type in which this client is interested, * then get a iterator on the data items of that data list. If the datalist is not registered, @@ -263,8 +268,7 @@ public class Server implements ServerListener if (publisherBuffers.containsKey(upstream_identifier)) { dl = publisherBuffers.get(upstream_identifier); //logger.debug("old list = {}", dl); - } - else { + } else { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(upstream_identifier, dl); //logger.debug("new list = {}", dl); @@ -315,12 +319,10 @@ public class Server implements ServerListener dl = publisherBuffers.get(identifier); try { dl.rewind(request.getBaseSeconds(), request.getWindowId()); - } - catch (IOException ie) { + } catch (IOException ie) { throw new RuntimeException(ie); } - } - else { + } else { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? 
new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(identifier, dl); } @@ -422,8 +424,7 @@ public class Server implements ServerListener } }; - } - else { + } else { publisher = new Publisher(dl, (long)request.getBaseSeconds() << 32 | request.getWindowId()); } @@ -457,8 +458,7 @@ public class Server implements ServerListener // } if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize); - } - else { + } else { subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize) { @Override @@ -494,8 +494,7 @@ public class Server implements ServerListener logger.info("Received purge request: {}", request); try { handlePurgeRequest((PurgeRequestTuple)request, this); - } - catch (IOException io) { + } catch (IOException io) { throw new RuntimeException(io); } break; @@ -504,8 +503,7 @@ public class Server implements ServerListener logger.info("Received reset all request: {}", request); try { handleResetRequest((ResetRequestTuple)request, this); - } - catch (IOException io) { + } catch (IOException io) { throw new RuntimeException(io); } break; @@ -636,9 +634,13 @@ public class Server implements ServerListener { final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_READ) == 0) { - logger.debug("Resuming read on key {} with attachment {}", key, key.attachment()); - read(0); - key.interestOps(interestOps | SelectionKey.OP_READ); + if (readExt(0)) { + logger.debug("Resuming read on key {} with attachment {}", key, key.attachment()); + key.interestOps(interestOps | SelectionKey.OP_READ); + } else { + logger.debug("Keeping read on key {} with attachment {} suspended. 
", key, key.attachment(), datalist); + datalist.notifyListeners(); + } } } }); @@ -648,6 +650,11 @@ public class Server implements ServerListener @Override public void read(int len) { + readExt(len); + } + + private boolean readExt(int len) + { //logger.debug("read {} bytes", len); writeOffset += len; do { @@ -664,18 +671,20 @@ public class Server implements ServerListener * new byteBuffer and start as if we always had full room but not enough data. */ if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) { - return; + return false; } } - } - else if (dirty) { + } else if (dirty) { dirty = false; datalist.flush(writeOffset); } - return; + return true; case 0: continue; + + default: + break; } } @@ -683,8 +692,7 @@ public class Server implements ServerListener onMessage(buffer, readOffset, size); readOffset += size; size = 0; - } - else { + } else { if (writeOffset == buffer.length) { dirty = false; datalist.flush(writeOffset); @@ -694,14 +702,14 @@ public class Server implements ServerListener if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) { readOffset -= VarInt.getSize(size); size = 0; - return; + return false; } size = 0; } else if (dirty) { dirty = false; datalist.flush(writeOffset); } - return; + return true; } } while (true); @@ -751,8 +759,7 @@ public class Server implements ServerListener if (cce instanceof RejectedExecutionException && serverHelperExecutor.isTerminated()) { logger.warn("Terminated Executor Exception for {}.", this, cce); el.disconnect(this); - } - else { + } else { super.handleException(cce, el); } } @@ -836,8 +843,7 @@ public class Server implements ServerListener if (len < remainingCapacity) { remainingCapacity = len; byteBuffer.position(writeOffset + remainingCapacity); - } - else { + } else { byteBuffer.position(buffer.length); } System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);