flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1085) Unnecessary failing of GroupReduceCombineDriver
Date Thu, 02 Jul 2015 06:21:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611542#comment-14611542 ]

ASF GitHub Bot commented on FLINK-1085:

Github user dabaitu commented on a diff in the pull request:

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
    @@ -170,7 +171,14 @@ public void run() throws Exception {
     			// write the value again
     			if (!this.sorter.write(value)) {
    -				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
    +				if (oversizedRecordCount == Long.MAX_VALUE) {
    +					LOG.debug("Number of oversized record has exceeded MAX Long");
    +				} else {
    +					++oversizedRecordCount;
    +					LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
    +				}
    +				// simply forward the record
    +				this.output.collect((OUT)value);
    --- End diff --
    I tried to call
    I couldn't figure out how to convert this single value from type IN to type OUT besides
    direct casting. Could you guys please help me? @StephanEwen @rmetzger

> Unnecessary failing of GroupReduceCombineDriver
> -----------------------------------------------
>                 Key: FLINK-1085
>                 URL: https://issues.apache.org/jira/browse/FLINK-1085
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 0.6.1-incubating, 0.7.0-incubating
>            Reporter: Fabian Hueske
>              Labels: starter
> With a recent update (commit cbbcf7820885a8a9734ffeba637b0182a6637939) the GroupReduceCombineDriver
> was changed to not use an asynchronous partial sorter. Instead, the driver fills a sort buffer
> with records, sorts it, combines them, clears the buffer, and continues to fill it again.
> The GroupReduceCombineDriver fails if a record cannot be serialized into an empty sort
> buffer, i.e., if the record is too large for the buffer.
> Alternatively, we should emit a WARN message for the first record that is too large and
> just forward all records which do not fit into the empty sort buffer (maybe continue to count
> how many records were simply forwarded and give a second WARN message with this statistic).
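
The behavior proposed in the description above (fill, sort, combine, clear, and forward any record that does not fit into an empty buffer) can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink code: the class `ForwardingCombinerSketch`, its methods, the use of string length as the record size, and the de-duplicating "combine" step are all hypothetical stand-ins chosen for the example. In particular, input and output share one type here, so the IN-to-OUT conversion question raised in the comment does not arise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names come from Flink.
final class ForwardingCombinerSketch {
    private final int capacity;                 // assumed buffer size, in characters
    private final Consumer<String> output;      // downstream collector stand-in
    private final List<String> buffer = new ArrayList<>();
    private int used = 0;
    private long oversizedRecordCount = 0;

    ForwardingCombinerSketch(int capacity, Consumer<String> output) {
        this.capacity = capacity;
        this.output = output;
    }

    // Returns false if the record does not fit into the remaining space.
    private boolean write(String record) {
        if (used + record.length() > capacity) {
            return false;
        }
        buffer.add(record);
        used += record.length();
        return true;
    }

    // Sort the buffer, "combine" it (here: collapse equal neighbours), emit, clear.
    private void sortCombineAndClear() {
        Collections.sort(buffer);
        String previous = null;
        for (String record : buffer) {
            if (!record.equals(previous)) {
                output.accept(record);
            }
            previous = record;
        }
        buffer.clear();
        used = 0;
    }

    void accept(String record) {
        if (write(record)) {
            return;
        }
        sortCombineAndClear();
        // Retry on the now-empty buffer; if that fails, the record is oversized.
        if (!write(record)) {
            if (oversizedRecordCount == 0) {
                System.err.println("WARN: record too large for sort buffer, forwarding uncombined");
            }
            oversizedRecordCount++;
            output.accept(record);              // simply forward the record
        }
    }

    void close() {
        sortCombineAndClear();
        if (oversizedRecordCount > 0) {
            System.err.println("WARN: forwarded " + oversizedRecordCount + " oversized record(s)");
        }
    }

    long getOversizedRecordCount() {
        return oversizedRecordCount;
    }
}
```

In this sketch the first warning is printed once and the summary is printed on `close()`, mirroring the two WARN messages suggested in the description; a real driver would use its logger and serialized record sizes instead.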

This message was sent by Atlassian JIRA
