flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9053) Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel
Date Thu, 22 Mar 2018 16:20:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409803#comment-16409803 ]

ASF GitHub Bot commented on FLINK-9053:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/5748

    [FLINK-9053][runtime] only release outputs under the checkpoint lock

    ## What is the purpose of the change
    
    Releasing an operator chain's outputs calls `RecordWriter#clearBuffers()`, which must not
    run in parallel with `RecordWriter#broadcastEvent()`. The asynchronous checkpoint barrier
    trigger inside `Task` may run the latter via `StreamTask#triggerCheckpoint()`. Currently,
    during the cleanup of `StreamTask#invoke()`, `StreamTask`'s asynchronous services are shut
    down but those of the `Task` are not, and `operatorChain.releaseOutputs()` is not called
    under the checkpoint lock. Therefore, the following may run in parallel:
    - `Task`'s checkpoint trigger execution
    - `operatorChain.releaseOutputs()`
    
    We can guard `operatorChain.releaseOutputs()` with the checkpoint lock. This should be safe
    because, at that point, we have already closed all of `StreamTask`'s asynchronous executors
    and disposed of the operators, so nothing should block the cleanup by holding the checkpoint
    lock.
    @StephanEwen, can you please have a look and verify that this is safe?
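Two toy classes can illustrate the locking scheme described above. This is a minimal sketch under stated assumptions, not Flink's code. `ToyRecordWriter`, `ToyStreamTask` and `CheckpointLockDemo` are hypothetical names. The writer keeps a single `Optional` buffer builder. `checkpointLock` is a plain `Object` monitor shared by the checkpoint-trigger path and the cleanup path.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified writer; NOT Flink's RecordWriter.
class ToyRecordWriter {
    private Optional<StringBuilder> currentBuilder = Optional.empty();
    private final Deque<String> flushed = new ArrayDeque<>();

    // Checkpoint-trigger path: emit a barrier and close the builder.
    void broadcastEvent(String event) {
        if (!currentBuilder.isPresent()) {
            currentBuilder = Optional.of(new StringBuilder());
        }
        currentBuilder.get().append(event);
        closeBufferBuilder();
    }

    // Cleanup path: close whatever builder is still open.
    void clearBuffers() {
        if (currentBuilder.isPresent()) {
            closeBufferBuilder();
        }
    }

    // Without mutual exclusion, another thread may empty the Optional between a
    // caller's isPresent() check and this get(), which then throws
    // NoSuchElementException("No value present").
    private void closeBufferBuilder() {
        flushed.add(currentBuilder.get().toString());
        currentBuilder = Optional.empty();
    }

    int flushedCount() { return flushed.size(); }
}

// Hypothetical task: both paths synchronize on the same checkpoint lock.
class ToyStreamTask {
    private final Object checkpointLock = new Object();
    final ToyRecordWriter writer = new ToyRecordWriter();

    void triggerCheckpoint(long checkpointId) {
        synchronized (checkpointLock) {
            writer.broadcastEvent("barrier-" + checkpointId);
        }
    }

    // The change in a nutshell: release the outputs only under the checkpoint lock.
    void releaseOutputs() {
        synchronized (checkpointLock) {
            writer.clearBuffers();
        }
    }
}

class CheckpointLockDemo {
    // Races a checkpoint-trigger thread against repeated cleanup calls and
    // returns how many barriers were flushed.
    static int run(int checkpoints) {
        ToyStreamTask task = new ToyStreamTask();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread trigger = new Thread(() -> {
            try {
                for (long id = 1; id <= checkpoints; id++) {
                    task.triggerCheckpoint(id);
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        trigger.start();
        for (int i = 0; i < checkpoints; i++) {
            task.releaseOutputs();
        }
        try {
            trigger.join(); // also makes the trigger thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException("checkpoint trigger failed", failure.get());
        }
        return task.writer.flushedCount();
    }

    public static void main(String[] args) {
        System.out.println(run(10_000)); // prints 10000
    }
}
```

Removing the `synchronized` block from `releaseOutputs()` reopens the window between `isPresent()` and `get()`. As with the original bug, the resulting failure would only show up intermittently.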
    
    ## Brief change log
    
    - add the checkpoint lock in the cleanup of `StreamTask#invoke()` around `operatorChain.releaseOutputs()`
    
    ## Verifying this change
    
    This is a very rare race condition that was uncovered by the `RescalingITCase`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
      - The S3 file system connector: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-9053

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5748
    
----
commit 9f0295aa3c02e4870b248241cb9094d14863a686
Author: Stefan Richter <s.richter@...>
Date:   2018-02-26T17:03:14Z

    [hotfix] Improved logging for task local recovery
    
    (cherry picked from commit 56c7560)

commit e9e13dec10f1a1ee57c46719d758885c4f33dcf3
Author: Stephan Ewen <sewen@...>
Date:   2018-02-27T15:53:03Z

    [hotfix] [core] Suppress unused warning config options only used in shell scripts and doc generation.

commit a269f8519305faff153e84d729873b6f9497bd36
Author: Stephan Ewen <sewen@...>
Date:   2018-02-27T16:04:29Z

    [FLINK-8798] [core] Force 'commons-logging' to be parent-first loaded.

commit 1d26062de130c05fdbe7701b55766b4a8d433418
Author: Xingcan Cui <xingcanc@...>
Date:   2018-02-12T10:11:36Z

    [FLINK-8538] [table] Add a Kafka table source factory with JSON format support

commit db2c510fb4f171c9e9940759e5fbaf466ec74474
Author: Timo Walther <twalthr@...>
Date:   2018-02-19T12:35:45Z

    [FLINK-8538] [table] Improve unified table sources
    
    This closes #5564.

commit 23358ff87003fd6603c0ca19bc37f31944d2c494
Author: Stephan Ewen <sewen@...>
Date:   2018-02-26T15:41:24Z

    [FLINK-8791] [docs] Fix documentation about configuring dependencies

commit acf114793c708f0ab207008c25195f6f65796e5f
Author: gyao <gary@...>
Date:   2018-02-21T15:02:01Z

    [FLINK-8730][REST] JSON serialize entire SerializedThrowable
    
    Do not only serialize the serialized exception but the entire
    SerializedThrowable object. This makes it possible to throw the
    SerializedThrowable itself without deserializing it.
    
    This closes #5546.

commit 2f6cb37c775106bb684ef9c608585e7a72056460
Author: gyao <gary@...>
Date:   2018-02-27T15:58:53Z

    [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor
    
    This closes #5591.

commit 51d5bc6c5151c2aed3f932f84c35da43689501ec
Author: vinoyang <vinoyang@...>
Date:   2018-02-27T06:43:52Z

    [FLINK-8792] [rest] Change MessageQueryParameter.convertStringToValue to convertValueToString
    
    This closes #5587.

commit 08e615027acd426537dc580139a61bd4082b7c3f
Author: Till Rohrmann <trohrmann@...>
Date:   2018-02-28T09:11:44Z

    [FLINK-8792] [rest] Change MessageQueryParameter#convertValueFromString to convertStringToValue

commit 302aaeb021bacf3f37cb9a3ee236304c94adbf30
Author: Timo Walther <twalthr@...>
Date:   2018-02-22T16:22:54Z

    [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
    
    This closes #5567.

commit 59b607b0c411b7d01b97db302d0f124b28ef0d0e
Author: sihuazhou <summerleafs@...>
Date:   2018-02-26T02:54:53Z

    [FLINK-8777][checkpointing] Cleanup local state more eagerly in recovery
    
    (cherry picked from commit 296f9ff)

commit cf854ccbc6fdbf112095c471705c8799aee64a45
Author: Aljoscha Krettek <aljoscha.krettek@...>
Date:   2018-02-28T14:35:13Z

    [hotfix] Enable FILESYTEM_DEFAULT_OVERRIDE in FLIP-6 MiniClusterResource

commit 623e94459795a191703b880fcfa4f162c92ae458
Author: Stefan Richter <s.richter@...>
Date:   2018-02-28T13:25:55Z

    [FLINK-8557][checkpointing] Remove illegal characters from operator description text before using it to construct the instance directory in RocksDB
    
    (cherry picked from commit 66474da)
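The sanitizing step described in FLINK-8557 can be sketched as a whitelist replacement. This is an illustration built on assumptions. The helper name `toFileCompatibleIdentifier` is hypothetical. Keeping only ASCII letters, digits and `-` is an assumed choice of legal characters, not necessarily the exact set the commit uses.

```java
// Hypothetical helper: maps an operator description to a string that is safe
// to use as a directory name by replacing every other character with '_'.
class DirectoryNames {
    static String toFileCompatibleIdentifier(String operatorDescription) {
        return operatorDescription.replaceAll("[^a-zA-Z0-9\\-]", "_");
    }

    public static void main(String[] args) {
        // Spaces, parentheses and '/' would otherwise end up in the instance path.
        System.out.println(toFileCompatibleIdentifier("Keyed Reduce (1/4)")); // prints Keyed_Reduce__1_4_
    }
}
```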

commit bb459cc68f8dc4bd042b61e365e583d4e96b3e0e
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:37:37Z

    [hotfix][tests] Deduplicate code in SingleInputGateTest
    
    (cherry picked from commit 67a547a)

commit 6b7a4480ef8610df3ff21eb2811b9a0a3c58c912
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T11:11:14Z

    [hotfix][runtime] Remove duplicated check
    
    (cherry picked from commit 42f71f6)

commit 651462e6b22c51ce14bd9ea6db389ef6a1f38e55
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate
    
    Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable
    might incorrectly return false. This might have caused some deadlocks.
    
    (cherry picked from commit 6c9e267)
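A toy gate can illustrate the ordering problem behind FLINK-8760. The sketch assumes that `moreAvailable` is derived from the queue of channels with data, as the commit message suggests. `ToyChannel`, `ToyPollResult` and `ToyGate` are hypothetical types, not Flink's `SingleInputGate` API. The point is that the flag is computed only after the polled channel has been re-enqueued.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Hypothetical channel holding already-received buffers (strings for brevity).
class ToyChannel {
    final ArrayDeque<String> buffers = new ArrayDeque<>();

    ToyChannel(String... data) {
        for (String d : data) {
            buffers.add(d);
        }
    }
}

// Hypothetical result: a buffer plus the "more data available" hint.
class ToyPollResult {
    final String buffer;
    final boolean moreAvailable;

    ToyPollResult(String buffer, boolean moreAvailable) {
        this.buffer = buffer;
        this.moreAvailable = moreAvailable;
    }
}

class ToyGate {
    private final ArrayDeque<ToyChannel> channelsWithData = new ArrayDeque<>();

    void notifyChannelNonEmpty(ToyChannel channel) {
        channelsWithData.add(channel);
    }

    Optional<ToyPollResult> pollNext() {
        ToyChannel channel = channelsWithData.poll();
        if (channel == null) {
            return Optional.empty();
        }
        String buffer = channel.buffers.poll();
        if (!channel.buffers.isEmpty()) {
            // The channel still has data: put it back into the queue first ...
            channelsWithData.add(channel);
        }
        // ... and only then derive the flag. Computing it before the re-enqueue
        // would report false even though this very channel has more data.
        boolean moreAvailable = !channelsWithData.isEmpty();
        return Optional.of(new ToyPollResult(buffer, moreAvailable));
    }
}
```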

commit 8e62f90739e2319491df983917dc7ab484de2550
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:27:54Z

    [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase
    
    (cherry picked from commit 2c2e189)

commit 8eb6a30798c09d171e3eb8019b53e677252bd5ba
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:28:20Z

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition
    
    Before, there was a race condition that might have resulted in ignoring some
    notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling
    to subpartitions and adds a stress test for flushAlways (without this fix, this test
    deadlocks).
    
    (cherry picked from commit ebd39f3)

commit 61a34a691e7d5233f18ac72a1ab8fb09b53c4753
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-26T15:13:06Z

    [FLINK-8805][runtime] Optimize EventSerializer.isEvent method
    
    For example, previously, if the method was used to check for EndOfPartitionEvent
    and the Buffer contained a huge custom event, the event had to be deserialized before
    performing the actual check. Now we quickly enter the correct if/else branch
    and perform the full, costly deserialization only if we have to.
    
    Calls to isEvent() checking against event types other than EndOfPartitionEvent were not used.
    
    (cherry picked from commit 767027f)
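The fast path described above can be sketched by inspecting only a leading type tag. Several parts are illustrative assumptions: the serialized layout (a 4-byte `int` tag followed by the event payload), the tag values, and the names `ToyEventSerializer`, `isEndOfPartitionEvent` and the two `serialize...` helpers. The sketch does not reproduce Flink's `EventSerializer`.

```java
import java.nio.ByteBuffer;

class ToyEventSerializer {
    // Illustrative type tags (assumed, not Flink's actual constants).
    static final int END_OF_PARTITION = 0;
    static final int OTHER_EVENT = 1;

    static ByteBuffer serializeEndOfPartition() {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(END_OF_PARTITION);
        buffer.flip();
        return buffer;
    }

    // A custom event: the tag followed by an arbitrarily large payload.
    static ByteBuffer serializeCustomEvent(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(OTHER_EVENT).put(payload);
        buffer.flip();
        return buffer;
    }

    // Decides from the tag alone: the payload is never deserialized, and the
    // absolute getInt(index) leaves the buffer's position untouched.
    static boolean isEndOfPartitionEvent(ByteBuffer serialized) {
        return serialized.remaining() >= 4
                && serialized.getInt(serialized.position()) == END_OF_PARTITION;
    }
}
```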

commit d5338c4154e5de029b3b30e3ef0a0732bf7f68e7
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-27T09:39:00Z

    [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent
    
    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we expect no more data.
    
    This commit detects such a situation, makes a correct assertion, and turns off the
    moreAvailable flag.
    
    (cherry picked from commit b9b7416)

commit 32384ed9b00cf0e1961d355dc4080f25a2156e58
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-22T14:41:38Z

    [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly
    
    (cherry picked from commit 6e9e0dd)

commit f1453276095c55264f7b4097d16e2987a44b3f33
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-23T01:55:57Z

    [hotfix] Fix package private and comments
    
    (cherry picked from commit 6165b3d)

commit 18ff2ce15bdb1e7bd246e438e47527a24559c86d
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:50:10Z

    [hotfix][network] minor improvements in UnionInputGate
    
    (cherry picked from commit 4203557)

commit 9265666517830350a4a7037029e347f33df1bea2
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:52:37Z

    [FLINK-8737][network] disallow creating a union of UnionInputGate instances
    
    Recently, pollNextBufferOrEvent() was added but not implemented, yet it is used in
    getNextBufferOrEvent(), so any UnionInputGate containing a UnionInputGate would
    already have failed. There should be no use case for wiring up inputs this way.
    Therefore, fail early when trying to construct this.
    
    (cherry picked from commit e8de538)
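Failing early at construction time, as FLINK-8737 describes, amounts to a type check over the constructor arguments. Below is a minimal sketch with hypothetical types `Gate`, `LeafGate` and `UnionGate`, which are not Flink's `InputGate` classes. Throwing `IllegalArgumentException` is an assumed choice of exception.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical gate hierarchy standing in for InputGate implementations.
interface Gate {}

class LeafGate implements Gate {}

class UnionGate implements Gate {
    private final List<Gate> inputGates;

    UnionGate(Gate... inputGates) {
        for (Gate gate : inputGates) {
            // Reject nesting up front instead of failing later while polling.
            if (gate instanceof UnionGate) {
                throw new IllegalArgumentException("Cannot union a union of input gates.");
            }
        }
        this.inputGates = Collections.unmodifiableList(Arrays.asList(inputGates));
    }

    int numberOfInputGates() {
        return inputGates.size();
    }
}
```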

commit 26c8f6c2a3ff75ffb954c816a57908318a2d8099
Author: Stephan Ewen <sewen@...>
Date:   2018-02-28T11:15:30Z

    [hotfix] [tests] Fix SelfConnectionITCase
    
    The test previously did not fail when the execution failed, and thus compared incomplete
    results from a failed execution against the expected results.
    
    This cleans up serialization warnings and uses lambdas where possible, to make the code
    more readable.

commit f60e46dafa8950d5e40cd8a3286c172ecaea6b73
Author: gyao <gary@...>
Date:   2018-02-28T12:04:19Z

    [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService

commit b7247929d0745b3b83306d0c93d97faf4ece4107
Author: gyao <gary@...>
Date:   2018-02-28T12:06:00Z

    [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the

commit adb3750226971f7c67a0d3069103b56e4fee1c27
Author: gyao <gary@...>
Date:   2018-02-28T12:07:04Z

    [hotfix][Javadoc] Fix typo in YarnTestBase: teh -> the

commit 94bbd564ce5214b3366cc6d299fcb99ae62a2bd8
Author: gyao <gary@...>
Date:   2018-02-28T12:08:25Z

    [hotfix][tests] Fix wrong assertEquals in YARNSessionCapacitySchedulerITCase
    
    Test swapped actual and expected arguments.
    Remove catching Throwable in test; instead propagate all exceptions.

----


> Exception in RecordWriter during cleanup of StreamTask with the checkpoint trigger running in parallel
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9053
>                 URL: https://issues.apache.org/jira/browse/FLINK-9053
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/356612100
> {code}
> testSavepointRescalingOutKeyedStateDerivedMaxParallelism[0](org.apache.flink.test.checkpointing.RescalingITCase)  Time elapsed: 2.021 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.NoSuchElementException: No value present
> 	at java.util.Optional.get(Optional.java:135)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.closeBufferBuilder(RecordWriter.java:218)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.clearBuffers(RecordWriter.java:178)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.close(StreamRecordWriter.java:99)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.close(RecordWriterOutput.java:161)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.releaseOutputs(OperatorChain.java:239)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:402)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
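The root cause in the trace is `Optional.get()` being called on an empty `Optional` inside `RecordWriter.closeBufferBuilder()`. Below, one interleaving consistent with this trace is replayed deterministically on a single thread. `BuilderSlot` and `CheckThenActReplay` are hypothetical names. The three steps model the cleanup thread and the checkpoint trigger thread described in the pull request, not Flink's actual code.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical holder for the writer's current buffer builder.
class BuilderSlot {
    Optional<StringBuilder> builder = Optional.of(new StringBuilder("pending data"));
}

class CheckThenActReplay {
    // Returns the exception message produced by the stale check-then-act sequence.
    static String replay() {
        BuilderSlot slot = new BuilderSlot();
        // Step 1, cleanup thread (clearBuffers): sees an open builder.
        boolean cleanupSawBuilder = slot.builder.isPresent();
        // Step 2, checkpoint trigger thread (broadcastEvent): closes and clears it first.
        slot.builder.get().append("|barrier");
        slot.builder = Optional.empty();
        // Step 3, cleanup thread (closeBufferBuilder): acts on its stale check.
        try {
            if (cleanupSawBuilder) {
                slot.builder.get();
            }
            return "no exception";
        } catch (NoSuchElementException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints: No value present
    }
}
```

If the cleanup holds the same lock as the checkpoint trigger for steps 1 and 3, step 2 cannot slip in between them.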



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
