spark-user mailing list archives

From Adrian Mocanu <amoc...@verticalscope.com>
Subject RE: ElasticSearch enrich
Date Fri, 27 Jun 2014 21:16:44 GMT
b0c1,
could you post your code? I am interested in your solution.

Thanks
Adrian

From: boci [mailto:boci.boci@gmail.com]
Sent: June-26-14 6:17 PM
To: user@spark.apache.org
Subject: Re: ElasticSearch enrich

Wow, thanks for your fast answer, it helps a lot...

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <holden@pigscanfly.ca>
wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well; the specific example
is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
. Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and then call saveAsHadoopDataset
on the RDD that gets passed into the function we provide to foreachRDD.

e.g.

stream.foreachRDD { (data, time) =>
     val jobConf = ...
     data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
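
For reference, a fuller sketch of what the elided jobConf setup might look like; the makeEsJobConf helper and the document layout are illustrative, and the exact EsOutputFormat class name should be checked against your elasticsearch-hadoop version:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

// Illustrative helper: points Hadoop's output machinery at Elasticsearch.
def makeEsJobConf(): JobConf = {
  val jobConf = new JobConf()
  jobConf.set("mapred.output.format.class",
    "org.elasticsearch.hadoop.mr.EsOutputFormat") // verify for your version
  jobConf.setOutputCommitter(classOf[FileOutputCommitter])
  jobConf.set("es.nodes", "localhost:9200")  // where ES is listening
  jobConf.set("es.resource", "tweets/tweet") // target index/type
  // EsOutputFormat ignores the path, but Hadoop requires one to be set.
  FileOutputFormat.setOutputPath(jobConf, new Path("-"))
  jobConf
}

// Illustrative mapper: EsOutputFormat indexes the value and ignores the key.
def prepareDataForIndexing(line: String): (NullWritable, MapWritable) = {
  val doc = new MapWritable()
  doc.put(new Text("message"), new Text(line))
  (NullWritable.get(), doc)
}

stream.foreachRDD { (data, time) =>
  data.map(prepareDataForIndexing).saveAsHadoopDataset(makeEsJobConf())
}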

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.boci@gmail.com>
wrote:
Thanks. Without the local option I can connect to the remote ES; now I have only one problem.
How can I use elasticsearch-hadoop with Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles"
method, and my second problem is that the output index depends on the input data.

Thanks

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentreath@gmail.com>
wrote:
You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat
and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
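
For sbt, that dependency would look something like the line below (the version is illustrative; pick the release matching your Elasticsearch cluster):

// build.sbt
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.0.0"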

For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch)
and use the default config (host = localhost, port = 9200).

On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.boci@gmail.com>
wrote:
That's okay, but Hadoop has ES integration. What happens if I run saveAsHadoopFile without
Hadoop? (Or must I start Hadoop programmatically, if I even can?)

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <holden@pigscanfly.ca>
wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.boci@gmail.com>
wrote:
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES connection, but in production
I want to use ElasticClient.remote. For this, should I pass the ElasticClient to mapPartitions,
or what is the best practice?
In this case you probably want to create the ElasticClient inside of mapPartitions (since it
isn't serializable), and if you want to use a different client in local mode, just have a flag
that controls what type of client you create.
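
A minimal sketch of that flag approach, assuming the elastic4s-style ElasticClient.local / ElasticClient.remote factories mentioned above (the import path and signatures depend on your client library version):

import com.sksamuel.elastic4s.ElasticClient // verify for your elastic4s version

val esIsLocal = true // e.g. read from your config; true in local test mode

rdd.mapPartitions { records =>
  // Build the client here so it is never serialized into the closure.
  val client =
    if (esIsLocal) ElasticClient.local
    else ElasticClient.remote("es-host", 9300)
  records.map { record =>
    // ... enrich record using client ...
    record
  }
}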
- My stream output is written to Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-")
in a local environment?
- After storing the enriched data in ES, I want to generate aggregated data (EsInputFormat);
how can I test that locally?
I think the simplest thing to do would be to use the same client in both modes and just start
a single-node Elasticsearch cluster.
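
For the read side, a hedged sketch of pulling the enriched index back in through EsInputFormat against that local single-node cluster (the class name again depends on your elasticsearch-hadoop version):

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

val readConf = new JobConf()
readConf.set("es.nodes", "localhost:9200")  // the local test node
readConf.set("es.resource", "tweets/tweet") // index/type to aggregate over
// Gives an RDD[(Text, MapWritable)] of (document id, document) pairs.
val docs = sc.hadoopRDD(readConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])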

Thanks guys

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.boci@gmail.com

On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <holden@pigscanfly.ca>
wrote:
So I'm giving a talk at the Spark Summit on using Spark & ElasticSearch, but for now, if
you want to see a simple demo which uses Elasticsearch for geo input, you can take a look at
my quick & dirty implementation, TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
). This approach uses the ESInputFormat, which avoids the difficulty of having to manually
create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a query for each record
in your RDD. If this is the case, you could instead look at using mapPartitions and setting
up your Elasticsearch connection inside of that, so you could then re-use the client for all
of the queries on each partition. This approach will avoid having to serialize the Elasticsearch
connection because it will be local to your function.
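
As a sketch of that per-record-query case (enrichWithQuery is a hypothetical helper; the point is one client per partition rather than per record):

rdd.mapPartitions { records =>
  // One connection per partition, created on the worker itself.
  val client = ElasticClient.remote("es-host", 9300) // elastic4s-style call, as above
  // enrichWithQuery (hypothetical) issues one ES query per record
  // against the shared, partition-local client.
  records.map(record => enrichWithQuery(client, record))
}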

Hope this helps!

Cheers,

Holden :)

On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rustagi@gmail.com>
wrote:
It's not used as the default serializer because of some compatibility issues and the requirement
to register the classes.

Which part are you getting as non-serializable? You need to serialize that class if you are
sending it to Spark workers inside a map, reduce, mapPartitions, or any of the other operations
on an RDD.
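
A minimal sketch of that registration requirement (MyRecord and the registrator's fully-qualified name are hypothetical):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class that gets shipped to workers.
case class MyRecord(id: String, text: String)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord]) // each shipped class is registered here
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyRegistrator")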


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi (https://twitter.com/mayur_rustagi)


On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc175@uow.edu.au>
wrote:
I'm afraid persisting a connection across two tasks is a dangerous act, as they
can't be guaranteed to be executed on the same machine. Your ES server may
think it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection from
a local 'pool', so nothing will sneak into your closure, but it's too complex
and there should be a better option.
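
That static-method idea can be sketched with a per-JVM singleton object, so each executor lazily builds one client and nothing sneaks into the closure (again assuming an elastic4s-style client):

object EsClientPool {
  // `lazy` defers creation until first use on each worker JVM.
  lazy val client = ElasticClient.remote("es-host", 9300)
}

rdd.map { record =>
  val client = EsClientPool.client // resolved on the worker, never serialized
  // ... use client to enrich record ...
  record
}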

I've never used Kryo before; if it's that good, perhaps we should use it as the
default serializer.







--
Cell : 425-233-8271




--
Cell : 425-233-8271






--
Cell : 425-233-8271
