flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7430) ContinuousFileReaderOperator swallows exceptions
Date Wed, 06 Sep 2017 08:26:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154991#comment-16154991
] 

Aljoscha Krettek commented on FLINK-7430:
-----------------------------------------

I think the issue lies here: https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L280.
Both {{ContinuousFileReaderOperator}} and {{AsyncWaitOperator}} run asynchronous computations
and wait in {{close()}} for them to finish. However, {{StreamTask}} sets
{{isRunning}} to {{false}} too early and therefore swallows exceptions that occur in the asynchronous
parts while it is waiting for {{close()}} to finish.

I think the solution is to set {{isRunning = false}} only after all operators have been closed.
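
To illustrate the ordering problem, here is a minimal toy model, not Flink's actual code. The names {{IsRunningSketch}}, {{ToyTask}}, {{closeOperator()}} and {{run(...)}} are hypothetical, and the sketch assumes that asynchronous failures are only reported while {{isRunning}} is still {{true}}.

```java
import java.util.concurrent.atomic.AtomicReference;

class IsRunningSketch {

    // Hypothetical stand-in for a task; this is NOT Flink's StreamTask.
    static class ToyTask {
        volatile boolean isRunning = true;
        final AtomicReference<Throwable> reported = new AtomicReference<>();

        // Assumption: async failures are dropped once the task no longer counts as running.
        void handleAsyncException(Throwable t) {
            if (isRunning) {
                reported.compareAndSet(null, t);
            }
        }

        // Models an operator whose close() waits for async work that fails.
        void closeOperator() {
            Thread worker = new Thread(
                () -> handleAsyncException(new IllegalArgumentException("bla")));
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        // Returns the failure that was reported, or null if it was swallowed.
        Throwable run(boolean clearFlagBeforeClose) {
            if (clearFlagBeforeClose) {
                isRunning = false; // flag cleared too early
            }
            closeOperator();
            isRunning = false;     // flag cleared only after close() has returned
            return reported.get();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag cleared before close: " + new ToyTask().run(true));
        System.out.println("flag cleared after close:  " + new ToyTask().run(false));
    }
}
```

In this sketch the first call returns {{null}} because the failure arrives after the flag was cleared, while the second call returns the {{IllegalArgumentException}} because the flag is still set while {{close()}} waits.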

> ContinuousFileReaderOperator swallows exceptions
> ------------------------------------------------
>
>                 Key: FLINK-7430
>                 URL: https://issues.apache.org/jira/browse/FLINK-7430
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, filesystem-connector
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>            Reporter: Peter Ertl
>            Priority: Critical
>
> The class *ContinuousFileReaderOperator* is swallowing exceptions as the following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class FormatExceptionSwallowed {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		File bla = File.createTempFile("foo", "baz");
> 		try(PrintWriter w = new PrintWriter(bla)) {
> 			w.println("one");
> 			w.println("two");
> 			w.println("three");
> 		}
> 		env.readTextFile(bla.getCanonicalPath())
> 			.writeUsingOutputFormat(new OutputFormat<String>() {
> 				@Override
> 				public void configure(final Configuration parameters) {
> 				}
> 				@Override
> 				public void open(final int taskNumber, final int numTasks) throws IOException {
> 				}
> 				@Override
> 				public void writeRecord(final String record) throws IOException {
> 					throw new IllegalArgumentException("bla");
> 				}
> 				@Override
> 				public void close() throws IOException {
> 				}
> 			});
> 		env.execute("go");
> 		
> 		// JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ... 
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
