flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Felix Wollschläger (JIRA) <j...@apache.org>
Subject [jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet
Date Thu, 07 Mar 2019 14:14:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16786807#comment-16786807
] 

Felix Wollschläger commented on FLINK-11774:
--------------------------------------------

I encountered the same exception when using a custom type as a key. It seems like it only
occurs when restoring from a savepoint or checkpoint.

 

I already encountered it back with Flink 1.3.2 (so it doesn't seem to be a new bug) and kept
the stacktrace. If it helps, this was the stacktrace in Flink 1.3.2:
{code:java}
/**
 * Causes the job to throw:
 * java.lang.IllegalArgumentException: Key Group 75 does not belong to the local range.
 *     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
 *     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getIndexForKeyGroup(HeapInternalTimerService.java:409)
 *     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getEventTimeTimerSetForKeyGroup(HeapInternalTimerService.java:362)
 *     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.getEventTimeTimerSetForTimer(HeapInternalTimerService.java:352)
 *     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.registerEventTimeTimer(HeapInternalTimerService.java:226)
 *     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:54)
 *     at SomeProcessFunction.processElement(SomeProcessFunction.java:115)
 *     at SomeProcessFunction.processElement(SomeProcessFunction.java:35)
 *     at SomeAbstractProcessFunction.processElement(SomeAbstractProcessFunction.java:26)
 *     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
 *     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
 *     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 *     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 *     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 *     at java.lang.Thread.run(Thread.java:748)
 */{code}
Back then I simply changed my key to a String but it would be nice if this would be fixed
for future versions.

> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
>                 Key: FLINK-11774
>                 URL: https://issues.apache.org/jira/browse/FLINK-11774
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.7.2
>         Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>            Reporter: Kirill Vainer
>            Priority: Major
>         Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>         at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>     private static class Aggregate {
>         private Key key = new Key();
>         public Aggregate(long number) {
>         }
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>         public Key getKey() {
>             return key;
>         }
>     }
>     public static class Key {
>     }
>     private static class CollectSink implements SinkFunction<Aggregate> {
>         private static final long serialVersionUID = 1;
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing the problem,
or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.
> Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message