flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat
Date Wed, 12 Oct 2016 13:57:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568798#comment-15568798 ]

ASF GitHub Bot commented on FLINK-2608:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2623
  
    As far as I can tell, Kryo 3.x is not strictly serialization-compatible with 2.x, hence
    the major version bump.
    
    If the interfaces are still stable, it should be fine to bump the chill dependency
    version, exclude chill's transitive Kryo dependency, and add our own explicit Kryo 2.x
    dependency. I would prefer that approach.
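Because Kryo 2.x and 3.x both ship their classes in the `com.esotericsoftware.kryo` package, a dependency mix-up after such an exclusion is not visible from class names alone. A minimal runtime check, sketched below under that assumption, is to ask the JVM which jar a class was actually loaded from. `WhichJar` and `locate` are hypothetical helpers written for this note, not part of Flink, chill, or Kryo.

```java
import java.security.CodeSource;

public class WhichJar {

    /**
     * Reports where a class was loaded from: a jar/directory URL, or a marker
     * when the class is absent or belongs to the JDK itself (no code source).
     */
    static String locate(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class<?> clazz = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "<no code source>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Kryo's entry class; with the exclusion in place this should name the 2.x jar.
        System.out.println(locate("com.esotericsoftware.kryo.Kryo"));
        // JDK classes are loaded by the bootstrap loader and have no code source.
        System.out.println(locate("java.lang.String")); // prints "<no code source>"
    }
}
```

The class name is passed as a string so the diagnostic compiles and runs even when Kryo is missing from the classpath, which is itself a useful answer.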


> Arrays.asList(..) does not work with CollectionInputFormat
> ----------------------------------------------------------
>
>                 Key: FLINK-2608
>                 URL: https://issues.apache.org/jira/browse/FLINK-2608
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.9, 0.10.0
>            Reporter: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, serialization/deserialization fails when the task is deployed.
> See the following program:
> {code:java}
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>
>         // DOES NOT WORK: Arrays.asList returns a fixed-size java.util.Arrays$ArrayList
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works (copy into a regular java.util.ArrayList):
>         // List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>         wordCounts.print();
>     }
>
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>     ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>     ... 26 more
> {noformat}
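The failure depends on the list's runtime class rather than its contents. The JDK-only sketch below checks the two things a generic collection serializer typically relies on when rebuilding a collection, a no-argument constructor and `add()`, and shows that plain Java serialization still round-trips the list. `AsListDiagnostics` and its helpers are hypothetical, written for this note; that Flink's Kryo-based serializer for `TestClass` rebuilds lists this way is an assumption, not something the trace above proves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AsListDiagnostics {

    /** True if the class declares a no-argument constructor (any visibility). */
    static boolean hasNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** True if add() is supported. Mutates the list, so pass a throwaway instance. */
    static boolean supportsAdd(List<Integer> list) {
        try {
            list.add(1);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    /** Serializes and deserializes a value with plain java.io serialization. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        List<Integer> fixed = Arrays.asList(0, 0, 0);
        List<Integer> copy = new ArrayList<>(fixed);

        System.out.println(fixed.getClass().getName());             // java.util.Arrays$ArrayList
        System.out.println(hasNoArgConstructor(fixed.getClass()));  // false
        System.out.println(supportsAdd(Arrays.asList(0, 0, 0)));    // false
        System.out.println(hasNoArgConstructor(copy.getClass()));   // true
        System.out.println(supportsAdd(new ArrayList<>(fixed)));    // true

        // Plain Java serialization handles the fixed-size list and keeps its runtime class.
        List<Integer> restored = roundTrip(fixed);
        System.out.println(restored.equals(fixed) + " " + restored.getClass().getName());
        // true java.util.Arrays$ArrayList
    }
}
```

Copying the list into a regular `java.util.ArrayList` before passing it to `fromElements` sidesteps both problems, which matches the reporter's observation that an `ArrayList` works.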



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
