lucene-dev mailing list archives

From "Yonik Seeley (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SOLR-13431) Efficient updates with shared storage
Date Mon, 29 Apr 2019 15:15:00 GMT

     [ https://issues.apache.org/jira/browse/SOLR-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yonik Seeley updated SOLR-13431:
--------------------------------
    Description: 
h2. Background & problem statement:

With shared storage support, data durability is handled by the storage layer (e.g. S3 or HDFS)
and replicas are not needed for durability. This changes how a single update (say, adding a
document) must be handled. The local transaction log does not help, since a node can go down
and never come back. The implication is that *a commit must be done for any updates to be
considered durable.*

The problem is also more complex than just batching updates and adding a commit at the end
of a batch. Consider indexing documents A,B,C,D followed by a commit:
 1) documents A,B sent to leader1 and indexed
 2) leader1 fails, leader2 is elected
 3) documents C,D sent to leader2 and indexed
 4) commit
 After this sequence of events, documents A,B are actually lost because a commit was not done
on leader1 before it failed.
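To make the failure concrete, here is a minimal Java sketch of steps 1-4 under one stated assumption: only a commit copies a leader's indexed documents into shared storage, so anything uncommitted disappears with the leader. `LostUpdateScenario`, `Leader` and `sharedStorage` are hypothetical toy names, not Solr classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of steps 1-4 above. Assumption (hypothetical names throughout):
// only commit() copies a leader's indexed documents into shared storage;
// uncommitted documents live only on that leader and vanish if it fails.
public class LostUpdateScenario {
    // Stand-in for the shared storage layer (e.g. S3 or HDFS).
    static final Set<String> sharedStorage = new LinkedHashSet<>();

    static class Leader {
        private final List<String> uncommitted = new ArrayList<>();

        void index(String doc) { uncommitted.add(doc); }

        void commit() {
            sharedStorage.addAll(uncommitted); // durable only once committed
            uncommitted.clear();
        }
    }

    public static void main(String[] args) {
        Leader leader1 = new Leader();
        leader1.index("A");            // 1) A,B sent to leader1 and indexed
        leader1.index("B");
        leader1 = null;                // 2) leader1 fails: its uncommitted A,B are gone
        Leader leader2 = new Leader(); //    leader2 is elected with an empty buffer
        leader2.index("C");            // 3) C,D sent to leader2 and indexed
        leader2.index("D");
        leader2.commit();              // 4) commit
        System.out.println(sharedStorage); // [C, D]
    }
}
```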

Adding a commit for every single update would fix the problem of data loss, but would obviously
be too expensive (and each commit will be more expensive). We can still do batches if we *disable
transparent failover* for a batch:
 - all updates in a batch (for a specific shard) should be indexed on the *same leader*...
any change in leadership should result in a failure at the low level instead of any transparent
failover or forwarding.
 - in the event of a failure, *all updates since the last commit must be replayed* (we can't
just retry the failure itself), or the failure will need to be bubbled up to a higher layer
to retry from the beginning.
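A minimal single-shard sketch of these two rules, replaying the same A,B,C,D sequence. It assumes a hypothetical `Shard` interface that reports the current leader; `PinnedShardBatch`, `LeaderChangedException` and `FakeShard` are illustrative names, not SolrJ API. The batch pins itself to one leader, surfaces any leadership change as an exception instead of failing over, and replays every update since the last commit rather than only the failed one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of "no transparent failover" for one shard. Hypothetical names
// throughout; a real client would learn the leader from cluster state.
public class PinnedShardBatch {
    // Thrown instead of silently failing over or forwarding.
    static class LeaderChangedException extends RuntimeException {
        LeaderChangedException(String msg) { super(msg); }
    }

    interface Shard {
        String currentLeader();
        void send(String leader, String doc);
        void commit(String leader);
    }

    private final Shard shard;
    private final List<String> sinceLastCommit = new ArrayList<>();
    private String pinnedLeader;

    PinnedShardBatch(Shard shard) {
        this.shard = shard;
        this.pinnedLeader = shard.currentLeader();
    }

    // Remember the update first, so it is replayed even if this send fails.
    void add(String doc) {
        sinceLastCommit.add(doc);
        checkLeader();
        shard.send(pinnedLeader, doc);
    }

    void commit() {
        checkLeader();
        shard.commit(pinnedLeader);
        sinceLastCommit.clear(); // durable now: nothing left to replay
    }

    // Re-pin to the new leader and resend ALL updates since the last commit,
    // not just the one whose send failed.
    void replay() {
        pinnedLeader = shard.currentLeader();
        for (String doc : sinceLastCommit) shard.send(pinnedLeader, doc);
    }

    private void checkLeader() {
        String now = shard.currentLeader();
        if (!now.equals(pinnedLeader)) {
            throw new LeaderChangedException(pinnedLeader + " -> " + now);
        }
    }

    // In-memory shard for the demo: records what each leader indexed and committed.
    static class FakeShard implements Shard {
        String leader = "leader1";
        final Map<String, List<String>> indexed = new HashMap<>();
        final Map<String, List<String>> committed = new HashMap<>();

        public String currentLeader() { return leader; }

        public void send(String l, String doc) {
            indexed.computeIfAbsent(l, k -> new ArrayList<>()).add(doc);
        }

        public void commit(String l) {
            committed.put(l, new ArrayList<>(indexed.getOrDefault(l, List.of())));
        }
    }

    public static void main(String[] args) {
        FakeShard shard = new FakeShard();
        PinnedShardBatch batch = new PinnedShardBatch(shard);
        batch.add("A");
        batch.add("B");
        shard.leader = "leader2"; // leader1 fails before any commit
        try {
            batch.add("C");
        } catch (LeaderChangedException e) {
            batch.replay(); // resends A, B and C to leader2
        }
        batch.add("D");
        batch.commit();
        System.out.println(shard.committed.get("leader2")); // [A, B, C, D]
    }
}
```

Because the replay resends A and B as well as C, all four documents end up committed on leader2; retrying only C would have reproduced the loss.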

h2. Indexing scenario 1: CSV upload

If SolrCloud is loading a large CSV file, the receiving Solr node will forward updates to
the correct leaders. This happens in the DistributedUpdateProcessor via SolrCmdDistributor,
which ends up using a ConcurrentUpdateHttp2SolrClient subclass.

The forward-to-replica use case here is quite different from the forward-to-correct-leader
use case (in the latter, the current Solr node acts much more like an external client). To
simplify development, we may want to separate these cases and continue using the existing
code for forward-to-replica.
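For the forward-to-correct-leader path, the receiving node has to map each document to the shard that owns it before it can look up that shard's leader. The sketch below assumes each shard owns a contiguous slice of the 32-bit hash space, as Solr's compositeId router does; Solr hashes ids with MurmurHash3, whereas this self-contained sketch substitutes `String.hashCode()`, so its shard assignments will not match a real cluster. `HashRangeRouter`, `Range`, `evenRanges` and `route` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of picking the correct shard (and hence leader) for a document.
// Assumption: each shard owns a contiguous slice of the 32-bit hash space.
// String.hashCode() stands in for Solr's MurmurHash3, so results differ from Solr.
public class HashRangeRouter {
    static final class Range {
        final String shard;
        final int min, max; // inclusive bounds

        Range(String shard, int min, int max) {
            this.shard = shard;
            this.min = min;
            this.max = max;
        }

        boolean includes(int hash) { return hash >= min && hash <= max; }
    }

    // Splits the whole int range into n equal, contiguous, non-overlapping slices.
    static List<Range> evenRanges(int n) {
        List<Range> ranges = new ArrayList<>();
        long step = (1L << 32) / n;
        for (int i = 0; i < n; i++) {
            long min = Integer.MIN_VALUE + i * step;
            long max = (i == n - 1) ? Integer.MAX_VALUE : min + step - 1;
            ranges.add(new Range("shard" + (i + 1), (int) min, (int) max));
        }
        return ranges;
    }

    // Returns the shard whose range covers the id's hash; that shard's leader gets the update.
    static String route(String id, List<Range> ranges) {
        int hash = id.hashCode();
        for (Range r : ranges) {
            if (r.includes(hash)) return r.shard;
        }
        throw new IllegalStateException("no shard covers hash " + hash);
    }

    public static void main(String[] args) {
        List<Range> ranges = evenRanges(4);
        for (String id : new String[] {"doc1", "doc2", "doc3"}) {
            System.out.println(id + " -> " + route(id, ranges));
        }
    }
}
```

A shard split replaces one range with two sub-ranges, which is why updates buffered for the parent shard may have to be routed again.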

h2. Indexing scenario 2: SolrJ bulk indexing

In this scenario, a client is trying to do a large amount of indexing and can use batches
or streaming. For this scenario, we could just require that a commit be added for each batch
and then fail a batch on any leader change. This is problematic for a couple of reasons:
 - larger batches take longer to build, which adds latency and hurts throughput
 - doesn't scale well: as a collection grows, the number of shards grows, and so does the chance
that any shard leader goes down (or the shard is split). Requiring that the entire batch
(all shards) be replayed when this happens is wasteful and gets worse as the collection grows.
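The scaling argument can be put in rough numbers. Assuming (this is not from the issue) that each of n shard leaders independently changes during a batch with probability p, and that the batch's updates are spread evenly over the shards, the chance that the whole batch must be replayed is 1 - (1 - p)^n, while replaying only the affected shards costs about p times the batch size regardless of n. A small Java sketch of that model, with hypothetical names and only a single round of failures considered:

```java
// Back-of-the-envelope model of the scaling argument. Assumptions (not from
// the source): each of n shard leaders independently changes during a batch
// with probability p, and the batch's updates are spread evenly over shards.
public class ReplayCost {
    // Probability that at least one shard leader changes during the batch.
    static double anyLeaderChange(double p, int n) {
        return 1.0 - Math.pow(1.0 - p, n);
    }

    // Expected updates resent if the WHOLE batch (all shards) must be replayed.
    static double wholeBatchReplay(double p, int n, int batchSize) {
        return anyLeaderChange(p, n) * batchSize;
    }

    // Expected updates resent if only each affected shard's slice is replayed:
    // n shards * p * (batchSize / n) = p * batchSize, independent of n.
    static double perShardReplay(double p, int n, int batchSize) {
        return n * p * ((double) batchSize / n);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 10, 100}) {
            System.out.printf("n=%d  P(any change)=%.3f  whole=%.0f  per-shard=%.0f%n",
                n, anyLeaderChange(0.01, n), wholeBatchReplay(0.01, n, 10_000),
                perShardReplay(0.01, n, 10_000));
        }
    }
}
```

With p = 0.01 and 100 shards, roughly 63% of batches would need a full replay under this model, whereas per-shard replay stays at about 1% of the batch's updates.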

h2. Proposed Solution: a SolrJ cloud-aware streaming client
 - something like ConcurrentUpdateHttp2SolrClient that can stream and knows about the cloud layout
 - track when the last commit happened for each shard leader
 - buffer updates per shard since the last commit
 -- doesn't have to be exact... assume idempotent updates here, so overlap is fine
 -- whether to buffer would also depend on the replica type of the collection (so this class
could be used for both shared storage and normal NRT replicas)
 - a parameter would be passed that would disallow any forwarding (since we're handling buffering/failover
at this level)
 - on a failure because of a leader going down or loss of leadership, wait until a new leader
has been elected and then replay updates since the last commit
 - insert commits where necessary to prevent buffers from growing too large
 -- inserted commits should be able to proceed in parallel... we shouldn't need to block and
wait for a commit before resuming sending documents to that leader.
 -- it would be nice if there was a way we could get notified if a commit happened via some
other mechanism (like an autoCommit being triggered)
  --- assuming we can't get this, perhaps we should pass a flag that disables triggering auto-commits
for these batch updates?
 - handle splits (not only can a shard leader change, but a shard could split... buffered
updates may need to be re-slotted)
 - handle a leader "bounce" (restart) the same way as a change in leadership (assuming we're
skipping the transaction log)
 - multi-threaded: all updates to a leader, regardless of thread, are managed as a single update
stream
 -- this perhaps provides a way to coalesce incremental/realtime updates
 - OPTIONAL: ability to have multiple channels to a single leader?
 -- we would need to avoid reordering updates to the same ID
 -- an alternative to attempting to create more parallelism-per-shard on the client side is
to do it on the server side.
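The core bookkeeping from the list above can be sketched in Java. This is a minimal single-process sketch, not the proposed client: the `Transport` interface, the `maxBuffered` threshold, `FakeTransport` and all method names are hypothetical, inserted commits run synchronously rather than in parallel, and splits, auto-commit notification and multiple channels per leader are left out. It keeps a buffer of updates per shard since the last commit, pins each shard's buffer to the leader it was sent to (no forwarding), replays the whole buffer when that leader changes, and inserts a commit once a buffer reaches `maxBuffered`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical sketch: per-shard buffers since the last commit, a pinned leader per
// shard (no forwarding), full replay on leader change, and inserted commits.
public class ShardBufferingClient {
    // Hypothetical transport: send/commit return false if `leader` no longer leads `shard`.
    interface Transport {
        String leaderOf(String shard);
        boolean send(String leader, String shard, String doc);
        boolean commit(String leader, String shard);
    }

    private final Transport transport;
    private final Function<String, String> router;  // doc id -> shard
    private final int maxBuffered;                   // insert a commit at this buffer size
    private final Map<String, List<String>> sinceCommit = new HashMap<>();
    private final Map<String, String> pinned = new HashMap<>(); // leader each buffer went to

    ShardBufferingClient(Transport transport, Function<String, String> router, int maxBuffered) {
        this.transport = transport;
        this.router = router;
        this.maxBuffered = maxBuffered;
    }

    // synchronized: updates from all threads form one ordered stream per shard.
    public synchronized void add(String doc) {
        String shard = router.apply(doc);
        List<String> buf = sinceCommit.computeIfAbsent(shard, s -> new ArrayList<>());
        buf.add(doc); // buffered before sending, so a failed send is covered by replay
        String leader = pinned.computeIfAbsent(shard, transport::leaderOf);
        boolean ok = leader.equals(transport.leaderOf(shard)) && transport.send(leader, shard, doc);
        if (!ok) replay(shard);
        if (buf.size() >= maxBuffered) commit(shard);
    }

    public synchronized void commit(String shard) {
        for (int attempt = 0; attempt < 3; attempt++) {
            String leader = pinned.computeIfAbsent(shard, transport::leaderOf);
            if (leader.equals(transport.leaderOf(shard)) && transport.commit(leader, shard)) {
                sinceCommit.remove(shard); // durable now: nothing left to replay
                pinned.remove(shard);
                return;
            }
            replay(shard); // leader changed before the commit landed
        }
        throw new IllegalStateException("could not commit " + shard);
    }

    // Resend ALL updates since the last commit (assumed idempotent) to the current leader.
    private void replay(String shard) {
        for (int attempt = 0; attempt < 3; attempt++) {
            String leader = transport.leaderOf(shard); // real code would wait for an election
            boolean ok = true;
            for (String doc : sinceCommit.getOrDefault(shard, List.of())) {
                if (!transport.send(leader, shard, doc)) { ok = false; break; }
            }
            if (ok) { pinned.put(shard, leader); return; }
        }
        throw new IllegalStateException("replay failed for " + shard);
    }

    // In-memory transport for the demo; a commit makes a node's pending docs durable.
    static class FakeTransport implements Transport {
        final Map<String, String> leaders = new HashMap<>();
        final Map<String, List<String>> pending = new HashMap<>(); // node -> uncommitted docs
        final Set<String> durable = new TreeSet<>();
        public String leaderOf(String shard) { return leaders.get(shard); }
        public boolean send(String leader, String shard, String doc) {
            if (!leader.equals(leaders.get(shard))) return false; // no forwarding
            pending.computeIfAbsent(leader, k -> new ArrayList<>()).add(doc);
            return true;
        }
        public boolean commit(String leader, String shard) {
            if (!leader.equals(leaders.get(shard))) return false;
            List<String> docs = pending.remove(leader);
            if (docs != null) durable.addAll(docs);
            return true;
        }
    }

    public static void main(String[] args) {
        FakeTransport t = new FakeTransport();
        t.leaders.put("shard1", "node1");
        ShardBufferingClient client = new ShardBufferingClient(t, id -> "shard1", 100);
        client.add("A");
        client.add("B");
        t.leaders.put("shard1", "node2"); // node1 fails before A,B are committed
        client.add("C");                  // change detected: A,B,C replayed to node2
        client.add("D");
        client.commit("shard1");
        System.out.println(t.durable);    // [A, B, C, D]
    }
}
```

Serializing `add` with `synchronized` is the simplest way to get one ordered stream per leader across threads; a real implementation would more likely keep a lock or queue per shard so that shards do not block each other.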



> Efficient updates with shared storage
> -------------------------------------
>
>                 Key: SOLR-13431
>                 URL: https://issues.apache.org/jira/browse/SOLR-13431
>             Project: Solr
>          Issue Type: New Feature
>      Security Level: Public(Default Security Level. Issues are Public) 
>            Reporter: Yonik Seeley
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org

