spark-issues mailing list archives

From "Cody Koeninger (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-17937) Clarify Kafka offset semantics for Structured Streaming
Date Fri, 14 Oct 2016 19:27:20 GMT

     [ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Cody Koeninger updated SPARK-17937:
-----------------------------------
    Description: 
Possible events for which offsets are needed:
# New partition is discovered
# Offset out of range (aka, data has been lost).   It's possible to separate this into offset
too small and offset too large, but I'm not sure it matters for us.

Possible sources of offsets:
# *Earliest* position in log
# *Latest* position in log
# *Fail* and kill the query
# *Checkpoint* position
# *User specified* per topicpartition
# *Kafka commit log*.  Currently unsupported.  This means users who want to migrate from existing Kafka jobs need to jump through hoops.  Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled.
# *Timestamp*.  Currently unsupported.  This could be supported with the old, inaccurate Kafka time API, or the upcoming time index (see the sketch after this list).
# *X offsets* before or after latest / earliest position.  Currently unsupported.  I think
the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't
have a single range of offsets.
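
For reference, both of those currently unsupported sources (commit log and timestamp) can already be read off a plain Kafka consumer.  A rough sketch follows; this is a hypothetical helper, not anything in the Spark source, and offsetsForTimes assumes brokers/clients new enough to have the time index (0.10.1+):

{code:scala}
import java.util.{Collections, Map => JMap}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not part of the Spark Kafka source.  Assumes an
// already-configured consumer whose group.id may have committed offsets.
def lookupOffsets(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    tp: TopicPartition,
    timestampMs: Long): Unit = {
  // Kafka commit log: last committed offset for this group, or null if none
  val committed = consumer.committed(tp)
  println(s"committed for $tp: $committed")

  // Time index: earliest offset whose timestamp is >= timestampMs, or null
  // if the partition has no such message
  val byTime: JMap[TopicPartition, OffsetAndTimestamp] =
    consumer.offsetsForTimes(
      Collections.singletonMap(tp, java.lang.Long.valueOf(timestampMs)))
  println(s"offset for timestamp $timestampMs on $tp: ${byTime.get(tp)}")
}
{code}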

Currently allowed pre-query configuration, all "ORs" are exclusive:
# startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812); see the sketch after this list
# failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above)
 In general, I see no reason this couldn't specify Latest as an option.
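
To make that concrete, a minimal sketch of the pre-query configuration as it looks on the DataStreamReader.  Topic names and servers are placeholders, and startingOffsets uses the per-topicpartition json from SPARK-17812, where -2 means earliest and -1 means latest:

{code:scala}
// spark is an existing SparkSession; topics and servers are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topicA,topicB")
  // per-topicpartition starting offsets; -2 = earliest, -1 = latest
  .option("startingOffsets", """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
  // false = fall back to earliest instead of failing the query on data loss
  .option("failOnDataLoss", "false")
  .load()
{code}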

Possible lifecycle times in which an offset-related event may happen (roughly restated as code after this list):
# At initial query start
#* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If startingOffsets is *User specified* per topicpartition, and the new partition isn't in the map, *Fail*.  Note that this is effectively indistinguishable from a new partition during the query, because partitions may have changed between pre-query configuration and query start, but we treat it differently, and users in this case are SOL
#* Offset out of range on driver: We don't technically have behavior for this case yet.  We could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're OK with *Earliest* for a during-query out-of-range
#* Offset out of range on executor: *Fail* or *Earliest*, based on failOnDataLoss.
# During query
#* New partition:  *Earliest*, only.  This seems to be by fiat; I see no reason this can't be configurable.
#* Offset out of range on driver:  this _probably_ doesn't happen, because we're doing explicit
seeks to the latest position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on failOnDataLoss
# At query restart 
#* New partition: *Checkpoint*, falling back to *Earliest*.  Again, no reason this couldn't be a configurable fall back to *Latest*
#* Offset out of range on driver:   this _probably_ doesn't happen, because we're doing explicit
seeks to the specified position
#* Offset out of range on executor:  *Fail* or *Earliest*, based on failOnDataLoss
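
To make the table above easier to poke holes in, here is the same decision table compressed into code.  This is purely illustrative, a restatement of the semantics as I understand them, not the actual source:

{code:scala}
// Illustrative restatement of the lifecycle table above, not actual Spark code.
sealed trait OffsetEvent
case object NewPartition extends OffsetEvent
case object OutOfRangeOnDriver extends OffsetEvent
case object OutOfRangeOnExecutor extends OffsetEvent

sealed trait Lifecycle
case object InitialStart extends Lifecycle
case object DuringQuery extends Lifecycle
case object Restart extends Lifecycle

sealed trait Resolution
case object UseEarliest extends Resolution
case object UseLatest extends Resolution
case object FailQuery extends Resolution
case object Undefined extends Resolution

def resolve(event: OffsetEvent, when: Lifecycle, failOnDataLoss: Boolean): Resolution =
  (event, when) match {
    // startingOffsets earliest/latest applies; Fail if a user-specified map misses the partition
    case (NewPartition, InitialStart) => UseEarliest
    // by fiat; no reason this couldn't be configurable
    case (NewPartition, DuringQuery) => UseEarliest
    // a new partition won't be in the checkpoint, so the Earliest fallback applies;
    // could just as well be a configurable fallback to Latest
    case (NewPartition, Restart) => UseEarliest
    // no defined behavior yet at initial start; probably doesn't happen otherwise
    // because we do explicit seeks
    case (OutOfRangeOnDriver, _) => Undefined
    case (OutOfRangeOnExecutor, _) =>
      if (failOnDataLoss) FailQuery else UseEarliest
  }
{code}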


I've probably missed something, chime in.




> Clarify Kafka offset semantics for Structured Streaming
> -------------------------------------------------------
>
>                 Key: SPARK-17937
>                 URL: https://issues.apache.org/jira/browse/SPARK-17937
>             Project: Spark
>          Issue Type: Sub-task
>            Reporter: Cody Koeninger
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


