spark-issues mailing list archives

From "Michael Armbrust (JIRA)" <>
Subject [jira] [Commented] (SPARK-17812) More granular control of starting offsets (assign)
Date Thu, 13 Oct 2016 23:29:20 GMT


Michael Armbrust commented on SPARK-17812:

As far as I understand it, {{auto.offset.reset}} conflates a few things, which makes it hard
for me to reason about exactly-once semantics in my query.  It answers all of the following:
 - Where do I start when I'm creating this {{}} for the first time?
 - What do I do when a new partition is added to a topic I'm watching?
 - What do I do when the current offset is invalid because of retention?

The model of structured streaming is an append-only table, where we compute the same
answer incrementally as if you were running a batch query over all of the data in the table.
The whole goal is to make it easy to reason about correctness and push the hard work of incremental
processing and late data management into the optimizer / query planner.  As a result, I think
we are trying to answer a different set of questions than a distributed set of consumers that
share a {{}}:
 - Should this append-only table contain all of the historical data available, or do I begin
at this moment and start appending?  This is what {{startingOffsets}} answers.  I think we
should handle {{"earliest"}} (all data), {{"latest"}} (only data that arrives after now),
and a very specific point in time across partitions (probably when some other query stopped
 - When I get into a situation where data has been deleted by the retention mechanism without
me seeing it, what should I do?  Fail the query?  Or issue a warning and compute a best-effort
answer on the data available?  This is what {{failOnDataLoss}} answers.
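
To make the separation concrete, here is a minimal Python sketch of the two questions as independent knobs. It is only a model of the semantics described above, not Spark's implementation: the function names, the {{Bounds}} type, the {{DataLossError}} class, and the partition names are all hypothetical, and "latest" is assumed to mean the next offset that will be written.

```python
from typing import Dict, Mapping, Tuple, Union

# All names below are hypothetical and exist only for this sketch.
# Bounds = (earliest available offset, next offset to be written) for one partition.
Bounds = Tuple[int, int]


class DataLossError(RuntimeError):
    """Raised when offsets we expected to read have already been deleted."""


def resolve_starting_offsets(
    starting_offsets: Union[str, Mapping[str, int]],
    available: Mapping[str, Bounds],
) -> Dict[str, int]:
    """Question 1: where does the append-only table begin?"""
    if starting_offsets == "earliest":   # all historical data
        return {p: lo for p, (lo, _hi) in available.items()}
    if starting_offsets == "latest":     # only data that arrives after now
        return {p: hi for p, (_lo, hi) in available.items()}
    if isinstance(starting_offsets, str):
        raise ValueError(f"unknown startingOffsets value: {starting_offsets!r}")
    # A specific point across partitions: every partition needs an explicit offset.
    missing = sorted(set(available) - set(starting_offsets))
    if missing:
        raise ValueError(f"no starting offset given for partitions: {missing}")
    return {p: starting_offsets[p] for p in available}


def resume_offset(expected: int, earliest_available: int, fail_on_data_loss: bool) -> int:
    """Question 2: what to do when retention deleted data we never saw?"""
    if expected >= earliest_available:
        return expected                  # nothing was lost
    lost = earliest_available - expected
    if fail_on_data_loss:
        raise DataLossError(f"{lost} records were deleted before being read")
    print(f"WARN: skipping {lost} deleted records; results are best effort")
    return earliest_available
```

The point of the sketch is that the starting position is decided once, when the query is created, while the data-loss policy is consulted every time the query resumes. Neither function needs to know about the other.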

In particular, I think the Kafka method of configuration makes it confusing to do something
like, "starting now, compute some aggregation exactly once".  The documentation even points
out some of the pitfalls:
bq. ... If this is set to largest, the consumer may lose some messages when the number of
partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during
partition addition, set auto.offset.reset to smallest.

Really what I want here is, "begin the query at largest", but "start new partitions at smallest
(and in fact, tell me if I'm so late joining a new partition that I have already lost some
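
The behavior wanted for partitions that appear after the query has started could be sketched roughly as follows. This is an illustrative model under stated assumptions, not Spark's code: {{plan_next_batch}} and its arguments are hypothetical names, and a newly created partition is assumed to begin at offset 0, so an earliest available offset above 0 means records were deleted before the query ever saw them.

```python
from typing import Dict, Mapping, Tuple


def plan_next_batch(
    known: Mapping[str, int],
    earliest_available: Mapping[str, int],
) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Return (start offset per partition, records already lost per partition).

    known: partition -> next offset the query expects to read (hypothetical state).
    earliest_available: partition -> oldest offset still retained.
    """
    start: Dict[str, int] = {}
    lost: Dict[str, int] = {}
    for partition, earliest in earliest_available.items():
        # Tracked partitions resume where the query left off; partitions seen
        # for the first time are read from their first offset, assumed to be 0.
        expected = known.get(partition, 0)
        if expected < earliest:
            lost[partition] = earliest - expected
        start[partition] = max(expected, earliest)
    return start, lost
```

With {{known = {"events-0": 40}}} and {{earliest_available = {"events-0": 5, "events-1": 0, "events-2": 7}}}, the new partitions start at 0 and 7 respectively, and the second returned mapping reports 7 lost records for {{events-2}}. The caller can then fail or warn depending on its data-loss policy.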

> More granular control of starting offsets (assign)
> --------------------------------------------------
>                 Key: SPARK-17812
>                 URL:
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Michael Armbrust
> Right now you can only run a Streaming Query starting from either the earliest or latest
offsets available at the moment the query is started.  Sometimes this is a lot of data.  It
would be nice to be able to do the following:
>  - seek to user specified offsets for manually specified topicpartitions
