flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
Date Tue, 28 Feb 2017 09:40:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887668#comment-15887668 ]

ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    @StephanEwen and I just had an offline discussion about the change, and we came up with
the following thoughts:
    
    Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink, because it
is not a robust solution. Users could theoretically run into the size limit of an array list,
and unnesting large messages (in multiple threads in the Kafka 0.8 case) can put pressure
on the GC. We think that we should try to avoid that approach if possible.
    
    Alternative approaches we considered (ordered by preference):
    - Define the DeserializationSchema so that users can return `null` if they don't
want to emit a record.
    This option would leave the current design unchanged, and is pretty minimal. Of course,
it would not allow for the "unnesting" use case, where you want to emit multiple records from
one Kafka message. Users would need to deserialize into a nested structure and use a flatMap
afterwards to do the un-nesting.
    - Move the deserialization into the checkpoint lock. This would allow us to collect elements
into our internal collector from the user collector while still preserving exactly once semantics.
    This change would probably be a bit more involved code-wise, as we need to rearrange some
parts (maybe moving the deserialization schema instance into the emitRecord() method and
changing some method signatures).
    A downside of this approach would be that the Kafka 0.8 consumer threads would deserialize
records in a sequential order (since only one consumer thread can hold the lock at a time).
For Kafka 0.9 this is already the case. I think we can live with that, because the majority
of users have moved away from Kafka 0.8 by now.
    - Use the `ArrayList` approach. Users would potentially run into issues and we would lose
some of Flink's robustness.
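
The first option (return `null` to skip a record, un-nest later with a flatMap) could look roughly like the sketch below. It is plain Java and deliberately does not depend on Flink: `SimpleSchema`, `SCHEMA` and `consume` are hypothetical stand-ins for the deserialization schema, a user implementation and the consumer loop, and the comma-separated integer format is made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSkipSketch {

    /** Hypothetical stand-in for a deserialization schema (not Flink's interface). */
    public interface SimpleSchema<T> {
        /** Returns the deserialized record, or null to emit nothing. */
        T deserialize(byte[] message);
    }

    /** Assumed format: comma-separated integers. Corrupt input yields null. */
    public static final SimpleSchema<List<Integer>> SCHEMA = message -> {
        try {
            List<Integer> values = new ArrayList<>();
            String text = new String(message, StandardCharsets.UTF_8);
            for (String part : text.split(",")) {
                values.add(Integer.parseInt(part.trim()));
            }
            return values; // one nested record per message
        } catch (NumberFormatException e) {
            return null;   // bad data: skip instead of failing the job
        }
    };

    /** Mimics the consumer loop followed by a downstream flatMap. */
    public static List<Integer> consume(List<byte[]> messages) {
        List<Integer> out = new ArrayList<>();
        for (byte[] message : messages) {
            List<Integer> nested = SCHEMA.deserialize(message);
            if (nested == null) {
                continue;          // the source drops null records
            }
            out.addAll(nested);    // stands in for the flatMap un-nesting step
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> messages = Arrays.asList(
                "1,2,3".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "4".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(messages)); // prints [1, 2, 3, 4]
    }
}
```

In this sketch the schema still produces at most one record per message, so the source never buffers more than one element. The un-nesting happens in a separate step after the source.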
    
    @jgrier since you opened the original JIRA, what's your take on the discussion?
How bad would it be for users if we only allowed the `null`-or-record approach? (Other opinions
are of course also appreciated)



> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think should be
improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one mapping between
inputs and outputs.  In reality there are scenarios where one input message (say from Kafka)
might actually map to zero or more logical elements in the pipeline.
> Particularly important here is the case where you receive a message from a source (such
as Kafka) and say the raw bytes don't deserialize properly.  Right now the only recourse is
to throw IOException and therefore fail the job.  
> This is definitely not good since bad data is a reality and failing the job is not the
right option.  If the job fails we'll just end up replaying the bad data and the whole thing
will start again.
> Instead in this case it would be best if the user could just return the empty set.
> The other case is where one input message should logically be multiple output messages.
 This case is probably less important since there are other ways to do this but in general
it might be good to make the DeserializationSchema.deserialize() method return a collection
rather than a single element.
> Maybe we need to support a DeserializationSchema variant that has semantics more like
that of FlatMap.
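
A schema variant with FlatMap-like semantics, combined with deserializing while holding the checkpoint lock as discussed in the comment above, might be sketched as follows. This is plain Java and not Flink code: `CollectingSchema`, `LINE_SCHEMA`, the `emitted` list and the `offset` counter are hypothetical names, with the list standing in for the downstream operator and the counter for the consumer's position.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CollectorSchemaSketch {

    /** Hypothetical flatMap-style schema: zero or more outputs per input. */
    public interface CollectingSchema<T> {
        void deserialize(byte[] message, Consumer<T> out);
    }

    /** Example schema: one record per non-empty line of the message. */
    public static final CollectingSchema<String> LINE_SCHEMA = (message, out) -> {
        String text = new String(message, StandardCharsets.UTF_8);
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                out.accept(line);
            }
        }
    };

    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>(); // stand-in for downstream
    private long offset = 0;                                 // stand-in for source position

    /** Deserializes and emits under the lock, then advances the offset. */
    public void emitRecord(byte[] message, CollectingSchema<String> schema) {
        synchronized (checkpointLock) {
            // Records go straight to the downstream stand-in; nothing is
            // buffered per message, and a snapshot taken under the same lock
            // never observes a half-emitted message.
            schema.deserialize(message, emitted::add);
            offset++;
        }
    }

    public List<String> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public long offset() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    public static void main(String[] args) {
        CollectorSchemaSketch source = new CollectorSchemaSketch();
        source.emitRecord("a\n\nb".getBytes(StandardCharsets.UTF_8), LINE_SCHEMA);
        source.emitRecord(new byte[0], LINE_SCHEMA); // emits nothing
        System.out.println(source.emitted() + " offset=" + source.offset());
    }
}
```

The trade-off noted in the comment shows up here as well: because deserialization runs inside the `synchronized` block, only one thread can deserialize at a time.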



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
