flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2808) Rework / Extend the StatehandleProvider
Date Thu, 08 Oct 2015 05:02:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948077#comment-14948077
] 

ASF GitHub Bot commented on FLINK-2808:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/1239

    [FLINK-2808] Rework state abstraction and clean up task / operator internals

    This pull request fixes many related/intermixed issues. It was hard to split this into
individual issues.
    
    ### Crucial bug fixes
    
      - State snapshots for memory-backed state previously copied a reference into the StateHandle,
after which the streaming program continued. If the state was mutated before Akka serialized
it, the mutated state was checkpointed, rather than the state at the point of drawing
the snapshot.
      
      - Key/value state is checkpointed as a whole, rather than individually per key.
      
      - Memory-backed state now has a maximum size that is checked upon checkpointing. Exceeding
that size fails the checkpoint. Before, overly large state simply resulted in an oversized Akka
frame that was dropped, silently letting the program run without ever completing a checkpoint.
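
The first and third fixes above can be illustrated with a minimal sketch. It assumes plain Java serialization and uses hypothetical names (`MemorySnapshotSketch`, `snapshot`, `restore`, `maxSizeBytes`); it is not the code in this pull request. The state is serialized at the moment the snapshot is drawn, so later mutations cannot leak into it, and an oversized snapshot fails explicitly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical sketch; names and the size limit are assumptions, not Flink's API.
public class MemorySnapshotSketch {

    /** Serializes the state right away instead of keeping a reference to it. */
    static byte[] snapshot(Serializable state, int maxSizeBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        if (bytes.size() > maxSizeBytes) {
            // Fail the checkpoint instead of shipping a message that would be dropped.
            throw new IOException("State size " + bytes.size()
                    + " exceeds the maximum of " + maxSizeBytes + " bytes");
        }
        return bytes.toByteArray();
    }

    /** Reads the state back from a snapshot. */
    static Object restore(byte[] snapshot) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        byte[] snap = snapshot(state, 1 << 20);
        state.put("a", 2); // mutation after the snapshot was drawn
        System.out.println("restored: " + restore(snap) + ", live: " + state);
    }
}
```

The trade-off in this sketch is an eager copy per checkpoint in exchange for a snapshot that is consistent with the point at which it was drawn.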
    
    
    ### User-facing changes
    
      - State backends are not only responsible for storing snapshots of the user state;
they also define how exactly the key/value state is represented in the first place. This
allows us to plug in external key/value stores to store the Flink key/value state. Default
implementations store the state in memory / files.
      
      - State backend offers additional methods to checkpoint directly into streams.
      
      - One can configure arbitrary default state backends via a factory interface that creates
them from the TaskManager configuration.
      
      - Key/value state supports arbitrary types without extra checkpointer logic, but the user
needs to supply the type of the state (via a class or TypeInformation).
      
      - Removed the `OperatorState` that is non-partitioned. The only type of state remaining
through the `OperatorState` abstraction is partitioned key/value state in functions that
are applied on a KeyedStream. Consequently, the `mapWithState()` and related methods are only
available on the `KeyedStream`.
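
The division of responsibilities described in this list can be sketched roughly as follows. All type and method names here (`KvState`, `StateBackend`, `StateBackendFactory`, `createKvState`, `createFromConfig`) are hypothetical stand-ins rather than the interfaces introduced by this pull request, and a plain `Map<String, String>` stands in for the TaskManager configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are Flink's actual classes.
public class StateBackendSketch {

    /** Key/value state as seen by user code. */
    interface KvState<K, V> {
        V value(K key);
        void update(K key, V value);
    }

    /** The backend decides how key/value state is represented. */
    interface StateBackend {
        <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue);
    }

    /** Creates a backend from configuration. */
    interface StateBackendFactory {
        StateBackend createFromConfig(Map<String, String> config);
    }

    /** Backend that keeps the state in a heap map. */
    static class MemoryStateBackend implements StateBackend {
        @Override
        public <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue) {
            Map<K, V> map = new HashMap<>();
            return new KvState<K, V>() {
                @Override
                public V value(K key) {
                    return map.getOrDefault(key, defaultValue);
                }

                @Override
                public void update(K key, V value) {
                    // The user-supplied type lets the backend validate (or serialize) values.
                    map.put(key, valueType.cast(value));
                }
            };
        }
    }

    static class MemoryStateBackendFactory implements StateBackendFactory {
        @Override
        public StateBackend createFromConfig(Map<String, String> config) {
            return new MemoryStateBackend();
        }
    }

    public static void main(String[] args) {
        StateBackend backend = new MemoryStateBackendFactory().createFromConfig(new HashMap<>());
        KvState<String, Long> counts = backend.createKvState(Long.class, 0L);
        counts.update("user-1", counts.value("user-1") + 1);
        System.out.println(counts.value("user-1"));
    }
}
```

A backend for an external key/value store would implement the same `createKvState` method but route `value` and `update` to the store instead of a heap map.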
      
    
    ### Internal cleanups
    
      - Checkpoint barriers are forwarded earlier, to reduce latency introduced by checkpoints.
    
      - Fewer in-memory copies when checkpointing to the file system state backend
    
      - The StreamingRuntimeContext is used purely for UDF interaction, not to hand over components
to the operators. 
    
      - The infinite reduce and aggregations work properly on key/value state, rather than
maintaining their own maps
      
      - made the OutputHandler (not OperatorChain) type safe and simpler
      
      - made clear distinction between responsibilities of StreamTasks (input/output streams,
setup of operator chain, checkpoint coordination) and operators (scope of one function and
runtime context)
      
      - clean up checkpointing logic between operator (checkpoints generic key/value state)
and UDF operators (checkpoint UDFs)
      
      - removed Configuration from operator open() method (it was confused with the UDF
open(Configuration))

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink statebackend

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1239
    
----
commit 3d633f0d608d91cfa69455fa9a47c53bf753a677
Author: Stephan Ewen <sewen@apache.org>
Date:   2015-10-05T13:57:49Z

    [hotfix] Correct name of HDFS tests from 'org.apache.flink.tachyon' to 'org.apache.flink.hdfstests'

commit 441c089552b3045062e8620ad9d2c8411fb387a8
Author: Stephan Ewen <sewen@apache.org>
Date:   2015-10-05T13:57:04Z

    [FLINK-2808] [streaming] Refactor and extend state backend abstraction

commit 73b65e2196576b0e36730bd0c8d8d3ced56f9f4f
Author: Stephan Ewen <sewen@apache.org>
Date:   2015-10-07T11:54:05Z

    [FLINK-2808] [streaming] Integrate extended state backend abstraction with streaming state
handling

----


> Rework / Extend the StatehandleProvider
> ---------------------------------------
>
>                 Key: FLINK-2808
>                 URL: https://issues.apache.org/jira/browse/FLINK-2808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.10
>
>
> I would like to make some changes (mostly additions) to the {{StateHandleProvider}}.
Ideally for the upcoming release, as it is somewhat part of the public API.
> The rationale behind this is to handle in a nice and extensible way the creation of key/value
state backed by various implementations (FS, distributed KV store, local KV store with FS
backup, ...) and various checkpointing ways (full dump, append, incremental keys, ...)
> The changes would concretely be:
> 1.  There should be a default {{StateHandleProvider}} set on the execution environment.
Functions can later specify the {{StateHandleProvider}} when grabbing the {{StreamOperatorState}}
from the runtime context (plus optionally a {{Checkpointer}})
> 2.  The {{StreamOperatorState}} is created from the {{StateHandleProvider}}. That way,
a KeyValueStore state backend can create a {{StreamOperatorState}} that directly updates data
in the KV store on every access, if that is desired (and filter accesses by timestamps to
only show committed data)
> 3.  The StateHandleProvider should have methods to get an output stream that writes to
the state checkpoint directly (and returns a StateHandle upon closing). That way we can convert
and dump large state into the checkpoint without creating a full copy in memory first.
> Lastly, I would like to change some names
>   - {{StateHandleProvider}} to either {{StateBackend}}, {{StateStore}}, or {{StateProvider}}
(simpler name).
>   - {{StreamOperatorState}} to either {{State}} or {{KVState}}.
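
Point 3 of the issue description (an output stream that writes directly to the checkpoint and yields a handle on close) could look roughly like the sketch below. It is file-backed and uses hypothetical names (`FileCheckpointStream`, `FileStateHandle`, `closeAndGetHandle`); the actual interface in the pull request may differ.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; names are assumptions, not Flink's API.
public class CheckpointStreamSketch {

    /** Points at checkpointed data without holding it in memory. */
    static final class FileStateHandle {
        final Path path;

        FileStateHandle(Path path) {
            this.path = path;
        }

        InputStream open() throws IOException {
            return Files.newInputStream(path);
        }
    }

    /** Streams state straight into the checkpoint file. */
    static final class FileCheckpointStream extends OutputStream {
        private final Path path;
        private final OutputStream out;

        FileCheckpointStream(Path path) throws IOException {
            this.path = path;
            this.out = new BufferedOutputStream(Files.newOutputStream(path));
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
        }

        @Override
        public void close() throws IOException {
            out.close();
        }

        /** Flushes and closes the stream, then returns a handle to the written data. */
        FileStateHandle closeAndGetHandle() throws IOException {
            close();
            return new FileStateHandle(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("checkpoint", ".bin");
        FileCheckpointStream stream = new FileCheckpointStream(file);
        stream.write(new byte[] {1, 2, 3});
        FileStateHandle handle = stream.closeAndGetHandle();
        System.out.println("checkpoint bytes: " + Files.size(handle.path));
        Files.delete(file);
    }
}
```

Because the caller writes through the stream, only the stream's buffer is held in memory at any time, rather than a full serialized copy of the state.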



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
