Repository: drill Updated Branches: refs/heads/master b075bf610 -> 1a8430eac DRILL-4676: Foreman no longer uses a CountDownLatch and relies on the EventProcessor instead. As part of this change, Foreman.ResponseSendListener no longer calls Foreman.moveToState(), as it doesn't make any difference at this point. This closes #503. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1a8430ea Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1a8430ea Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1a8430ea Branch: refs/heads/master Commit: 1a8430eac99ccaae2ef67db6904efdc7478c9f6c Parents: b075bf6 Author: adeneche Authored: Mon May 16 13:19:18 2016 -0700 Committer: adeneche Committed: Wed May 18 10:23:09 2016 -0700 ---------------------------------------------------------------------- .../org/apache/drill/common/EventProcessor.java | 81 ++++--- .../apache/drill/exec/work/foreman/Foreman.java | 240 +++++++++---------- .../drill/exec/work/foreman/QueryManager.java | 12 +- 3 files changed, 167 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/common/src/main/java/org/apache/drill/common/EventProcessor.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java index 617801b..08ec55e 100644 --- a/common/src/main/java/org/apache/drill/common/EventProcessor.java +++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java @@ -20,62 +20,79 @@ package org.apache.drill.common; import java.util.LinkedList; /** - * Process events serially. - * - *

Our use of listeners that deliver events directly can sometimes + * Process events serially.
+ *
+ * Our use of listeners that deliver events directly can sometimes * cause problems when events are delivered recursively in the middle of * event handling by the same object. This helper class can be used to - * serialize events in such cases. If an event is being processed, arriving - * events are queued. Once the current event handling is completed, the - * next event on the queue is processed; this continues until the event - * queue is empty. The first thread to arrive will process its own event - * and all other events that arrive during that processing. Other threads - * will just enqueue their events.

