flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
Date Wed, 01 Feb 2017 07:09:51 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848072#comment-15848072 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5122 at 2/1/17 7:09 AM:
--------------------------------------------------------------------

I would like to handle this issue together with FLINK-5353 using a different approach: let
the user provide a {{FailedActionRequestHandler}} that implements how to deal with an action
request that failed, e.g. drop it or re-add it to the {{BulkProcessor}}.

The reason for this is that there is actually quite a variety of reasons an action request
can fail, and different failure cases can be considered "temporary" in different ways. For
example, in FLINK-5353, a malformed document can be "temporary" in the sense that the request
succeeds once the erroneous field is reprocessed. Instead of handling these cases one by one
in the connector, I propose to let the user implement the handling logic.

The handler will look something like this:

{code}
public interface FailedActionRequestHandler {
    /**
     * Called for each action request that still fails after the sink's own retries.
     * The return value indicates whether the sink should be failed because of this failure.
     */
    boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer);
}
{code}

The {{ElasticsearchSink}} will still retry a bulk request (with backoff) for obvious temporary
errors like {{EsRejectedExecutionException}}, and will only call {{onFailure}} after those
retries are exhausted. There the user can decide whether to re-add the request through the
{{RequestIndexer}} or simply drop it. The method should return {{true}} / {{false}} depending
on whether they'd like to fail the sink because of that failure.

What do you think? Sorry for being picky about how to resolve this. I think it will be best
to find a good long-term solution; from the current state of the ES issues I have a feeling
that things will become unmaintainable once new exception-handling cases pop up, so it would
be helpful to know what actual Flink ES users think of the idea.



> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at least once"
semantics of the ES connector.
> I put load on my cluster to test Flink, causing new indices to be created and rebalanced.
On those errors the bulk request should be retried instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
