flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
Date Fri, 14 Dec 2018 11:38:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721296#comment-16721296 ]

Tzu-Li (Gordon) Tai commented on FLINK-11046:
---------------------------------------------

This seems a bit odd.

While concurrent requests is indeed set to 0, so that only a single bulk request may execute
at a time and new index additions are blocked while it is in flight, the lock should have been
released once the bulk request finished, un-blocking subsequent additions.

After all, the {{BulkProcessor}} is supposed to be thread-safe: [http://javadoc.kyubu.de/elasticsearch/HEAD/org/elasticsearch/action/bulk/BulkProcessor.html]
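For reference, the blocking pattern described in the report can be reduced to a small, self-contained sketch. This is illustrative Java only, not Elasticsearch code: {{MiniProcessor}}, {{add}}, and {{flushAndAwait}} are hypothetical stand-ins for {{BulkProcessor}}, the synchronized {{internalAdd}}, and the {{latch.await()}} in {{execute}}.

{code:java}
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for BulkProcessor; names are illustrative only.
class MiniProcessor {
    private final CountDownLatch bulkDone = new CountDownLatch(1);

    // Analogue of the synchronized internalAdd(): cannot enter while
    // flushAndAwait() holds this object's monitor.
    public synchronized void add(String request) {
        // would accumulate the request into the pending bulk
    }

    // Analogue of a flush with concurrentRequests == 0: the thread parks on
    // the latch while still holding the monitor. If the only thread that
    // could count the latch down is itself stuck in add(), neither proceeds.
    public synchronized void flushAndAwait() throws InterruptedException {
        bulkDone.await();
    }
}
{code}

Running {{flushAndAwait()}} on one thread and then calling {{add()}} from another leaves the second thread in state {{BLOCKED}} indefinitely, which matches the thread dump described below.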

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11046
>                 URL: https://issues.apache.org/jira/browse/FLINK-11046
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.6.2
>            Reporter: luoguohao
>            Priority: Major
>
> When I use the ES6 sink to index into Elasticsearch and the bulk process catches an exception,
> I try to re-index the document by calling `indexer.add(action)` in the
> `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the calling thread gets
> stuck there, and the thread dump shows that the `bulkprocessor` object is locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>     void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the code behind `indexer.add(action)`, I found that `synchronized` is required
> on each add operation:
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
>     ensureOpen();
>     bulkRequest.add(request, payload);
>     executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the bulk process thread as well.
> The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
>     Runnable toRelease = () -> {};
>     boolean bulkRequestSetupSuccessful = false;
>     try {
>         listener.beforeBulk(executionId, bulkRequest);
>         semaphore.acquire();
>         toRelease = semaphore::release;
>         CountDownLatch latch = new CountDownLatch(1);
>         retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
>             @Override
>             public void onResponse(BulkResponse response) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, response);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>             @Override
>             public void onFailure(Exception e) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, e);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>         }, Settings.EMPTY);
>         bulkRequestSetupSuccessful = true;
>         if (concurrentRequests == 0) {
>             latch.await();
>         }
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>         logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } catch (Exception e) {
>         logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } finally {
>         if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
>             toRelease.run();
>         }
>     }
> }
> {code}
> As the line I marked above shows, I think that is why the retry-operation thread is blocked:
> the bulk process thread never releases the lock on `bulkprocessor`. I also tried to figure out
> why the field `concurrentRequests` was set to zero, and found the initialization of the bulk
> processor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>     ...
>     BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
>     // This makes flush() blocking
>     bulkProcessorBuilder.setConcurrentRequests(0);
>     ...
>     return bulkProcessorBuilder.build();
> }
> {code}
> This field is set to zero explicitly. So everything seems to make sense, but I still wondered
> why the retry operation does not run in the same thread as the bulk process execution; after
> reading the code, the `bulkAsync` method might be the last piece of the puzzle:
> {code:java}
> @Override
> public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
>     return BulkProcessor.builder(client::bulkAsync, listener);
> }
> {code}
> So, I hope someone can help fix this problem or give some suggestions; I could also take a
> try at it myself.
> Thanks a lot!
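One possible direction, sketched here as plain Java: have `onFailure` only enqueue the failed action, and re-add the queued actions from the sink thread between flushes, so no thread contends for the processor's monitor while a flush is in flight. `RetryBuffer` is a hypothetical helper for illustration, not part of Flink or Elasticsearch.

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch: decouple failure handling from re-adding.
class RetryBuffer<T> {
    private final ConcurrentLinkedQueue<T> pending = new ConcurrentLinkedQueue<>();

    // Called from the failure-handler thread: a cheap, lock-free enqueue
    // instead of a blocking indexer.add(...).
    public void onFailure(T action) {
        pending.add(action);
    }

    // Called from the sink thread before the next flush, when no bulk
    // request is in flight, so re-adding cannot deadlock.
    public void drainTo(Consumer<T> indexer) {
        T action;
        while ((action = pending.poll()) != null) {
            indexer.accept(action);
        }
    }
}
{code}

Whether this preserves the sink's at-least-once guarantees across checkpoints would still need to be worked out; the sketch only shows how to keep the failure-handler thread off the processor's lock.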



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
