flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #3112: [FLINK-4988] [elasticsearch] Add Elasticsearch 5.x...
Date Tue, 07 Feb 2017 14:47:17 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3112#discussion_r99837460
  
    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -23,158 +23,291 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    -This connector provides a Sink that can write to an
    -[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    -following dependency to your project:
    -
    -{% highlight xml %}
    -<dependency>
    -  <groupId>org.apache.flink</groupId>
    -  <artifactId>flink-connector-elasticsearch{{ site.scala_version_suffix }}</artifactId>
    -  <version>{{site.version }}</version>
    -</dependency>
    -{% endhighlight %}
    +This connector provides sinks that can request document actions to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add one
    +of the following dependencies to your project, depending on the version
    +of the Elasticsearch installation:
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left">Maven Dependency</th>
    +      <th class="text-left">Supported since</th>
    +      <th class="text-left">Elasticsearch version</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +        <td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
    +        <td>1.0.0</td>
    +        <td>1.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
    +        <td>1.0.0</td>
    +        <td>2.x</td>
    +    </tr>
    +    <tr>
    +        <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
    +        <td>1.2.0</td>
    +        <td>5.x</td>
    +    </tr>
    +  </tbody>
    +</table>
     
     Note that the streaming connectors are currently not part of the binary
    -distribution. See
    -[here]({{site.baseurl}}/dev/linking.html)
    -for information about how to package the program with the libraries for
    -cluster execution.
    +distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
    +about how to package the program with the libraries for cluster execution.
     
     #### Installing Elasticsearch
     
     Instructions for setting up an Elasticsearch cluster can be found
     [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
     Make sure to set and remember a cluster name. This must be set when
    -creating a Sink for writing to your cluster
    +creating an `ElasticsearchSink` for requesting document actions against your cluster.
     
     #### Elasticsearch Sink
    -The connector provides a Sink that can send data to an Elasticsearch Index.
    -
    -The sink can use two different methods for communicating with Elasticsearch:
    -
    -1. An embedded Node
    -2. The TransportClient
     
    -See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    -for information about the differences between the two modes.
    +The `ElasticsearchSink` uses a `TransportClient` to communicate with an
    +Elasticsearch cluster.
     
    -This code shows how to create a sink that uses an embedded Node for
    -communication:
    +The example below shows how to configure and create a sink:
     
     <div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    +<div data-lang="java, Elasticsearch 1.x" markdown="1">
     {% highlight java %}
     DataStream<String> input = ...;
     
    -Map<String, String> config = Maps.newHashMap();
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name");
     // This instructs the sink to emit after every element, otherwise they would be buffered
     config.put("bulk.flush.max.actions", "1");
    -config.put("cluster.name", "my-cluster-name");
     
    -input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    -    @Override
    -    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    -        Map<String, Object> json = new HashMap<>();
    -        json.put("data", element);
    +List<TransportAddress> transportAddresses = new ArrayList<>();
    +transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
    +transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
     
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
             return Requests.indexRequest()
                     .index("my-index")
                     .type("my-type")
                     .source(json);
         }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
     }));
     {% endhighlight %}
     </div>
    -<div data-lang="scala" markdown="1">
    +<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = new HashMap<>();
    +config.put("cluster.name", "my-cluster-name");
    +// This instructs the sink to emit after every element, otherwise they would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +
    +List<InetSocketAddress> transportAddresses = new ArrayList<>();
    +transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
    +transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
    +
    +input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    +    public IndexRequest createIndexRequest(String element) {
    +        Map<String, String> json = new HashMap<>();
    +        json.put("data", element);
    +    
    +        return Requests.indexRequest()
    +                .index("my-index")
    +                .type("my-type")
    +                .source(json);
    +    }
    +    
    +    @Override
    +    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    +        indexer.add(createIndexRequest(element));
    +    }
    +}));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala, Elasticsearch 1.x" markdown="1">
     {% highlight scala %}
     val input: DataStream[String] = ...
     
    -val config = new util.HashMap[String, String]
    +val config = new java.util.HashMap[String, String]
    +config.put("cluster.name", "my-cluster-name")
    --- End diff --
    
    This is Scala code, so the semicolon can be omitted ;-).
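
    For reference, a minimal sketch of what the comment above is getting at: Scala infers
    statement boundaries at line ends, so a trailing semicolon is optional (the `config`
    name here just mirrors the snippet in the diff, not any connector API):

    ```scala
    // Scala's semicolon inference treats a line end as a statement
    // terminator, so these two puts are equivalent with or without ';'.
    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "my-cluster-name")    // no semicolon needed
    config.put("bulk.flush.max.actions", "1");       // semicolon is legal but redundant

    assert(config.get("cluster.name") == "my-cluster-name")
    assert(config.get("bulk.flush.max.actions") == "1")
    ```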


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
