flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
Date Wed, 29 Aug 2018 13:33:00 GMT

https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596329#comment-16596329

ASF GitHub Bot commented on FLINK-3875:

dawidwys commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert
table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r213665920

 File path: docs/dev/table/connect.md
 @@ -583,6 +584,104 @@ Make sure to add the version-specific Kafka dependency. In addition, a correspon
 {% top %}
+### Elasticsearch Connector
+
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-info">Format: JSON-only</span>
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.
+The connector operates in [upsert mode](#update-modes) and exchanges UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion).
+It can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
+                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with an ActionRequestFailureHandler
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize(42)          // optional: maximum size of buffered actions (in MB) per bulk request
+    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)
+
+    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
+    .bulkFlushBackoffExponential() //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to Elasticsearch
+    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
+    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
 Review comment:
   Does it serve any purpose other than setting the REST API version? Wouldn't it make more sense to choose the API version directly?
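
As a side note on the `keyDelimiter`/`keyNullLiteral` options in the snippet above: they control how the query's key fields are combined into an Elasticsearch document ID. A minimal plain-Java sketch of that combination logic (helper name hypothetical; this is an illustration, not the actual Flink implementation):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class KeyIdSketch {

    // Joins the key field values into a single document ID,
    // substituting the configured null literal for null fields.
    static String documentId(List<String> keyFields, String delimiter, String nullLiteral) {
        return keyFields.stream()
                .map(f -> f == null ? nullLiteral : f)
                .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        // With keyDelimiter("$") and keyNullLiteral("n/a"):
        System.out.println(documentId(Arrays.asList("KEY1", null, "KEY3"), "$", "n/a"));
        // prints: KEY1$n/a$KEY3
    }
}
```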

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

> Add a TableSink for Elasticsearch
> ---------------------------------
>                 Key: FLINK-3875
>                 URL: https://issues.apache.org/jira/browse/FLINK-3875
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors, Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
> Add a TableSink that writes data to Elasticsearch

This message was sent by Atlassian JIRA
