flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
Date Thu, 09 Aug 2018 14:12:10 GMT
asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
URL: https://github.com/apache/flink/pull/6379
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 44a3653a61f..72fca3e27f1 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -266,6 +266,132 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the
 we start over from `0`. Note that this would keep a different state value for each different input
 key if we had tuples with different values in the first field.
 
+### State time-to-live (TTL)
+
+A time-to-live (TTL) can be assigned to keyed state of any type.
+In this case, the state expires after the configured TTL,
+and its stored value is cleaned up on a best-effort basis, as discussed in detail later.
+
+The state collection types support per-entry TTLs: list elements and map entries expire independently.
+
+To use state TTL, first build a `StateTtlConfig` object;
+TTL functionality can then be enabled in any state descriptor by passing this configuration:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build();
+    
+ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.state.ValueStateDescriptor
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build
+    
+val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+</div>
+</div>
+
+The configuration has several options to consider. 
+The first parameter of the `newBuilder` method is mandatory: it is the time-to-live value.
+
+The update type configures when the state TTL is refreshed (default `OnCreateAndWrite`):
+
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write access,
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access.
+ 
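To make the difference between the two update types concrete, here is a minimal plain-Java sketch. It is not Flink code and not Flink's API. The class `TtlUpdateModel` and its methods are hypothetical, and time is passed in explicitly as milliseconds. Treating a value as expired once exactly the TTL has elapsed is an assumption of the model. Reads always hide expired values here, as with the default `NeverReturnExpired`.

```java
/**
 * Hypothetical plain-Java model (not Flink code) of how the two documented
 * update types refresh the TTL timestamp stored next to a value.
 * Time is passed in by the caller as milliseconds.
 */
final class TtlUpdateModel<T> {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    private final long ttlMillis;
    private final UpdateType updateType;
    private T value;         // null means "no state stored"
    private long lastAccess; // timestamp kept alongside the user value

    TtlUpdateModel(long ttlMillis, UpdateType updateType) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
    }

    /** Creation and writes always refresh the TTL timestamp. */
    void update(T newValue, long now) {
        value = newValue;
        lastAccess = now;
    }

    /** Returns the live value, or null if absent or expired. */
    T value(long now) {
        if (value == null) {
            return null;
        }
        if (now - lastAccess >= ttlMillis) { // assumption: expired once the full TTL has elapsed
            value = null;                    // expired: behave as if the state does not exist
            return null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now;                // only OnReadAndWrite refreshes the TTL on reads
        }
        return value;
    }
}
```

Take a TTL of 1000 ms and a write at time 0. A read at 600 keeps the value alive until 1600, but only under `OnReadAndWrite`. Under `OnCreateAndWrite` the value still expires at 1000.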
+The state visibility configures whether the expired value is returned on read access 
+if it is not cleaned up yet (default `NeverReturnExpired`):
+
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is never returned,
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available.
+ 
+In case of `NeverReturnExpired`, the expired state behaves as if it does not exist anymore,
+even if it has yet to be removed. This option can be useful for use cases
+where data has to become unavailable for read access strictly after the TTL,
+e.g. applications working with privacy-sensitive data.
+
+The other option, `ReturnExpiredIfNotCleanedUp`, allows the expired state to be returned before its cleanup.
+
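The following minimal plain-Java sketch contrasts the two visibility options for a single value whose TTL has elapsed. It is not Flink code. The class `TtlVisibilityModel` is hypothetical, and so is the exact ordering of "return, then remove". The model only follows the statement in the cleanup section that expired values are removed when they are read explicitly.

```java
/**
 * Hypothetical plain-Java model (not Flink code) of the two documented
 * visibility options. An expired value is removed when it is read; the
 * visibility option decides whether that read still returns it.
 */
final class TtlVisibilityModel {
    enum Visibility { NeverReturnExpired, ReturnExpiredIfNotCleanedUp }

    private final long ttlMillis;
    private final Visibility visibility;
    private String value;   // null means "no state stored"
    private long lastWrite; // timestamp kept alongside the user value

    TtlVisibilityModel(long ttlMillis, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.visibility = visibility;
    }

    void update(String newValue, long now) {
        value = newValue;
        lastWrite = now;
    }

    String value(long now) {
        if (value == null || now - lastWrite < ttlMillis) {
            return value;   // absent, or present and not expired yet
        }
        String expired = value;
        value = null;       // the read itself cleans up the expired value
        return visibility == Visibility.ReturnExpiredIfNotCleanedUp ? expired : null;
    }
}
```

Under this model, the first read after expiry returns the stale value with `ReturnExpiredIfNotCleanedUp` and nothing with `NeverReturnExpired`. In both cases a second read finds nothing.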
+**Notes:** 
+
+- The state backends store the timestamp of the last modification along with the user value,
+which means that enabling this feature increases consumption of state storage.
+The heap state backend stores an additional Java object with a reference to the user state object
+and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
+
+- Only TTLs in reference to *processing time* are currently supported.
+
+- Trying to restore state that was previously configured without TTL using a TTL-enabled descriptor, or vice versa,
+will lead to a compatibility failure and a `StateMigrationException`.
+
+- The TTL configuration is not part of checkpoints or savepoints;
+rather, it governs how Flink treats the state in the currently running job.
+
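Here is a rough illustration of the RocksDB overhead from the first note, using a hypothetical state of 50 million map entries (an assumed figure, not a measurement). At 8 bytes per entry, the stored timestamps alone add:

$$
5 \times 10^{7}\ \text{entries} \times 8\ \text{B} = 4 \times 10^{8}\ \text{B} \approx 381.5\ \text{MiB}
$$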
+#### Cleanup of expired state
+
+Currently, expired values are always removed when they are read out explicitly,
+e.g. by calling `ValueState.value()`.
+
+<span class="label label-danger">Attention!</span> This means that, by default, if expired state is not read,
+it won't be removed, possibly leading to ever-growing state. This might change in future releases.
+
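Why unread expired state keeps growing can be sketched with a small plain-Java model of a keyed map with lazy cleanup. It is not Flink code. The class `LazyTtlMapModel` is hypothetical. Each entry keeps a last-write timestamp, mirroring the extra per-entry field described in the notes above. Cleanup happens only as a side effect of reading that entry.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Flink code) of lazy TTL cleanup for a
 * keyed map: expired entries are removed only when they are read.
 */
final class LazyTtlMapModel {
    private final long ttlMillis;
    // key -> {value, lastWriteTimestamp}
    private final Map<String, long[]> entries = new HashMap<>();

    LazyTtlMapModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, long value, long now) {
        entries.put(key, new long[] {value, now});
    }

    /** Returns the value, or null if absent; an expired entry is removed on this read. */
    Long get(String key, long now) {
        long[] entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now - entry[1] >= ttlMillis) {
            entries.remove(key); // cleanup happens only as a side effect of the read
            return null;
        }
        return entry[0];
    }

    /** Number of entries physically stored, expired or not. */
    int storedEntries() {
        return entries.size();
    }
}
```

Suppose 100 keys are written at time 0 and never read. All 100 remain stored no matter how much time passes. Reading one expired key removes that key only.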
+Additionally, you can activate cleanup at the moment of taking a full state snapshot, which
+will reduce the snapshot's size. The local state is not cleaned up under the current implementation,
+but it will not include the removed expired state in case of restoration from the previous snapshot.
+It can be configured in `StateTtlConfig`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot()
+    .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+
+val ttlConfig = StateTtlConfig
+    .newBuilder(Time.seconds(1))
+    .cleanupFullSnapshot
+    .build
+{% endhighlight %}
+</div>
+</div>
+
+This option is not applicable to incremental checkpointing in the RocksDB state backend.
+
+More strategies will be added in the future to clean up expired state automatically in the background.
+
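The full-snapshot cleanup described above can be illustrated with a minimal plain-Java model. It is not Flink code and not how Flink's snapshotting is implemented. The class `FullSnapshotCleanupModel` and its `fullSnapshot`/`restore` methods are hypothetical, and the boolean flag only stands in for whether `cleanupFullSnapshot()` was configured. The model shows the documented effect: expired entries are left out of the full snapshot, the local state keeps them, and a restore from that snapshot no longer contains them.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Flink code) of the documented
 * full-snapshot cleanup: a full snapshot omits expired entries, while the
 * local (working) state keeps them.
 */
final class FullSnapshotCleanupModel {
    private final long ttlMillis;
    private final boolean cleanupFullSnapshot; // stands in for cleanupFullSnapshot() being set
    private final Map<String, long[]> local = new HashMap<>(); // key -> {value, lastWrite}

    FullSnapshotCleanupModel(long ttlMillis, boolean cleanupFullSnapshot) {
        this.ttlMillis = ttlMillis;
        this.cleanupFullSnapshot = cleanupFullSnapshot;
    }

    void put(String key, long value, long now) {
        local.put(key, new long[] {value, now});
    }

    /** Copies the local state into a full snapshot, optionally dropping expired entries. */
    Map<String, long[]> fullSnapshot(long now) {
        Map<String, long[]> snapshot = new HashMap<>();
        for (Map.Entry<String, long[]> e : local.entrySet()) {
            boolean expired = now - e.getValue()[1] >= ttlMillis;
            if (!(cleanupFullSnapshot && expired)) {
                snapshot.put(e.getKey(), e.getValue().clone());
            }
        }
        return snapshot; // the local map itself is left untouched
    }

    /** Restoring replaces the local state with the snapshot contents. */
    void restore(Map<String, long[]> snapshot) {
        local.clear();
        local.putAll(snapshot);
    }

    int localEntries() {
        return local.size();
    }
}
```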
 ### State in the Scala DataStream API
 
 In addition to the interface described above, the Scala API has shortcuts for stateful


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
