flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Anton Kalashnikov (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-22376) SequentialChannelStateReaderImpl may recycle buffer twice
Date Fri, 07 May 2021 11:57:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-22376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340762#comment-17340762
] 

Anton Kalashnikov commented on FLINK-22376:
-------------------------------------------

In my opinion, the pattern of using the Buffer should be logically something like this:
{noformat}
Buffer buf = getOrCreate();
try {
 ....
 buf.retain();
 try {
 .....
 } finally {
 buf.recycle();
 }
} finally {
 buf.recycle();
}
{noformat}
or
{noformat}
Buffer buf = get();
try {
 ....
 list.add(buf.retain());
} finally {
 buf.recycle();
}

//otherThread/method
Buffer buf =list.get();
try {
 .....
 } finally {
 buf.recycle();
 }
{noformat}
In fact, in most cases, it indeed uses in such a way. But unfortunately when BufferBuilder
is used this pattern is broken. For example:
{noformat}
BufferBuilder buff = createBufferBuilder();
try{
 BufferConsumer consumer = buff.createBufferConsumer();
 try{
 } finally {
 consumer.recycle();
 }
} finally {
 buff.recycle();//error - this buffer is already recycled when consumer.recycle()
}
{noformat}
and
{noformat}
BufferBuilder buff = createBufferBuilder();
try{
 list.add(buff.createBufferConsumer());
} finally {
 buff.recycle();
}

BufferConsumer consumer = list.get();
try{
 //error - it is impossible to use consumer because it is already recycled in buff.recycle();
 } finally {
 consumer.recycle();//error - this buffer is already recycled when buff.recycle()
 }
{noformat}
This happens because BufferBuilder writes directly to MemorySegment missing the Buffer. But
the reference count is stored in the Buffer so it is impossible to recycle BufferBuilder correctly.

My proposal is to change the implementation of BufferBuilder in such a way that it writes
in Buffer instead of MemorySegment and during the creation of the new consumer it is just
'retain' this buffer. So in this case the Buffer will be recycled only when all consumers
and source BufferBuilder invoke the recycle.

P.S. I also don't see a lot of sense having the BufferConsumer#CachedPositionMarker. So I
would want to delete it.

> SequentialChannelStateReaderImpl may recycle buffer twice
> ---------------------------------------------------------
>
>                 Key: FLINK-22376
>                 URL: https://issues.apache.org/jira/browse/FLINK-22376
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.13.0
>            Reporter: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.14.0, 1.13.1
>
>
> In ChannelStateChunkReader.readChunk in case of error buffer is recycled in the catch
block. However, it might already have been recycled in stateHandler.recover().
> Using minor priority, as this only affects already failing path.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message