flink-issues mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1493) Support for streaming jobs preserving global ordering of records
Date Sun, 08 Feb 2015 18:19:34 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311465#comment-14311465
] 

Matthias J. Sax commented on FLINK-1493:
----------------------------------------

Hi,
I had a look into this. From my point of view, the best way to implement it is to provide
a MutableOrderedRecordReader in addition to the MutableRecordReader. The new reader buffers
all received StreamRecords in separate buffers (one for each InputChannel). The channel
information can easily be provided by the AbstractRecordReader. InputHandler can instantiate
one or the other depending on the configuration (i.e., whether ordering is required or not).
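
To make the per-channel buffering idea concrete, here is a minimal sketch in plain Java. It is
not Flink code: OrderedChannelMerger, Stamped, add() and poll() are hypothetical names made up
for illustration, and the sketch assumes that every record carries a long timestamp and that
each channel delivers its records in non-decreasing timestamp order. A record is only released
once every channel has at least one buffered record, because otherwise an empty channel could
still deliver something smaller. End-of-stream handling is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a per-channel ordered merge; not part of the Flink API. */
public class OrderedChannelMerger<T> {

    /** A record together with the timestamp used for ordering. */
    public static final class Stamped<V> {
        public final long timestamp;
        public final V value;

        public Stamped(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // One FIFO buffer per input channel, kept on the heap (unbounded in this sketch).
    private final List<Deque<Stamped<T>>> buffers = new ArrayList<>();

    public OrderedChannelMerger(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            buffers.add(new ArrayDeque<>());
        }
    }

    /** Buffers a record that arrived on the given channel. */
    public void add(int channel, long timestamp, T value) {
        buffers.get(channel).addLast(new Stamped<>(timestamp, value));
    }

    /**
     * Returns the buffered record with the smallest timestamp, or null if
     * at least one channel is empty and the global order is therefore
     * not yet known.
     */
    public Stamped<T> poll() {
        Deque<Stamped<T>> best = null;
        for (Deque<Stamped<T>> buffer : buffers) {
            if (buffer.isEmpty()) {
                return null; // must wait for this channel before emitting anything
            }
            if (best == null
                    || buffer.peekFirst().timestamp < best.peekFirst().timestamp) {
                best = buffer;
            }
        }
        return best == null ? null : best.pollFirst();
    }
}
```

With two channels, adding timestamps 1 and 4 on channel 0 yields nothing from poll() until
channel 1 has delivered a record. Since add() never blocks, the sender is never stalled by the
sorting, at the price of unbounded heap usage.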

Pros:
  This design avoids any deadlocks.
Cons:
  The required memory is taken from the heap, and each StreamRecord is eagerly deserialized.
An implementation using MemorySegments (or a BufferPool) could be added later on (limiting
memory usage, including a naive load-shedding approach, and allowing a lazy deserialization
strategy).

Please give some feedback.

Two more questions about the usage of generics:
  - Why is the ReaderIterator created with no generic type arguments in InputHandler.createInputIterator()?
  - Why does StreamRecord not implement IOReadableWritable (or require its member "streamObject"
to do so)?

> Support for streaming jobs preserving global ordering of records
> ----------------------------------------------------------------
>
>                 Key: FLINK-1493
>                 URL: https://issues.apache.org/jira/browse/FLINK-1493
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Márton Balassi
>
> Distributed streaming jobs do not give total, global ordering guarantees for records;
only partial ordering is provided by the system: records travelling on the exact same route
of the physical plan are ordered, but records on different routes are not.
> It turns out that this feature is still desired for a number of applications, although it
can only be implemented via "merge sorting" in the input buffers on a timestamp field, which
creates substantial latency.
> Just a heads up for the implementation: the sorting introduces back pressure in the buffers
and might cause deadlocks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