+ * serialize events in such cases.
+ *
+ * All events are queued until {@link #start()} is called. + * The thread that calls {@link #start()} will process all events in the order they + * were added until the queue is empty. Other threads will just enqueue their events.
+ * After that, when the queue is empty, the first thread that adds an event will start processing + * the queue until it's empty again. * * @param <T> the event class */ public abstract class EventProcessor { private final LinkedList queuedEvents = new LinkedList<>(); private volatile boolean isProcessing = false; + private volatile boolean started = false; /** - * Constructor. - */ - public EventProcessor() { - } - - /** - * Send an event to the processor. If the processor is not busy, the event - * will be processed. If busy, the event will be queued to be processed after - * any prior events are processed. + * Send an event to the processor. The event will be queued to be processed after + * any prior events are processed, once processing actually starts. * *

If an event's processing causes an exception, it will be added to any * previous exceptions as a suppressed exception. Once all the currently queued * events have been processed, a single exception will be thrown.

* * @param newEvent the new event + * + * @throws RuntimeException if any exception is thrown while events are being processed */ public void sendEvent(final T newEvent) { synchronized (queuedEvents) { - if (isProcessing) { - queuedEvents.addLast(newEvent); + queuedEvents.addLast(newEvent); + if (!started || isProcessing) { return; } isProcessing = true; } + processEvents(); + } + + /** + * Start processing events as soon as the queue isn't empty.
+ * If the queue is not empty, this method will process all events already + * in the queue and any event that will be added while the queue is being processed. + * + * @throws RuntimeException if any exception is thrown while events are being processed + */ + public void start() { + synchronized (queuedEvents) { + if (started) { + return; + } + + started = true; + isProcessing = true; + } + + processEvents(); + } + + /** + * Process all events in the queue until it's empty. + */ + private void processEvents() { @SuppressWarnings("resource") final DeferredException deferredException = new DeferredException(); - T event = newEvent; while (true) { - try { - processEvent(event); - } catch (Exception e) { - deferredException.addException(e); - } catch (AssertionError ae) { - deferredException.addException(new RuntimeException("Caught an assertion", ae)); - } + T event; synchronized (queuedEvents) { if (queuedEvents.isEmpty()) { @@ -85,6 +102,14 @@ public abstract class EventProcessor { event = queuedEvents.removeFirst(); } + + try { + processEvent(event); + } catch (Exception e) { + deferredException.addException(e); + } catch (AssertionError ae) { + deferredException.addException(new RuntimeException("Caught an assertion", ae)); + } } try { http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index e7defec..2829ac1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -134,8 +134,6 @@ public class Foreman implements Runnable { private volatile DistributedLease lease; // used to limit the number of concurrent queries - private final 
ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events - private final StateListener stateListener = new StateListener(); // source of external events private final ResponseSendListener responseListener = new ResponseSendListener(); private final StateSwitch stateSwitch = new StateSwitch(); private final ForemanResult foremanResult = new ForemanResult(); @@ -169,7 +167,7 @@ public class Foreman implements Runnable { queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId); queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), - drillbitContext.getClusterCoordinator(), stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this + drillbitContext.getClusterCoordinator(), this); final OptionManager optionManager = queryContext.getOptions(); queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE); @@ -210,7 +208,7 @@ public class Foreman implements Runnable { */ public void cancel() { // Note this can be called from outside of run() on another thread, or after run() completes - stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null); + addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); } /** @@ -305,7 +303,11 @@ public class Foreman implements Runnable { * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman * to accept events. */ - acceptExternalEvents.countDown(); + try { + stateSwitch.start(); + } catch (Exception ex) { + moveToState(QueryState.FAILED, ex); + } // If we received the resume signal before fragments are setup, the first call does not actually resume the // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume. 
@@ -838,126 +840,128 @@ public class Foreman implements Runnable { } } - private class StateSwitch extends EventProcessor { - public void moveToState(final QueryState newState, final Exception exception) { - sendEvent(new StateEvent(newState, exception)); - } - - @Override - protected void processEvent(final StateEvent event) { - final QueryState newState = event.newState; - final Exception exception = event.exception; - - // TODO Auto-generated method stub - logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); - switch (state) { - case ENQUEUED: - switch (newState) { - case FAILED: - Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); - recordNewState(newState); - foremanResult.setFailed(exception); - foremanResult.close(); - return; - case STARTING: - recordNewState(newState); - return; - } - break; + private void moveToState(final QueryState newState, final Exception exception) { + logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, + exception); + switch (state) { + case ENQUEUED: + switch (newState) { + case FAILED: + Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); + recordNewState(newState); + foremanResult.setFailed(exception); + foremanResult.close(); + return; case STARTING: - if (newState == QueryState.RUNNING) { - recordNewState(QueryState.RUNNING); - return; - } + recordNewState(newState); + return; + } + break; + case STARTING: + if (newState == QueryState.RUNNING) { + recordNewState(QueryState.RUNNING); + return; + } - //$FALL-THROUGH$ + //$FALL-THROUGH$ - case RUNNING: { + case RUNNING: { + /* + * For cases that cancel executing fragments, we have to record the new + * state first, because the cancellation of the local root fragment will + * cause this to be called recursively. 
+ */ + switch (newState) { + case CANCELLATION_REQUESTED: { + assert exception == null; + recordNewState(QueryState.CANCELLATION_REQUESTED); + queryManager.cancelExecutingFragments(drillbitContext); + foremanResult.setCompleted(QueryState.CANCELED); /* - * For cases that cancel executing fragments, we have to record the new - * state first, because the cancellation of the local root fragment will - * cause this to be called recursively. + * We don't close the foremanResult until we've gotten + * acknowledgements, which happens below in the case for current state + * == CANCELLATION_REQUESTED. */ - switch (newState) { - case CANCELLATION_REQUESTED: { - assert exception == null; - recordNewState(QueryState.CANCELLATION_REQUESTED); - queryManager.cancelExecutingFragments(drillbitContext); - foremanResult.setCompleted(QueryState.CANCELED); - /* - * We don't close the foremanResult until we've gotten - * acknowledgements, which happens below in the case for current state - * == CANCELLATION_REQUESTED. 
- */ - return; - } + return; + } - case COMPLETED: { - assert exception == null; - recordNewState(QueryState.COMPLETED); - foremanResult.setCompleted(QueryState.COMPLETED); - foremanResult.close(); - return; - } + case COMPLETED: { + assert exception == null; + recordNewState(QueryState.COMPLETED); + foremanResult.setCompleted(QueryState.COMPLETED); + foremanResult.close(); + return; + } - case FAILED: { - assert exception != null; - recordNewState(QueryState.FAILED); - queryManager.cancelExecutingFragments(drillbitContext); - foremanResult.setFailed(exception); - foremanResult.close(); - return; - } + case FAILED: { + assert exception != null; + recordNewState(QueryState.FAILED); + queryManager.cancelExecutingFragments(drillbitContext); + foremanResult.setFailed(exception); + foremanResult.close(); + return; + } - } - break; } + break; + } - case CANCELLATION_REQUESTED: - if ((newState == QueryState.CANCELED) - || (newState == QueryState.COMPLETED) - || (newState == QueryState.FAILED)) { - - if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) { - if (newState == QueryState.FAILED) { - assert exception != null; - recordNewState(QueryState.FAILED); - foremanResult.setForceFailure(exception); - } + case CANCELLATION_REQUESTED: + if ((newState == QueryState.CANCELED) + || (newState == QueryState.COMPLETED) + || (newState == QueryState.FAILED)) { + + if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) { + if (newState == QueryState.FAILED) { + assert exception != null; + recordNewState(QueryState.FAILED); + foremanResult.setForceFailure(exception); } - /* - * These amount to a completion of the cancellation requests' cleanup; - * now we can clean up and send the result. 
- */ - foremanResult.close(); } - return; - - case CANCELED: - case COMPLETED: - case FAILED: - logger - .warn( - "Dropping request to move to {} state as query is already at {} state (which is terminal).", - newState, state); - return; + /* + * These amount to a completion of the cancellation requests' cleanup; + * now we can clean up and send the result. + */ + foremanResult.close(); } + return; + + case CANCELED: + case COMPLETED: + case FAILED: + logger + .warn( + "Dropping request to move to {} state as query is already at {} state (which is terminal).", + newState, state); + return; + } + + throw new IllegalStateException(String.format( + "Failure trying to change states: %s --> %s", state.name(), + newState.name())); + } + + private class StateSwitch extends EventProcessor { + public void addEvent(final QueryState newState, final Exception exception) { + sendEvent(new StateEvent(newState, exception)); + } - throw new IllegalStateException(String.format( - "Failure trying to change states: %s --> %s", state.name(), - newState.name())); + @Override + protected void processEvent(final StateEvent event) { + moveToState(event.newState, event.exception); } } /** - * Tells the foreman to move to a new state. + * Tells the foreman to move to a new state.
+ * This will be added to the end of the event queue and will be processed once the foreman is ready + * to accept external events. * * @param newState the state to move to * @param exception if not null, the exception that drove this state transition (usually a failure) */ - private void moveToState(final QueryState newState, final Exception exception) { - stateSwitch.moveToState(newState, exception); + public void addToEventQueue(final QueryState newState, final Exception exception) { + stateSwitch.addEvent(newState, exception); } private void recordNewState(final QueryState newState) { @@ -1196,13 +1200,13 @@ public class Foreman implements Runnable { @Override public void failed(final RpcException ex) { - if (latch != null) { + if (latch != null) { // this block only applies to intermediate fragments fragmentSubmitFailures.addFailure(endpoint, ex); latch.countDown(); - } else { + } else { // this block only applies to leaf fragments // since this won't be waited on, we can wait to deliver this event once the Foreman is ready logger.debug("Failure while sending fragment. Stopping query.", ex); - stateListener.moveToState(QueryState.FAILED, ex); + addToEventQueue(QueryState.FAILED, ex); } } @@ -1217,28 +1221,6 @@ public class Foreman implements Runnable { } /** - * Provides gated access to state transitions. - * - *

The StateListener waits on a latch before delivery state transitions to the Foreman. The - * latch will be tripped when the Foreman is sufficiently set up that it can receive and process - * external events from other threads. - */ - public class StateListener { - /** - * Move the Foreman to the specified new state. - * - * @param newState the state to move to - * @param ex if moving to a failure state, the exception that led to the failure; used for reporting - * to the user - */ - public void moveToState(final QueryState newState, final Exception ex) { - acceptExternalEvents.awaitUninterruptibly(); - - Foreman.this.moveToState(newState, ex); - } - } - - /** * Listens for the status of the RPC response sent to the user for the query. */ private class ResponseSendListener extends BaseRpcOutcomeListener { @@ -1246,13 +1228,11 @@ public class Foreman implements Runnable { public void failed(final RpcException ex) { logger.info("Failure while trying communicate query result to initiating client. 
" + "This would happen if a client is disconnected before response notice can be sent.", ex); - stateListener.moveToState(QueryState.FAILED, ex); } @Override public void interrupted(final InterruptedException e) { logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client."); - stateListener.moveToState(QueryState.FAILED, e); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index f4ca42b..b76fd7b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; @@ -53,7 +52,6 @@ import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.EndpointListener; -import org.apache.drill.exec.work.foreman.Foreman.StateListener; import com.carrotsearch.hppc.IntObjectHashMap; import com.carrotsearch.hppc.predicates.IntObjectPredicate; @@ -79,7 +77,6 @@ public class QueryManager implements AutoCloseable { .build(); private final Map nodeMap = Maps.newHashMap(); - private final StateListener stateListener; private final QueryId queryId; private final String stringQueryId; private 
final RunQuery runQuery; @@ -108,10 +105,9 @@ public class QueryManager implements AutoCloseable { private final AtomicInteger finishedFragments = new AtomicInteger(0); public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider, - final ClusterCoordinator coordinator, final StateListener stateListener, final Foreman foreman) { + final ClusterCoordinator coordinator, final Foreman foreman) { this.queryId = queryId; this.runQuery = runQuery; - this.stateListener = stateListener; this.foreman = foreman; stringQueryId = QueryIdHelper.getQueryId(queryId); @@ -471,7 +467,7 @@ public class QueryManager implements AutoCloseable { final int remaining = totalNodes - finishedNodes; if (remaining == 0) { // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status - stateListener.moveToState(QueryState.COMPLETED, null); + foreman.addToEventQueue(QueryState.COMPLETED, null); } else { logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining, this.fragmentDataSet.size() - finishedFragments.get()); @@ -494,7 +490,7 @@ public class QueryManager implements AutoCloseable { break; case FAILED: - stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError())); + foreman.addToEventQueue(QueryState.FAILED, new UserRemoteException(status.getProfile().getError())); // fall-through. case FINISHED: case CANCELLED: @@ -548,7 +544,7 @@ public class QueryManager implements AutoCloseable { if (atLeastOneFailure) { logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}", failedNodeList, QueryIdHelper.getQueryId(queryId)); - stateListener.moveToState(QueryState.FAILED, + foreman.addToEventQueue(QueryState.FAILED, new ForemanException(String.format("One more more nodes lost connectivity during query. Identified nodes were [%s].", failedNodeList))); }