flink-issues mailing list archives

From "sunxiongkun (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (FLINK-12551) elasticsearch6 connector print log error
Date Thu, 23 May 2019 04:20:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-12551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunxiongkun closed FLINK-12551.
-------------------------------
    Resolution: Not A Bug

> elasticsearch6 connector print log error
> ----------------------------------------
>
>                 Key: FLINK-12551
>                 URL: https://issues.apache.org/jira/browse/FLINK-12551
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.6.3
>            Reporter: sunxiongkun
>            Priority: Minor
>
> When I use the Elasticsearch connector and my project is running, I find that some data is
> not inserted into Elasticsearch. I wanted to read the log for help, but the log does not
> contain the important information, so I read the source code
> (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and found an
> error in how the ERROR log is written.
>  
> {code:java}
> @Override
> public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
>  if (response.hasFailures()) {
>   BulkItemResponse itemResponse;
>   Throwable failure;
>   RestStatus restStatus;
>   try {
>    for (int i = 0; i < response.getItems().length; i++) {
>     itemResponse = response.getItems()[i];
>     failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
>     if (failure != null) {
>      LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
>      restStatus = itemResponse.getFailure().getStatus();
>      if (restStatus == null) {
>       failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
>      } else {
>       failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
>      }
>     }
>    }
>   } catch (Throwable t) {
>    // fail the sink and skip the rest of the items
>    // if the failure handler decides to throw an exception
>    failureThrowable.compareAndSet(null, t);
>   }
>  }
>  if (flushOnCheckpoint) {
>   numPendingRequests.getAndAdd(-request.numberOfActions());
>  }
> }
> {code}
> {code:java}
> @Override
>  public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
>   LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
>   try {
>    for (ActionRequest action : request.requests()) {
>     failureHandler.onFailure(action, failure, -1, requestIndexer);
>    }
>   } catch (Throwable t) {
>    // fail the sink and skip the rest of the items
>    // if the failure handler decides to throw an exception
>    failureThrowable.compareAndSet(null, t);
>   }
>   if (flushOnCheckpoint) {
>    numPendingRequests.getAndAdd(-request.numberOfActions());
>   }
>  }
> }
> {code}
>  
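
The report does not say which logging statement it considers wrong, and the issue was closed as Not A Bug. As background for reading the second afterBulk overload quoted above, which passes failure.getCause() as the last argument to LOG.error: Throwable.getCause() returns null for an exception that was created without a cause. The sketch below shows only that standard-library behavior. The class LoggedThrowableDemo and the helper describe are hypothetical names for illustration, not part of Flink, and no logging library is involved.

```java
import java.io.IOException;

public class LoggedThrowableDemo {

    /** Hypothetical helper: names the throwable that would be handed to a logger. */
    public static String describe(Throwable t) {
        return t == null ? "none" : t.getClass().getSimpleName() + ": " + t.getMessage();
    }

    public static void main(String[] args) {
        // An exception created without a cause: getCause() returns null.
        Throwable plain = new IOException("connection refused");
        // An exception that wraps another one: getCause() returns the wrapped exception.
        Throwable wrapped = new RuntimeException("bulk failed", plain);

        System.out.println("failure itself          -> " + describe(plain));
        System.out.println("failure.getCause()      -> " + describe(plain.getCause()));
        System.out.println("wrapped.getCause()      -> " + describe(wrapped.getCause()));
    }
}
```

Whether this matters for the sink depends on what kind of Throwable the Elasticsearch client passes to afterBulk, which the report does not show.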



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
