flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] NicoK closed pull request #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
Date Mon, 13 Aug 2018 09:57:32 GMT
NicoK closed pull request #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa8aec..c78abb5165a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@
 
 	int getNumberOfInputChannels();
 
+	String getOwningTaskName();
+
 	boolean isFinished();
 
 	void requestPartitions() throws IOException, InterruptedException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 06e80ff531a..2e7d076f3f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -62,10 +62,10 @@
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel subtasks;
each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks;
each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator produces
data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces
data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -74,7 +74,7 @@
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned
over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned
over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into
one or more
  * subpartitions.
  *
@@ -95,7 +95,7 @@
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result in parallel,
resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel,
resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned
into two
  * subpartitions -- one for each parallel reduce subtask.
  */
@@ -274,6 +274,11 @@ public int getNumberOfQueuedBuffers() {
 		return 0;
 	}
 
+	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------
@@ -364,7 +369,7 @@ else if (partitionLocation.isRemote()) {
 					throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
 				}
 
-				LOG.debug("Updated unknown input channel to {}.", newChannel);
+				LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
 
 				inputChannels.put(partitionId, newChannel);
 
@@ -393,7 +398,7 @@ public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
 
 				checkNotNull(ch, "Unknown input channel with ID " + partitionId);
 
-				LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
+				LOG.debug("{}: Retriggering partition request {}:{}.", owningTaskName, ch.partitionId,
consumedSubpartitionIndex);
 
 				if (ch.getClass() == RemoteInputChannel.class) {
 					final RemoteInputChannel rch = (RemoteInputChannel) ch;
@@ -432,7 +437,8 @@ public void releaseAllResources() throws IOException {
 							inputChannel.releaseAllResources();
 						}
 						catch (IOException e) {
-							LOG.warn("Error during release of channel resources: " + e.getMessage(), e);
+							LOG.warn("{}: Error during release of channel resources: {}.",
+								owningTaskName, e.getMessage(), e);
 						}
 					}
 
@@ -725,7 +731,8 @@ else if (partitionLocation.isUnknown()) {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+		LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			owningTaskName,
 			inputChannels.length,
 			numLocalChannels,
 			numRemoteChannels,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a93f0..d3085cb5279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -129,6 +129,12 @@ public int getNumberOfInputChannels() {
 		return totalNumberOfInputChannels;
 	}
 
+	@Override
+	public String getOwningTaskName() {
+		// all input gates have the same owning task
+		return inputGates[0].getOwningTaskName();
+	}
+
 	@Override
 	public boolean isFinished() {
 		for (InputGate inputGate : inputGates) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b82fdc..991635a9205 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class)
{
 	}
 
 	private void completeBufferedSequence() throws IOException {
-		LOG.debug("Finished feeding back buffered data");
+		LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
 
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we did not complete the current checkpoint, another started before
-				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint
{}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current
checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// let the task know we are not completing this
 				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ else if (barrierId > currentCheckpointId) {
 		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
 			// actually trigger checkpoint
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
-						receivedBarrier.getId(), receivedBarrier.getTimestamp());
+				LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
+					inputGate.getOwningTaskName(),
+					receivedBarrier.getId(),
+					receivedBarrier.getTimestamp());
 			}
 
 			releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier)
th
 			if (barrierId == currentCheckpointId) {
 				// cancel this alignment
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+					LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
+						inputGate.getOwningTaskName(),
+						barrierId);
 				}
 
 				releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier)
th
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we canceled the next which also cancels the current
-				LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint
{}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current
checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// this stops the current alignment
 				releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ else if (barrierId > currentCheckpointId) {
 			latestAlignmentDurationNanos = 0L;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+				LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
+					inputGate.getOwningTaskName(),
+					barrierId);
 			}
 
 			notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ private void notifyAbort(long checkpointId, CheckpointDeclineException
cause) th
 	private void checkSizeLimit() throws Exception {
 		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked())
