flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3888) Custom Aggregator with Convergence can't be registered directly with DeltaIteration
Date Wed, 12 Oct 2016 19:17:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569630#comment-15569630
] 

ASF GitHub Bot commented on FLINK-3888:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2606#discussion_r83069609
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
---
    @@ -1513,14 +1513,21 @@ private void finalizeWorksetIteration(IterationDescriptor descr)
{
     		
     		String convAggName = aggs.getConvergenceCriterionAggregatorName();
     		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
    -		
    +
     		if (convCriterion != null || convAggName != null) {
    -			throw new CompilerException("Error: Cannot use custom convergence criterion with workset
iteration. Workset iterations have implicit convergence criterion where workset is empty.");
    +			if (convCriterion == null) {
    +				throw new CompilerException("Error: Convergence criterion aggregator set, but criterion
is null.");
    +			}
    +			if (convAggName == null) {
    +				throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator
is null.");
    +			}
    +
    +			syncConfig.setConvergenceCriterion(convAggName, convCriterion);
     		}
     		
     		headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
new LongSumAggregator());
     		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
new LongSumAggregator());
    -		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
new WorksetEmptyConvergenceCriterion());
    +		syncConfig.setDefaultConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
new WorksetEmptyConvergenceCriterion());
    --- End diff --
    
    Is it safe to use a fixed name for the aggregator? Could we not have multiple iterations
in a single job?


> Custom Aggregator with Convergence can't be registered directly with DeltaIteration
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-3888
>                 URL: https://issues.apache.org/jira/browse/FLINK-3888
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>            Reporter: Martin Liesenberg
>            Assignee: Vasia Kalavri
>
> Contrary to the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html]
the method to add an aggregator with a custom convergence criterion to a DeltaIteration is
not exposed directly to DeltaIteration, but can only be accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion  and running
the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration. Workset iterations
have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom convergence criterion
with workset iteration. Workset iterations have implicit convergence criterion where workset
is empty.
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
> 	at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
> 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
> 	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> 	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue has been found while discussing FLINK-2926



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message