flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] rkhachatryan commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
Date Tue, 28 Jan 2020 20:49:40 GMT
rkhachatryan commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r372047776
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ##########
 @@ -54,39 +62,157 @@
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
  *
- * <p>As soon as a split descriptor is received, it is put in a queue, and have another
- * thread read the actual data of the split. This architecture allows the separation of the
- * reading thread from the one emitting the checkpoint barriers, thus removing any potential
- * back-pressure.
+ * <p>This implementation uses {@link MailboxExecutor} to execute each action and states:<ol>
+ *     <li>start in {@link ReaderState#IDLE IDLE}</li>
+ *     <li>upon receiving a split add it to the queue, switch to {@link ReaderState#OPENING OPENING} and enqueue a
+ *     {@link org.apache.flink.streaming.runtime.tasks.mailbox.Mail mail} with self as {@link Runnable}</li>
+ *     <li>open file, switch to {@link ReaderState#READING READING}, read one record, re-enqueue self</li>
+ *     <li>if no more records or splits available, switch back to {@link ReaderState#IDLE IDLE}</li>
+ *     </ol>
+ *     On close:
+ *     <ol>
+ *     <li>if {@link ReaderState#IDLE IDLE} then close immediately</li>
+ *     <li>otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link MailboxExecutor#yield() yield} in a loop
+ *     until state is {@link ReaderState#CLOSED CLOSED}</li>
+ *     <li>{@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be processed in the same way as above</li>
+ * </ol></p>
+ * <p>Using {@link MailboxExecutor} allows to avoid explicit synchronization. At most one mail should be enqueued at any
+ * given time.</p>
+ * <p>Using FSM approach allows to explicitly define states and enforce {@link ReaderState#TRANSITIONS transitions} between them.</p>
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>
{
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>,
RunnableWithException {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
-	private FileInputFormat<OUT> format;
-	private TypeSerializer<OUT> serializer;
+	private enum ReaderState {
+		IDLE {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				LOG.warn("not processing any records in IDLE state");
+				return false;
+			}
+		},
+		/**
+		 * A message is enqueued to process split, but no split is opened.
+		 */
+		OPENING { // the split was added and message to itself was enqueued to process it
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws
IOException {
+				if (op.splits.isEmpty()) {
+					op.switchState(ReaderState.IDLE);
+					return false;
+				} else {
+					op.loadSplit(op.splits.poll());
+					op.switchState(ReaderState.READING);
+					return true;
+				}
+			}
+		},
+		/**
+		 * A message is enqueued to process split and its processing was started.
+		 */
+		READING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				return true;
+			}
 
-	private transient Object checkpointLock;
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				op.switchState(ReaderState.IDLE);
+			}
+		},
+		/**
+		 * {@link #close()} was called but unprocessed data (records and splits) remains and needs to be processed.
+		 * {@link #close()} caller is blocked.
+		 */
+		CLOSING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws
IOException {
+				if (op.currentSplit == null && !op.splits.isEmpty()) {
+					op.loadSplit(op.splits.poll());
+				}
+				return true;
+			}
 
-	private transient SplitReader<OUT> reader;
-	private transient SourceFunction.SourceContext<OUT> readerContext;
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				// need one more mail to unblock possible yield() in close() method (todo: wait with timeout in yield)
+				op.enqueueMail();
+				op.switchState(CLOSED);
+			}
+		},
+		CLOSED {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				LOG.warn("not processing any records while closed");
+				return false;
+			}
+		};
+
+		private static final Set<ReaderState> ACCEPT_SPLITS = EnumSet.of(IDLE, OPENING, READING);
+		/**
+		 * Possible transition FROM each state.
+		 */
+		private static final Map<ReaderState, Set<ReaderState>> TRANSITIONS;
+		static {
+			Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
+			tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED));
+			tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING));
+			tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING));
+			tmpTransitions.put(CLOSING, EnumSet.of(CLOSED));
+			tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+			TRANSITIONS = new EnumMap<>(tmpTransitions);
+		}
+
+		public boolean isAcceptingSplits() {
+			return ACCEPT_SPLITS.contains(this);
+		}
+
+		public final boolean isTerminal() {
+			return this == CLOSED;
+		}
+
+		public boolean canSwitchTo(ReaderState next) {
+			return TRANSITIONS
+					.getOrDefault(this, EnumSet.noneOf(ReaderState.class))
+					.contains(next);
+		}
+
+		/**
+		 * Prepare to process new record OR split.
+		 * @return true if should read the record
+		 */
+		public abstract boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op)
throws IOException;
+
+		public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+		}
+	}
+
+	private FileInputFormat<OUT> format;
+	private TypeSerializer<OUT> serializer;
+	private OUT reusedRecord;
+	private transient SourceFunction.SourceContext<OUT> sourceContext;
+	private MailboxExecutor executor;
 
 	private transient ListState<TimestampedFileInputSplit> checkpointedState;
-	private transient List<TimestampedFileInputSplit> restoredReaderState;
+
+	private ReaderState state = ReaderState.IDLE;
+	/**
+	 * MUST only be changed via {@link #switchState(ReaderState) switchState}.
+	 */
+	private PriorityQueue<TimestampedFileInputSplit> splits;
 
 Review comment:
  The field should have been `transient` (now fixed); that's why it's initialized in `open()`.
  Thanks for pointing it out.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message