ElasticSearch enrich


b0c1
Hi guys,

I have a small question. I want to create a "Worker" class that uses ElasticClient to query Elasticsearch (I want to enrich my data with geo search results).

How can I do that? I tried to create a worker instance with ES host/port parameters, but Spark throws an exception (my class is not serializable).

Any idea?

Thanks
b0c1

Re: ElasticSearch enrich

Peng Cheng
Make sure all queries are called through class methods, and wrap your query info in a class that has only simple properties (strings, collections, etc.).
If you can't find such a wrapper, you can also use the SerializableWritable wrapper out of the box, but that's not recommended (it's a developer API and it makes fat closures that run slowly).
Yours, Peng
Re: ElasticSearch enrich

b0c1
OK, but in this case where can I store the ES connection? Or does every document create a new ES connection inside the worker?

Re: ElasticSearch enrich

Mayur Rustagi
Most likely the ES client is not serializable for you. There are three workarounds:
1. Switch to Kryo serialization and register the client with Kryo; this might solve your serialization issue.
2. Use mapPartitions for all your data and initialize your client inside the mapPartitions code. This creates one client per partition, which reduces some parallelism and adds some client-creation overhead, but it prevents serializing the ES client and shipping it to the workers (see the sketch below).
3. Use SerializableWrapper to serialize your ES client manually, send it across, and deserialize it manually. This may or may not work, depending on whether your class is safely serializable.
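A minimal sketch of option 2, with hypothetical stand-ins for the record type and the (non-serializable) client:

import org.apache.spark.rdd.RDD

// Hypothetical record and client types; substitute your own.
case class Record(id: String, var city: Option[String])

class GeoClient(host: String, port: Int) { // not serializable, like a real ES client
  def nearestCity(r: Record): Record = r   // placeholder for a real geo query
  def close(): Unit = ()
}

// Option 2: build the client once per partition, on the executor, so only
// the host and port (plain values) travel through the task closure.
def enrichGeo(rdd: RDD[Record], host: String, port: Int): RDD[Record] =
  rdd.mapPartitions { records =>
    val client = new GeoClient(host, port)
    val enriched = records.map(client.nearestCity).toVector // materialize so we can close
    client.close()
    enriched.iterator
  }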

Mayur Rustagi
Ph: +1 (760) 203 3257


Re: ElasticSearch enrich

b0c1
I'm using elastic4s inside my ESWorker class. ESWorker now contains only two fields: host: String and port: Int. Inside the findNearestCity method I create the ElasticClient (elastic4s) connection. What's wrong with my class? Do I need to serialize ElasticClient? mapPartitions sounds good, but I still get a NotSerializableException. Or must I mark the client as transient? And where would host and port come from in that case?

my worker:

import org.apache.spark.streaming.dstream.DStream

class ESWorker(val host: String, val port: Int) {
  def findNearestCity(geo: Position): Option[City] = {
    // Here I create the ElasticClient connection and execute queries
    ???
  }
  def enrichGeoInternal(data: Data): Data = {
    data.location = findNearestCity(data.position)
    data // return the record; the bare assignment evaluates to Unit, not Data
  }
  def enrichGeo(ds: DStream[Data]): DStream[Data] = {
    ds.map(enrichGeoInternal)
  }
}

Re: ElasticSearch enrich

Peng Cheng
I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to execute on the same machine. Your ES server may think it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but that's too complex and there should be a better option.
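A minimal sketch of that 'pool' idea, reusing the hypothetical GeoClient from the earlier sketch: a Scala object is created once per JVM, so each executor lazily builds one client and the task closure never captures a connection.

// Hypothetical per-JVM holder; the host and port are assumptions.
object ESConnection {
  lazy val client = new GeoClient("es-host", 9300)
}

rdd.map(r => ESConnection.client.nearestCity(r)) // only the lambda is shipped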

I've never used Kryo before; if it's that good, perhaps we should use it as the default serializer.
Yours, Peng
Re: ElasticSearch enrich

Mayur Rustagi
It's not used as the default serializer because of some compatibility issues and the requirement to register classes.

Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions, or any of the other operations on an RDD.
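A rough sketch of the Kryo route, using the Spark 1.x configuration keys; MyRegistrator is a placeholder, and note that registering a class does not by itself make a live network connection meaningfully serializable:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Register whatever class Spark reported as non-serializable.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[ESWorker])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator") // use the fully-qualified name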



Re: ElasticSearch enrich

Holden Karau
So I'm giving a talk at the Spark Summit on using Spark & Elasticsearch, but for now, if you want to see a simple demo that uses Elasticsearch for geo input, you can take a look at my quick & dirty implementation, TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat, which avoids the difficulty of having to manually create Elasticsearch clients.

This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If that's the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of it, so you can then re-use the client for all of the queries on each partition. That approach avoids having to serialize the Elasticsearch connection, because it will be local to your function.
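A rough sketch of what reading through the input format looks like with the old Hadoop API; the index/type name and node address are placeholder assumptions:

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

val jobConf = new JobConf()
jobConf.set("es.resource", "tweets/tweet") // placeholder index/type
jobConf.set("es.nodes", "localhost:9200")  // assumption: local test node

// Each record arrives as (document id, field map).
val docs = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])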

Hope this helps!

Cheers,

Holden :)


--
Cell : 425-233-8271
Re: ElasticSearch enrich

b0c1
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote. Should I pass the ElasticClient to mapPartitions for this, or what is the best practice?
- My stream output is written to Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
- After storing the enriched data in ES, I want to generate aggregated data (EsInputFormat). How can I test that locally?

Thanks guys

b0c1

Re: ElasticSearch enrich

Holden Karau


On Wed, Jun 25, 2014 at 4:16 PM, boci <[hidden email]> wrote:
> In local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote. Should I pass the ElasticClient to mapPartitions for this, or what is the best practice?

In this case you probably want to create the ElasticClient inside of mapPartitions (since it isn't serializable), and if you want to use a different client in local mode, just have a flag that controls what type of client you create.

> My stream output is written to Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
> After storing the enriched data in ES, I want to generate aggregated data (EsInputFormat). How can I test that locally?

I think the simplest thing to do would be to use the same client in both modes and just start a single-node Elasticsearch cluster.
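A minimal sketch of that flag, assuming the elastic4s factory methods mentioned in this thread (ElasticClient.local and ElasticClient.remote):

import com.sksamuel.elastic4s.ElasticClient

// Call this inside mapPartitions: the closure then only captures the flag,
// host, and port, never a live (non-serializable) client.
def makeClient(localMode: Boolean, host: String, port: Int): ElasticClient =
  if (localMode) ElasticClient.local    // embedded node, handy for tests
  else ElasticClient.remote(host, port) // real cluster in production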

Re: ElasticSearch enrich

b0c1
That's okay, but it's Hadoop that has the ES integration. What happens if I run saveAsHadoopFile without Hadoop? Or must I pull up Hadoop programmatically (if I even can)?

b0c1


Re: ElasticSearch enrich

MLnick
You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics are here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes, I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).
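In sbt, for example, that dependency might look like the following; the version shown is an assumption, so pick whatever matches your cluster:

// build.sbt
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.0.0"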




Re: ElasticSearch enrich

b0c1
Thanks. Without the local option I can connect to the remote ES; now I have only one problem. How can I use elasticsearch-hadoop with Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My second problem: the output index depends on the input data.

Thanks



Re: ElasticSearch enrich

Holden Karau
Hi b0c1,

I have an example of how to do this in the repo for my talk as well; the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD.

e.g.

stream.foreachRDD { (data, time) =>
  val jobConf = ... // build the JobConf here (a sketch follows below)
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
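A rough sketch of what that elided JobConf might contain, using standard elasticsearch-hadoop settings for the old-API output format; the index/type and node address are placeholder assumptions:

import org.apache.hadoop.io.{MapWritable, NullWritable}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsOutputFormat

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[EsOutputFormat])
jobConf.setOutputKeyClass(classOf[NullWritable])  // EsOutputFormat ignores the key
jobConf.setOutputValueClass(classOf[MapWritable]) // one document per value
jobConf.set("es.resource", "tweets/tweet")        // placeholder index/type
jobConf.set("es.nodes", "localhost:9200")         // assumption: local test node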

Hope that helps :)

Cheers,

Holden :)


Re: ElasticSearch enrich

b0c1
Wow, thanks for your fast answer, it helps a lot...

b0c1


Re: ElasticSearch enrich

Holden Karau
Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch & Spark go :)


Re: ElasticSearch enrich

b0c1
Another question: in foreachRDD I will initialize the JobConf, but at that point how can I get information from the items?
I have an identifier in the data that identifies the required ES index, so how can I set a dynamic index in foreachRDD?

b0c1


Re: ElasticSearch enrich

b0c1
OK, I found dynamic resources, but I have a frustrating problem. This is the flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
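For reference, a sketch of the dynamic-resource feature mentioned here, assuming elasticsearch-hadoop's pattern support in es.resource.write; the field name below is hypothetical:

// Route each document to an index derived from one of its own fields.
jobConf.set("es.resource.write", "events-{customerId}/event")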

My problem is: if I do this, it doesn't work; the enrich functions are not called. But if I put a print in the chain, they are. For example, if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

then enrich X and enrich Y are called, but enrich Z is not.
If I put the print after enrich Z, it will be printed. How can I solve this? (What can I do to get foreachRDD called? I put a breakpoint inside the map function, where I generate the writable, but it's never hit.)

Any idea?

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Fri, Jun 27, 2014 at 4:53 PM, boci <[hidden email]> wrote:
Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items?
I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ?

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <[hidden email]> wrote:
Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch & Spark go :)


On Thu, Jun 26, 2014 at 3:17 PM, boci <[hidden email]> wrote:
Wow, thanks your fast answer, it's help a lot...

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <[hidden email]> wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call  saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
     val jobConf = ...
     data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
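
The elided jobConf setup might look roughly like this (a sketch assuming the stock elasticsearch-hadoop EsOutputFormat; the exact property names depend on your es-hadoop version):

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.elasticsearch.hadoop.mr.EsOutputFormat

def makeEsJobConf(sc: SparkContext): JobConf = {
  val jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.setOutputFormat(classOf[EsOutputFormat]) // write through es-hadoop
  jobConf.set("es.nodes", "localhost:9200")        // ES HTTP endpoint
  jobConf.set("es.resource", "tweets/tweet")       // index/type to write into
  jobConf
}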

Hope that helps :)

Cheers,

Holden :)


On Thu, Jun 26, 2014 at 2:23 PM, boci <[hidden email]> wrote:
Thanks. Without the local option I can connect to ES remotely; now I have only two problems left. How can I use elasticsearch-hadoop with Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My second problem: the output index depends on the input data.

Thanks

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <[hidden email]> wrote:
You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).
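
Reading back for the aggregation step can be tested the same way. A sketch using the EsInputFormat (again assuming the stock es-hadoop classes; keys come back as document ids, values as MapWritable field maps):

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.elasticsearch.hadoop.mr.EsInputFormat

def readFromEs(sc: SparkContext) = {
  val conf = new JobConf()
  conf.set("es.nodes", "localhost:9200")  // the local test instance
  conf.set("es.resource", "tweets/tweet") // index/type to read
  sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
               classOf[Text], classOf[MapWritable])
}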


On Thu, Jun 26, 2014 at 9:04 AM, boci <[hidden email]> wrote:
That's okay, but Hadoop has ES integration. What happens if I run saveAsHadoopFile without Hadoop? (Or do I need to pull up Hadoop programmatically, if I even can?)

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <[hidden email]> wrote:


On Wed, Jun 25, 2014 at 4:16 PM, boci <[hidden email]> wrote:
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote. For this, should I pass the ElasticClient to mapPartitions, or what is the best practice?
In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable), and if you want to use a different client in local mode just have a flag that controls what type of client you create. 
- My stream output is written into Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment? 
- After storing the enriched data in ES, I want to generate aggregated data (EsInputFormat). How can I test that locally?
I think the simplest thing to do would be to use the same client in both modes and just start a single-node Elasticsearch cluster. 

Thanks guys

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: [hidden email]


On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <[hidden email]> wrote:
So I'm giving a talk at the Spark summit on using Spark & ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick & dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function. 
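
A sketch of that shape, assuming elastic4s and using the local/remote flag idea from above (runQuery is a hypothetical placeholder for the real geo query):

import com.sksamuel.elastic4s.ElasticClient
import org.apache.spark.rdd.RDD

def runQuery(client: ElasticClient, record: String): String =
  "..." // placeholder: issue the per-record geo query here

def enrichPartitions(rdd: RDD[String], useLocal: Boolean): RDD[(String, String)] =
  rdd.mapPartitions { records =>
    // One client per partition, created on the worker, so it is never serialized.
    val client =
      if (useLocal) ElasticClient.local
      else ElasticClient.remote("es-host", 9300) // placeholder host/port
    records.map(r => (r, runQuery(client, r)))
  }

(Closing the client once the iterator is exhausted is left out for brevity.)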

Hope this helps!

Cheers,

Holden :)


On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <[hidden email]> wrote:
It's not used as the default serializer because of some compatibility issues & the requirement to register the classes.

Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions or any of the other operations on an RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257




Reply | Threaded
Open this post in threaded view
|

Re: ElasticSearch enrich

Holden Karau
So a few quick questions:

1) What cluster are you running this against? Is it just local? Have you tried local[4]?
2) When you say breakpoint, how are you setting this breakpoint? There is a good chance your breakpoint mechanism doesn't work in a distributed environment; could you instead cause a side effect (like writing to a file)?
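
For instance, a counter-style side effect that is visible back on the driver; a sketch using an accumulator (ssc, streamY and enrichZ are stand-ins for your streaming context, stream and enrich function):

import scala.reflect.ClassTag
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

def checkEnrichZ[T: ClassTag](ssc: StreamingContext, streamY: DStream[T], enrichZ: T => T): Unit = {
  val calls = ssc.sparkContext.accumulator(0)
  val streamZ = streamY.map { rec => calls += 1; enrichZ(rec) }
  streamZ.foreachRDD { rdd =>
    rdd.count() // an output operation, so the lazy chain actually runs
    println(s"enrich Z has now run ${calls.value} times")
  }
}

(Worth remembering that DStream transformations are lazy and only run when an output operation such as print or foreachRDD consumes them, which may be why inserting a print changes which stages fire.)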

Cheers,

Holden :)



--
Cell : 425-233-8271
Reply | Threaded
Open this post in threaded view
|

RE: ElasticSearch enrich

amoc
In reply to this post by b0c1

b0c1, could you post your code? I am interested in your solution.

Thanks

Adrian


12