flink-issues mailing list archives

From "Alexander Alexandrov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1959) Accumulators BROKEN after Partitioning
Date Mon, 11 May 2015 11:44:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537845#comment-14537845 ]

Alexander Alexandrov commented on FLINK-1959:

I think some communication or traversal chain gets broken at the PartitionByHash node.

You can either

(1) dig through the code and find where this happens, or
(2) use an alternative to the accumulator until the issue is resolved (e.g., write the information to a pre-defined HDFS path).
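
A minimal sketch of option (2), assuming a local directory stands in for the pre-defined HDFS path: each parallel filter instance writes its own partial count to a separate file, and the client sums the files once the job has finished. The class and method names (SideFileCounts, writePartialCount, sumPartialCounts) are hypothetical. In an actual Flink job, writePartialCount would presumably be called from the close() method of a RichFilterFunction, with getRuntimeContext().getIndexOfThisSubtask() as the index, and an HDFS path would be written through a Hadoop or Flink FileSystem rather than java.nio.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical helper for the "write to a pre-defined path" workaround.
 * Assumption: a local directory stands in for the pre-defined HDFS path.
 * Each parallel task writes its partial count to <dir>/part-<subtaskIndex>;
 * the client sums all part files after the job has finished.
 */
final class SideFileCounts {

    private SideFileCounts() {}

    /** Writes one task's partial count (in Flink: from RichFilterFunction#close()). */
    static void writePartialCount(Path dir, int subtaskIndex, long count) throws IOException {
        Files.createDirectories(dir);
        Path file = dir.resolve("part-" + subtaskIndex);
        // One file per subtask, so parallel writers never touch the same file.
        // Default options create the file or truncate an existing one.
        Files.write(file, Long.toString(count).getBytes(StandardCharsets.UTF_8));
    }

    /** Sums all partial counts; call this on the client after env.execute() returns. */
    static long sumPartialCounts(Path dir) throws IOException {
        long total = 0;
        try (DirectoryStream<Path> parts = Files.newDirectoryStream(dir, "part-*")) {
            for (Path part : parts) {
                String text = new String(Files.readAllBytes(part), StandardCharsets.UTF_8).trim();
                total += Long.parseLong(text);
            }
        }
        return total;
    }
}
```

Using one file per subtask avoids concurrent writes to the same file. The directory should be emptied before each run; otherwise part files left over from an earlier run with higher parallelism would be added to the total.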

> Accumulators BROKEN after Partitioning
> --------------------------------------
>                 Key: FLINK-1959
>                 URL: https://issues.apache.org/jira/browse/FLINK-1959
>             Project: Flink
>          Issue Type: Bug
>          Components: Examples
>    Affects Versions: 0.8.1
>            Reporter: mustafa elbehery
>            Priority: Critical
>             Fix For: 0.8.1
> While running the Accumulator example in https://github.com/Elbehery/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java,
> I tried to alter the data flow by applying the "PartitionByHash" function before "Filter",
> and the resulting accumulator was NULL.
> By debugging, I could see the accumulator in the runtime map. However, when retrieving
> the accumulator from the JobExecutionResult object, it was NULL.
> The problem is caused by using "file.partitionByHash(1).filter(new EmptyFieldFilter())"
> instead of "file.filter(new EmptyFieldFilter())".

This message was sent by Atlassian JIRA
