spark-user mailing list archives

From "Cheng, Hao" <hao.ch...@intel.com>
Subject RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
Date Thu, 19 Mar 2015 00:27:16 GMT
It seems the elasticsearch-hadoop project was built against an older version of Spark, and you then upgraded Spark in your execution environment. As far as I know, the definition of StructField changed in Spark 1.2. Can you confirm the version mismatch first?
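
For illustration, a minimal sketch of the kind of binary incompatibility involved, assuming the connector was compiled against the pre-1.2 StructField (the real classes live in org.apache.spark.sql.catalyst.types; these are simplified stand-ins, not the actual Spark sources):

// Simplified stand-ins, not the real Spark classes.
object StructFieldCompatSketch {

  sealed trait DataType
  case object StringType extends DataType

  // Shape the connector was presumably compiled against (Spark < 1.2):
  //   case class StructField(name: String, dataType: DataType, nullable: Boolean)

  // Shape a Spark 1.2.x runtime provides: an extra metadata parameter.
  case class Metadata(json: String = "{}")
  case class StructField(
      name: String,
      dataType: DataType,
      nullable: Boolean,
      metadata: Metadata = Metadata())

  // A default argument is filled in by the caller at compile time, so bytecode
  // that was compiled to invoke the old 3-argument constructor fails at runtime
  // with java.lang.NoSuchMethodError: StructField.<init>(...;Z)V
  def main(args: Array[String]): Unit =
    println(StructField("balance", StringType, nullable = true))
}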

From: Todd Nist [mailto:tsindotg@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose its data through SparkSQL using the
elasticsearch-hadoop project. I am encountering the following exception when trying to create
a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s

Create Temporary Table for querying

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
    at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch cluster.
The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'

{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit()

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end - start} ms")
This works fine and prints the content of esData as one would expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s

(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, email -> rodriquezflores@tourmania.com, firstname -> Rodriquez, account_number -> 4))
(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> opalmeadows@cedward.com, firstname -> Opal, account_number -> 9))

...

As does creating a new index and type like this:

    println("read json in and store in ES")

    // read in JSON and store in ES

    val path = "document.json"

    val rdd : SchemaRDD = sqlContext.jsonFile(path)



    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the table via the sqlContext like this, I get the exception
shown above:

    println("Create Temporary Table for querying")



    val schemaRDD: SchemaRDD = sqlContext.sql(

          "CREATE TEMPORARY TABLE account    " +

          "USING org.elasticsearch.spark.sql " +

          "OPTIONS (resource 'bank/account')  " )

  }

}
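
Once the version mismatch is resolved, the temporary table should be queryable in the usual way; a small sketch of what that might look like (field names taken from the bank/account mapping above):

    val results = sqlContext.sql(
      "SELECT firstname, lastname, balance FROM account WHERE balance >= 20000")
    results.collect().foreach(println)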

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the following elasticsearch-hadoop dependency:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.

-Todd