flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3888) Custom Aggregator with Convergence can't be registered directly with DeltaIteration
Date Wed, 12 Oct 2016 19:17:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15569631#comment-15569631 ]

ASF GitHub Bot commented on FLINK-3888:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2606#discussion_r83075316
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorConvergenceITCase.java ---
    @@ -137,42 +138,51 @@ public void testConnectedComponentsWithParametrizableConvergence() {
     			fail(e.getMessage());
     		}
     	}
    +
    +	@Test
    +	public void testDeltaConnectedComponentsWithParametrizableConvergence() {
    +		try {
    +
    +			// name of the aggregator that checks for convergence
    +			final String UPDATED_ELEMENTS = "updated.elements.aggr";
    +
    +			// the iteration stops if less than this number of elements change value
    +			final long convergence_threshold = 3;
    +
    +			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
    +			DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);
    +
    +			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
    +					initialSolutionSet.iterateDelta(initialSolutionSet, 10, 0);
    +
    +			// register the convergence criterion
    +			iteration.registerAggregationConvergenceCriterion(UPDATED_ELEMENTS,
    +					new LongSumAggregator(), new UpdatedElementsConvergenceCriterion(convergence_threshold));
    +
    +			DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration.getWorkset().join(edges).where(0).equalTo(0)
    +					.with(new NeighborWithComponentIDJoin())
    +					.groupBy(0).min(1);
    +
    +			DataSet<Tuple2<Long, Long>> updatedComponentId =
    +					verticesWithNewComponents.join(iteration.getSolutionSet()).where(0).equalTo(0)
    +							.flatMap(new MinimumIdFilter(UPDATED_ELEMENTS));
    +
    +			List<Tuple2<Long, Long>> result = iteration.closeWith(updatedComponentId, updatedComponentId).collect();
    +			Collections.sort(result, new JavaProgramTestBase.TupleComparator<Tuple2<Long, Long>>());
    +
    +			assertEquals(expectedResult, result);
    +		}
    +		catch (Exception e) {
    --- End diff --
    
    Catching and re-throwing exceptions looks to be unnecessary.
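
To illustrate the comment above, here is a minimal sketch in plain Java, with no JUnit or Flink on the classpath. The class and method names are hypothetical, and `new AssertionError(e.getMessage())` is only a stand-in for what a `fail(e.getMessage())` call in a catch block would do. Under those assumptions, it suggests that declaring `throws Exception` lets the original exception reach the caller with its type and stack trace, while the catch block keeps only the message.

```java
// Hypothetical sketch: none of these names come from the Flink code base.
class ExceptionPropagationSketch {

    // Stand-in for a test body that may throw (e.g. a job execution).
    static void runJob() throws Exception {
        throw new IllegalStateException("job failed");
    }

    // Pattern similar to the diff: catch everything and fail with the message only.
    // The original exception type and its stack trace are not attached.
    static void catchAndFail() {
        try {
            runJob();
        } catch (Exception e) {
            throw new AssertionError(e.getMessage());
        }
    }

    // Simpler alternative: declare the exception and let it propagate unchanged.
    static void propagate() throws Exception {
        runJob();
    }

    public static void main(String[] args) {
        try {
            catchAndFail();
        } catch (AssertionError e) {
            System.out.println("catch-and-fail: " + e + ", cause=" + e.getCause());
        }
        try {
            propagate();
        } catch (Exception e) {
            System.out.println("propagated: " + e);
        }
    }
}
```

In a JUnit test the second form would correspond to a test method declared with `throws Exception` and no try/catch around its body.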


> Custom Aggregator with Convergence can't be registered directly with DeltaIteration
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-3888
>                 URL: https://issues.apache.org/jira/browse/FLINK-3888
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>            Reporter: Martin Liesenberg
>            Assignee: Vasia Kalavri
>
> Contrary to the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html] the method to add an aggregator with a custom convergence criterion to a DeltaIteration is not exposed directly to DeltaIteration, but can only be accessed via the {{aggregatorRegistry}}.
> Moreover, when registering an aggregator with a custom convergence criterion and running the program, the following exception appears in the logs:
> {noformat}
> Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.
> org.apache.flink.optimizer.CompilerException: Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty.
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.finalizeWorksetIteration(JobGraphGenerator.java:1518)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:198)
> 	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:164)
> 	at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76)
> 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:898)
> 	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> 	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> {noformat}
> The issue has been found while discussing FLINK-2926



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
