flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (Jira)" <j...@apache.org>
Subject [jira] [Closed] (FLINK-15769) Allow configuring offset startup positions for Stateful Functions Kafka Ingress
Date Fri, 31 Jan 2020 16:10:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Tzu-Li (Gordon) Tai closed FLINK-15769.
    Fix Version/s: statefun-1.1
       Resolution: Fixed

Merged to master via 9e0e3803d34daf8ac933310da8b62766adffe09d.

> Allow configuring offset startup positions for Stateful Functions Kafka Ingress
> -------------------------------------------------------------------------------
>                 Key: FLINK-15769
>                 URL: https://issues.apache.org/jira/browse/FLINK-15769
>             Project: Flink
>          Issue Type: New Feature
>          Components: Stateful Functions
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: statefun-1.1
>          Time Spent: 20m
>  Remaining Estimate: 0h
> It is quite typical that a user is capable of setting where to start consuming a Kafka
> Since the Stateful Functions Kafka ingress sits on top of Flink's Kafka consumer, there
are already various options to start with:
> * {{GROUP_OFFSETS}} (default): start with whatever offsets were committed to Kafka for
the given {{group.id}}
> * {{LATEST}}: start from latest record in topic
> * {{EARLIEST}}: start from earliest record in topic
> * {{SPECIFIC_OFFSETS}}: provide a map of topic partition -> offset. This is particularly
important for bootstrapped state scenarios, where the user would want to start from a specific
position consistent with the state bootstrapped in their functions.
> * {{TIMESTAMP}}: start from the first records written at or after the given timestamp.
> The proposed API looks like so:
> {code}
> KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)
>     .withTopic(...)
>     .withDeserializer(...)
>     .withDefaultStartPosition(KafkaIngressStartPosition.fromEarliest()/fromLatest())
>     .withSpecificStartOffsets(KafkaIngressStartOffsets.fromMap(Map)/fromTimestamp(Long))
> {code}
> The {{withDefaultStartPosition}} method is straightforward.
> The reason to separate this from another {{withSpecificStartOffsets}} method is that
there would be cases where some partition does not contain the offsets specified by {{withSpecificStartOffsets}}.
> In this case, the ingress would need to fall back to some default configuration; this
would be the {{withDefaultStartPosition}} configuration.
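
The fallback rule described in the issue can be illustrated with a small standalone sketch. It is not the Stateful Functions implementation: the class StartOffsetFallbackSketch, the DefaultPosition enum, the OffsetRange type and the resolveStartOffsets method are all hypothetical names made up for this example, partitions are identified by plain strings, and the available offsets of each partition are passed in as a map instead of being fetched from a broker. A requested offset is treated as usable only if it lies inside the partition's available range, which is a simplifying assumption about what "does not contain the offsets" means.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; none of these names come from Stateful Functions. */
final class StartOffsetFallbackSketch {

    /** Stand-in for the default start position proposed in the issue. */
    enum DefaultPosition { EARLIEST, LATEST }

    /** Offsets assumed to be available in one partition, both bounds inclusive. */
    static final class OffsetRange {
        final long earliest;
        final long latest;

        OffsetRange(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        boolean contains(long offset) {
            return offset >= earliest && offset <= latest;
        }
    }

    /**
     * Picks a start offset per partition: the specific offset when one was given
     * and the partition contains it, otherwise the configured default position.
     */
    static Map<String, Long> resolveStartOffsets(
            Map<String, OffsetRange> partitions,
            Map<String, Long> specificOffsets,
            DefaultPosition fallback) {
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, OffsetRange> entry : partitions.entrySet()) {
            OffsetRange range = entry.getValue();
            Long requested = specificOffsets.get(entry.getKey());
            if (requested != null && range.contains(requested)) {
                resolved.put(entry.getKey(), requested);
            } else {
                // No usable specific offset for this partition: use the default.
                long start = (fallback == DefaultPosition.EARLIEST) ? range.earliest : range.latest;
                resolved.put(entry.getKey(), start);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, OffsetRange> partitions = new LinkedHashMap<>();
        partitions.put("events-0", new OffsetRange(100L, 500L));
        partitions.put("events-1", new OffsetRange(0L, 50L));

        Map<String, Long> specific = new LinkedHashMap<>();
        specific.put("events-0", 250L); // inside the range, used as-is
        specific.put("events-1", 900L); // outside the range, falls back

        // prints {events-0=250, events-1=0}
        System.out.println(resolveStartOffsets(partitions, specific, DefaultPosition.EARLIEST));
    }
}
```

Keeping the default position separate from the specific offsets, as the proposal does, means a partition that is missing from the map or no longer holds the requested offset still gets a well-defined starting point.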

This message was sent by Atlassian Jira
