flink-issues mailing list archives

From "Xintong Song (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
Date Mon, 01 Jun 2020 02:51:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120728#comment-17120728 ]

Xintong Song commented on FLINK-17923:
--------------------------------------

[~sewen],

I'm still trying to understand the benefit of making Python processes the responsibility
of the deployment framework.

I'm not saying this is the wrong approach. I'm asking because I have several concerns
about it. These concerns are probably resolvable in one way or another, but first
I would like to understand whether what we gain is worth the effort.

*1. The resources reserved for Python UDFs (Python process containers on K8s, and the additional
memory on Yarn) might be wasted in use cases without Python UDFs.* I think one of the reasons
we made RocksDB use managed memory in FLIP-49 is that we want the default configuration to
work for all use cases without leaving part of the memory unused. As for Python, if we
reserve resources by default, these resources will be wasted in non-Python use cases. If we
don't reserve resources by default, then the default configuration does not work for Python
use cases.
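
The dilemma in point 1 can be written down as a tiny model. This is a hypothetical Python sketch: the function and parameter names are made up for illustration and are not Flink configuration options, and it simplifies by treating any non-zero default reservation as "enough" for a Python job.

```python
def default_reservation_outcome(python_reserved_bytes: int, job_uses_python: bool) -> dict:
    """Hypothetical model of point 1: a fixed default reservation for Python workers."""
    # Reserved-but-unused memory: non-zero only when the job has no Python UDFs.
    idle = 0 if job_uses_python else python_reserved_bytes
    # With the default config, a Python job only runs if something was reserved
    # (simplification); non-Python jobs are unaffected.
    runs = (python_reserved_bytes > 0) if job_uses_python else True
    return {"idle_bytes": idle, "job_runs_with_defaults": runs}


GIB = 1 << 30  # illustrative reservation size, not a Flink default
for reserved in (GIB, 0):
    for uses_python in (False, True):
        print(reserved, uses_python, default_reservation_outcome(reserved, uses_python))
```

Neither choice of default avoids both failure modes, which is the same trade-off FLIP-49 avoided for RocksDB by drawing from managed memory.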

*2. Which component is responsible for managing the lifecycle of the Python processes?* Do we consider
the Python processes part of the Flink framework? If so, the lifecycle of the Python processes
should be decoupled from a job's lifecycle: if the user code does something wrong that makes
the Python process fail, Flink should be able to bring it back up. This could be achieved
naturally on Kubernetes, but not on Yarn. On Yarn, once a container is started, the YarnResourceManager
can no longer start another process in it. We would probably need to start another service in YarnTaskExecutorRunner
to start/monitor/recover/stop the Python processes. That sounds similar to just having the
TaskManager manage the Python processes.
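
For illustration only, the extra service in YarnTaskExecutorRunner mentioned in point 2 would need roughly the start/monitor/recover/stop loop below. This is a hypothetical Python sketch built on the standard `subprocess` module; the class name, `max_restarts`, and the polling approach are assumptions, not existing Flink code.

```python
import subprocess
import sys


class PythonProcessSupervisor:
    """Hypothetical sketch: keeps a worker process alive independently of any job."""

    def __init__(self, cmd, max_restarts=3):
        self.cmd = cmd                    # command line of the worker process
        self.max_restarts = max_restarts  # give up after this many recoveries
        self.restarts = 0
        self.proc = None

    def start(self):
        self.proc = subprocess.Popen(self.cmd)

    def check_and_recover(self):
        """Restart the worker if it has exited; return True if it is (again) running."""
        if self.proc is None:
            return False
        if self.proc.poll() is None:      # still running
            return True
        if self.restarts >= self.max_restarts:
            return False                  # restart budget exhausted
        self.restarts += 1
        self.start()
        return True

    def stop(self):
        if self.proc is not None and self.proc.poll() is None:
            self.proc.terminate()
            self.proc.wait()
        self.proc = None


if __name__ == "__main__":
    sup = PythonProcessSupervisor([sys.executable, "-c", "import time; time.sleep(5)"])
    sup.start()
    print(sup.check_and_recover())
    sup.stop()
```

Whichever component owns this loop effectively owns the Python processes, which is why it ends up close to having the TaskManager manage them.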

> It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It throws the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
> 	... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> 	... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
> 	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
> 	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
> 	... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
> 	at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
> 	... 20 more
> {code}
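
The innermost causes in the trace come down to one budget being shared: RocksDB asks for a shared cache out of the slot's managed memory, and the reservation fails because less than the requested size is left. Below is a hypothetical Python model of that check. The class names are made up, and only the two byte counts are copied from the trace; the slot total and the name of the earlier consumer are invented so that the remainder matches.

```python
class MemoryReservationError(Exception):
    """Hypothetical stand-in for the reservation failure at the bottom of the trace."""


class SlotManagedMemory:
    """Hypothetical model: one managed-memory budget shared by all consumers in a slot."""

    def __init__(self, total_bytes: int):
        self.total_bytes = total_bytes
        self.reserved = {}  # consumer name -> bytes reserved so far

    def remaining(self) -> int:
        return self.total_bytes - sum(self.reserved.values())

    def reserve(self, consumer: str, size: int) -> None:
        left = self.remaining()
        if size > left:
            raise MemoryReservationError(
                f"Could not allocate {size} bytes. Only {left} bytes are remaining.")
        self.reserved[consumer] = self.reserved.get(consumer, 0) + size


# Illustrative only: TOTAL and the first reservation are made up so that the
# remainder equals the 454033416 bytes reported in the trace.
TOTAL = 1_000_000_000
slot = SlotManagedMemory(TOTAL)
slot.reserve("other-consumer", TOTAL - 454_033_416)
try:
    slot.reserve("rocksdb-shared-cache", 536_870_920)
except MemoryReservationError as e:
    print(e)  # prints: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
```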



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
