flink-issues mailing list archives

From "luoguohao (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
Date Sun, 02 Dec 2018 07:20:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

luoguohao updated FLINK-11046:
------------------------------
    Description: 
When I use the ES6 sink to index into Elasticsearch and a bulk request fails with an exception,
I try to re-index the document by calling `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()`
method, but this does not work. The calling thread gets stuck there, and in the thread dump
I saw that the `bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;

}
{code}
After reading the code behind `indexer.add(action)`, I found that every add operation
is `synchronized`:
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked in the bulk process thread
as well.

The bulk process operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
    Runnable toRelease = () -> {};
    boolean bulkRequestSetupSuccessful = false;
    try {
        listener.beforeBulk(executionId, bulkRequest);
        semaphore.acquire();
        toRelease = semaphore::release;
        CountDownLatch latch = new CountDownLatch(1);
        retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse response) {
                try {
                    listener.afterBulk(executionId, bulkRequest, response);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    listener.afterBulk(executionId, bulkRequest, e);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }
        }, Settings.EMPTY);
        bulkRequestSetupSuccessful = true;
        if (concurrentRequests == 0) {
            latch.await();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } finally {
        if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
            toRelease.run();
        }
    }
}
{code}
As the line I marked in red above shows (the `latch.await()` call taken when `concurrentRequests == 0`),
I think that is why the retry thread was blocked: the bulk process thread never releases
the lock on `bulkprocessor`. I also tried to figure out why the field `concurrentRequests`
was set to zero, and I found the initialization of the bulk processor in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field is explicitly set to zero, so everything seems to make sense. But I still
wondered why the retry operation does not run in the same thread as the bulk process execution.
After reading the code, the `bulkAsync` method might be the last piece of the puzzle:
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
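To make the suspected interaction concrete, here is a minimal, self-contained sketch of the locking pattern described above. It is only an illustration under assumptions: `Processor`, `add`, and `flush` are hypothetical stand-ins, not the Elasticsearch `BulkProcessor` or any Flink class, and the latch wait uses a timeout so the demo terminates (the real code waits without one). It shows that a callback re-entering a `synchronized` method from the same thread succeeds, while a callback on another thread cannot acquire the monitor as long as the flushing thread holds it and waits on the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BulkDeadlockSketch {

    /** Hypothetical stand-in for a bulk processor; not the Elasticsearch class. */
    static class Processor {
        private int pending = 0;

        // Like internalAdd(): needs the monitor of this object.
        synchronized void add(String doc) {
            pending++;
        }

        /**
         * Like a blocking flush with concurrentRequests == 0: holds the monitor
         * while waiting for the completion callback. Returns true if the callback
         * finished within the timeout (assumption: timeout added for the demo only).
         */
        synchronized boolean flush(boolean asyncCallback, long timeoutMillis)
                throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            Runnable callback = () -> {
                try {
                    add("retried-doc"); // failure handler re-adds the document
                } finally {
                    latch.countDown(); // only reached after add() returns
                }
            };
            if (asyncCallback) {
                // Callback on another thread: add() must wait for the monitor.
                Thread t = new Thread(callback, "async-callback");
                t.setDaemon(true);
                t.start();
            } else {
                // Callback on the same thread: the monitor is reentrant.
                callback.run();
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Processor p = new Processor();
        System.out.println("same thread completed:  " + p.flush(false, 500));
        System.out.println("other thread completed: " + p.flush(true, 500));
    }
}
```

In this sketch the same-thread call reports `true` and the other-thread call reports `false`: the callback thread stays blocked on the monitor until `flush` gives up waiting. With an unbounded `latch.await()`, as in the code quoted above, neither thread could make progress.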
I hope someone can help fix this problem or give some suggestions. I can also try to
take it on myself.
Thanks a lot!



> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11046
>                 URL: https://issues.apache.org/jira/browse/FLINK-11046
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.6.2
>            Reporter: luoguohao
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