> maxBufferedBytes) {
 			// exceeded our limit - abort this checkpoint
-			LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
-					currentCheckpointId, maxBufferedBytes);
+			LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+				inputGate.getOwningTaskName(),
+				currentCheckpointId,
+				maxBufferedBytes);
 
 			releaseBlocksAndResetBarriers();
 			notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws
IOExc
 		startOfAlignmentTimestamp = System.nanoTime();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
+			LOG.debug("{}: Starting stream alignment for checkpoint {}.",
+				inputGate.getOwningTaskName(),
+				checkpointId);
 		}
 	}
 
@@ -470,7 +486,9 @@ private void onBarrier(int channelIndex) throws IOException {
 			numBarriersReceived++;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received barrier from channel " + channelIndex);
+				LOG.debug("{}: Received barrier from channel {}.",
+					inputGate.getOwningTaskName(),
+					channelIndex);
 			}
 		}
 		else {
@@ -483,7 +501,8 @@ private void onBarrier(int channelIndex) throws IOException {
 	 * Makes sure the just written data is the next to be consumed.
 	 */
 	private void releaseBlocksAndResetBarriers() throws IOException {
-		LOG.debug("End of stream alignment, feeding buffered data back");
+		LOG.debug("{}: End of stream alignment, feeding buffered data back.",
+			inputGate.getOwningTaskName());
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
@@ -499,8 +518,9 @@ private void releaseBlocksAndResetBarriers() throws IOException {
 		else {
 			// uncommon case: buffered data pending
 			// push back the pending data, if we have any
-			LOG.debug("Checkpoint skipped via buffered data:" +
-					"Pushing back current alignment buffers and feeding back new alignment data first.");
+			LOG.debug("{}: Checkpoint skipped via buffered data:" +
+					"Pushing back current alignment buffers and feeding back new alignment data first.",
+				inputGate.getOwningTaskName());
 
 			// since we did not fully drain the previous sequence, we need to allocate a new buffer
for this one
 			BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ private void releaseBlocksAndResetBarriers() throws IOException {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Size of buffered data: {} bytes",
-					currentBuffered == null ? 0L : currentBuffered.size());
+			LOG.debug("{}: Size of buffered data: {} bytes",
+				inputGate.getOwningTaskName(),
+				currentBuffered == null ? 0L : currentBuffered.size());
 		}
 
 		// the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public long getAlignmentDurationNanos() {
 
 	@Override
 	public String toString() {
-		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
-				currentCheckpointId, numBarriersReceived, numClosedChannels);
+		return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
+			inputGate.getOwningTaskName(),
+			currentCheckpointId,
+			numBarriersReceived,
+			numClosedChannels);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5ef2d6..e968101ca2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public boolean isNextBarrier() {
 		private int currentChannel = 0;
 		private long c = 0;
 
+		private final String owningTaskName;
+
 		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens)
{
+			this(bufferPools, barrierGens, "TestTask");
+		}
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens,
String owningTaskName) {
 			this.numChannels = bufferPools.length;
 			this.currentBarriers = new int[numChannels];
 			this.bufferPools = bufferPools;
 			this.barrierGens = barrierGens;
+			this.owningTaskName = owningTaskName;
 		}
 
 		@Override
@@ -151,6 +158,11 @@ public int getNumberOfInputChannels() {
 			return numChannels;
 		}
 
+		@Override
+		public String getOwningTaskName() {
+			return owningTaskName;
+		}
+
 		@Override
 		public boolean isFinished() {
 			return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709419f..6400a175e20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@
 
 	private int closedChannels;
 
+	private final String owningTaskName;
+
 	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents)
{
+		this(pageSize, numChannels, bufferOrEvents, "MockTask");
+	}
+
+	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents,
String owningTaskName) {
 		this.pageSize = pageSize;
 		this.numChannels = numChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numChannels];
+		this.owningTaskName = owningTaskName;
 	}
 
 	@Override
@@ -61,6 +68,11 @@ public int getNumberOfInputChannels() {
 		return numChannels;
 	}
 
+	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	@Override
 	public boolean isFinished() {
 		return bufferOrEvents.isEmpty();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message