flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7430) ContinuousFileReaderOperator swallows exceptions
Date Wed, 06 Sep 2017 09:27:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155070#comment-16155070 ]

ASF GitHub Bot commented on FLINK-7430:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4650

    [FLINK-7430] Set StreamTask.isRunning to false after closing StreamOperators

    ## What is the purpose of the change
    
    Closing StreamOperators is still part of the StreamTask's running lifecycle,
    because operators which perform asynchronous operations usually finish their
    work when the StreamOperator is closed. Since this also entails that errors
    can occur and that a checkpointing operation is triggered, we should only set
    the StreamTask's isRunning to false after all StreamOperators have been closed.
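
The ordering described above can be illustrated with a minimal sketch. `SketchTask` and `SketchOperator` are hypothetical stand-ins invented for this illustration; they are not Flink's `StreamTask` or `StreamOperator`, and the real classes have a much richer lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a stream task; not Flink's StreamTask. */
class SketchTask {

    /** Hypothetical stand-in for a stream operator. */
    interface SketchOperator {
        void close();
    }

    private final List<SketchOperator> operators = new ArrayList<>();
    private volatile boolean isRunning;

    void addOperator(SketchOperator operator) {
        operators.add(operator);
    }

    boolean isRunning() {
        return isRunning;
    }

    void invoke() {
        isRunning = true;
        try {
            // ... record processing would happen here ...

            // Closing still belongs to the running phase: operators may
            // finish asynchronous work here, and that work may fail.
            for (SketchOperator operator : operators) {
                operator.close();
            }
        } finally {
            // The flag is cleared only after the close calls have been made
            // (or one of them has thrown).
            isRunning = false;
        }
    }
}
```

An operator whose `close()` inspects `isRunning()` observes `true` in this sketch, which is the property the new test is described as checking.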
    
    Furthermore, this commit introduces a while guard for the waiting condition in
    ContinuousFileReaderOperator#close.
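
As a rough illustration of why a `while` guard is preferred over `if`: `Object.wait()` may return spuriously or after an unrelated notification, so the condition has to be re-checked after every wake-up. The class below is a hypothetical sketch (its name, field and methods are assumptions); it is not the code of `ContinuousFileReaderOperator`.

```java
/** Hypothetical sketch of a close() that waits for a reader thread. */
class GuardedCloseSketch {

    private final Object lock = new Object();
    private boolean readerFinished; // guarded by lock

    /** Called by the (hypothetical) reader thread when it is done. */
    void markReaderFinished() {
        synchronized (lock) {
            readerFinished = true;
            lock.notifyAll();
        }
    }

    boolean isReaderFinished() {
        synchronized (lock) {
            return readerFinished;
        }
    }

    /** Blocks until the reader has finished. */
    void close() {
        synchronized (lock) {
            // `while`, not `if`: re-check the condition after every wake-up,
            // because wait() can return before the condition actually holds.
            while (!readerFinished) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while closing", e);
                }
            }
        }
    }
}
```

With an `if` in place of the `while`, a single early wake-up would let `close()` return while the reader is still working.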
    
    R @aljoscha.
    
    ## Brief change log
    
    - Set `StreamTask#isRunning` to false after all operators have been closed
    - Use `while` loop instead of `if` condition for guarding synchronization in `ContinuousFileReaderOperator#close`
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - `StreamTaskTest#testOperatorClosingBeforeStopRunning` verifies that `StreamTask#isRunning == true` when the close method of the `StreamOperators` is called.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - **Affects the `StreamTask` lifecycle**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixStreamTask

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4650
    
----


> ContinuousFileReaderOperator swallows exceptions
> ------------------------------------------------
>
>                 Key: FLINK-7430
>                 URL: https://issues.apache.org/jira/browse/FLINK-7430
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, filesystem-connector
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>            Reporter: Peter Ertl
>            Assignee: Till Rohrmann
>            Priority: Critical
>
> The class *ContinuousFileReaderOperator* swallows exceptions, as the following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class FormatExceptionSwallowed {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		File bla = File.createTempFile("foo", "baz");
> 		try(PrintWriter w = new PrintWriter(bla)) {
> 			w.println("one");
> 			w.println("two");
> 			w.println("three");
> 		}
> 		env.readTextFile(bla.getCanonicalPath())
> 			.writeUsingOutputFormat(new OutputFormat<String>() {
> 				@Override
> 				public void configure(final Configuration parameters) {
> 				}
> 				@Override
> 				public void open(final int taskNumber, final int numTasks) throws IOException {
> 				}
> 				@Override
> 				public void writeRecord(final String record) throws IOException {
> 					throw new IllegalArgumentException("bla");
> 				}
> 				@Override
> 				public void close() throws IOException {
> 				}
> 			});
> 		env.execute("go");
> 		
> 		// JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ... 
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
