flink-issues mailing list archives

From "Martin Junghanns (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-4872) Type erasure problem exclusively on cluster execution
Date Wed, 16 Nov 2016 11:44:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15670241#comment-15670241 ]

Martin Junghanns edited comment on FLINK-4872 at 11/16/16 11:44 AM:
--------------------------------------------------------------------

[~twalthr] Thanks for looking into it. The actual UDF where it crashes can be found [here|https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/functions/BuildEmbeddingWithTiePoint.java].
If I remember correctly, I already tried the two ways of defining the output type.

The confusing part (at least for me) is that it runs on the local and collection execution
environments, but not on the cluster.
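For context, the distinction Flink's TypeExtractor relies on can be sketched with plain-JDK reflection (the class names below are hypothetical stand-ins, not from the linked UDF): a subclass that pins the type argument records it in its class file, while a subclass that passes the variable through leaves nothing concrete to recover at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for the Tuple1 subclasses in the report.
class Box<T> {}
class LongBox extends Box<Long> {}     // type argument fixed in the class file
class GenericBox<T> extends Box<T> {}  // type argument stays a variable

public class ErasureDemo {
    // Returns the first type argument recorded in a class's generic
    // superclass signature; for an unresolved parameter this is still
    // just the type variable, not a concrete class.
    static Type firstSuperclassArgument(Class<?> clazz) {
        Type sup = clazz.getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            return ((ParameterizedType) sup).getActualTypeArguments()[0];
        }
        return null;
    }

    public static void main(String[] args) {
        // LongBox pins T to Long, so reflection recovers the concrete class.
        System.out.println(firstSuperclassArgument(LongBox.class));
        // GenericBox leaves T unresolved; reflection only sees the variable.
        System.out.println(firstSuperclassArgument(GenericBox.class));
    }
}
```

This is why hints via returns(...) or ResultTypeQueryable are needed when the concrete argument is only known from a constructor parameter like `Class<T> clazz`.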



> Type erasure problem exclusively on cluster execution
> -----------------------------------------------------
>
>                 Key: FLINK-4872
>                 URL: https://issues.apache.org/jira/browse/FLINK-4872
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.2
>            Reporter: Martin Junghanns
>
> The following code runs fine on the local and collection execution environments but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
>   public static class Pojo {
>   }
>   public static class Foo<T> extends Tuple1<T> {
>   }
>   public static class Bar<T> extends Tuple1<T[]> {
>   }
>   public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {
>     private final Class<T> clazz;
>     public UDF(Class<T> clazz) {
>       this.clazz = clazz;
>     }
>     @Override
>     public Bar<T> map(Foo<T> value) throws Exception {
>       Bar<T> bar = new Bar<>();
>       //noinspection unchecked
>       bar.f0 = (T[]) Array.newInstance(clazz, 10);
>       return bar;
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     // runs in local, collection and cluster execution
>     withLong();
>     // runs in local and collection execution, fails on cluster execution
>     withPojo();
>   }
>   public static void withLong() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Long> foo = new Foo<>();
>     foo.f0 = 42L;
>     DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
>     map.print();
>   }
>   public static void withPojo() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Pojo> foo = new Foo<>();
>     foo.f0 = new Pojo();
>     DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
>     map.print();
>   }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
>   public ProblemTest(TestExecutionMode mode) {
>     super(mode);
>   }
>   @Test
>   public void testWithLong() throws Exception {
>     Problem.withLong();
>   }
>   @Test
>   public void testWithPOJO() throws Exception {
>     Problem.withPojo();
>   }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>     org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
>     org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
>     org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>     Problem.withPojo(Problem.java:60)
>     Problem.main(Problem.java:38) 
> {code}
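The exception follows from how Java erases the `T[]` field: the runtime array value created via Array.newInstance knows its component type, but the declared field type does not, which is all a framework can inspect before any data flows. A stdlib-only sketch (the `Holder` class here is a hypothetical mirror of `Bar<T>`, not Flink API):

```java
import java.lang.reflect.Array;

public class ArrayErasureDemo {
    // Hypothetical holder mirroring Bar<T> from the report.
    static class Holder<T> {
        T[] values;
    }

    @SuppressWarnings("unchecked")
    static <T> Holder<T> fill(Class<T> clazz, int n) {
        Holder<T> h = new Holder<>();
        // The runtime array carries the concrete component type.
        h.values = (T[]) Array.newInstance(clazz, n);
        return h;
    }

    public static void main(String[] args) {
        Holder<String> h = fill(String.class, 10);
        // The created value knows its component type at runtime...
        System.out.println(h.values.getClass().getComponentType());
        // ...but the declared field type is erased to Object[], which is
        // all that is visible when choosing a serializer up front.
        System.out.println(Holder.class.getDeclaredFields()[0].getType());
    }
}
```

So the information needed to serialize the array exists in the value, just not in the declared type, which is consistent with the failure appearing only when types must be resolved for cluster shipping.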



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
