flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8365) Relax List type in HeapListState and HeapKeyedStateBackend
Date Mon, 22 Jan 2018 10:55:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334144#comment-16334144
] 

ASF GitHub Bot commented on FLINK-8365:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5326#discussion_r162903235
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
---
    @@ -128,27 +128,33 @@ public void update(List<V> values) throws Exception {
     
     		if (values != null && !values.isEmpty()) {
     			final N namespace = currentNamespace;
    -			final StateTable<K, N, ArrayList<V>> map = stateTable;
    +			final StateTable<K, N, List<V>> map = stateTable;
    +			List<V> list = map.get(namespace);
     
    -			map.put(namespace, new ArrayList<>(values));
    +			if (list == null) {
    +				list = new ArrayList<>(values);
    +				map.put(namespace, list);
    +			} else {
    +				list.clear();
    +				list.addAll(values);
    +			}
     		}
     	}
     
     	@Override
     	public void addAll(List<V> values) throws Exception {
     		if (values != null && !values.isEmpty()) {
     			final N namespace = currentNamespace;
    -			final StateTable<K, N, ArrayList<V>> map = stateTable;
    +			final StateTable<K, N, List<V>> map = stateTable;
     
    -			ArrayList<V> list = map.get(currentNamespace);
    +			List<V> list = map.get(currentNamespace);
     
     			if (list == null) {
     				list = new ArrayList<>();
    --- End diff --
    
    We could already create a `new ArrayList<>(values.size())`


> Relax List type in HeapListState and HeapKeyedStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-8365
>                 URL: https://issues.apache.org/jira/browse/FLINK-8365
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> {{stateTable}} in HeapListState and {{HeapKeyedStateBackend#createListState()}} are both
strongly typed to {{ArrayList}} right now.
> As discussed with [~StephanEwen] and [~stefanrichter83@gmail.com] in https://github.com/apache/flink/pull/4963,
we may want to relax the type to {{List}}.
> Problems discovered now:
> 1. That may require changing serializer from {{ArrayListSerializer}} to {{ListSerializer}}
in the following code, and we need to discuss the pros and cons
> {code:java}
> @Override
> 	public <N, T> InternalListState<N, T> createListState(
> 			TypeSerializer<N> namespaceSerializer,
> 			ListStateDescriptor<T> stateDesc) throws Exception {
> 		// the list state does some manual mapping, because the state is typed to the generic
> 		// 'List' interface, but we want to use an implementation typed to ArrayList
> 		// using a more specialized implementation opens up runtime optimizations
> 		StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
> 				stateDesc.getName(),
> 				stateDesc.getType(),
> 				namespaceSerializer,
> 				new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
> 		return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
> 	}
> {code}
> 2. for non-RocksDBStateBackend (AsyncFileStateBackendTest, AsyncMemoryStateBackendTest,
FileStateBackendTest, and MemoryStateBackendTest), unit tests testListState and testListStateAddUpdateAndGet
fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message