spark-user mailing list archives
From "Cheng, Hao" <hao.ch...@intel.com>
Subject RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
Date Thu, 19 Mar 2015 01:59:04 GMT
Todd, can you try running the code in the Spark shell (bin/spark-shell)? Maybe you need to write some
fake code to call the function in MappingUtils.scala. In the meantime, can you also check
the jar dependency tree of your project, or the downloaded dependency jar files, just in case
multiple versions of Spark have been introduced?
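
To make that concrete, here is a minimal sketch (not tested) of what I mean. Instead of calling MappingUtils directly, issuing the same CREATE TEMPORARY TABLE statement from your mail exercises the same code path; the jar name below is just whatever your snapshot build produces:

# start the shell with only the connector jar added
bin/spark-shell --jars elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar

// inside the shell; sc is provided automatically
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
sqlContext.sql(
  "CREATE TEMPORARY TABLE account " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'bank/account')")

If the same NoSuchMethodError shows up there, the mismatch is between the connector jar and the Spark 1.2.1 runtime itself, not something in your application assembly.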

From: Todd Nist [mailto:tsindotg@gmail.com]
Sent: Thursday, March 19, 2015 9:04 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary table


Thanks for the quick response.

The spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here is the startup:

radtech>$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080
========================================

15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
15/03/18 20:31:41 INFO Remoting: Starting remoting
15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@radtech.io:7077]
15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkMaster@radtech.io:7077]
15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._

// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"

version := "0.0.1"

val SCALA_VERSION = "2.10.4"

val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked",
    "-deprecation",
    "-Xlint",
    "-Ywarn-dead-code",
    "-language:_",
    "-target:jvm-1.7",
    "-encoding",
    "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)

// custom Hadoop client, configured as provided, since it shouldn't go to the assembly jar
val hadoopDeps = Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)

// ElasticSearch Hadoop support
val esHadoopDeps = Seq(
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms")
)

val commonDeps = Seq(
  "com.eaio.uuid" % "uuid"         % "3.2",
  "joda-time"     % "joda-time"    % "2.3",
  "org.joda"      % "joda-convert" % "1.6"
)

val jsonDeps = Seq(
  "com.googlecode.json-simple"       % "json-simple"                     % "1.1.1",
  "com.fasterxml.jackson.core"       % "jackson-core"                    % "2.5.1",
  "com.fasterxml.jackson.core"       % "jackson-annotations"             % "2.5.1",
  "com.fasterxml.jackson.core"       % "jackson-databind"                % "2.5.1",
  "com.fasterxml.jackson.module"     % "jackson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module"    %% "jackson-module-scala"            % "2.5.1",
  "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml"          % "2.5.1",
  "com.fasterxml.jackson.datatype"   % "jackson-datatype-joda"           % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2"     %% "specs2"      % "2.3.11" % "test",
  "org.mockito"     % "mockito-all" % "1.9.5"  % "test",
  "org.scalacheck" %% "scalacheck"  % "1.11.3" % "test",
  "org.scalatest"  %% "scalatest"   % "1.9.1"  % "test"
)

// Project definitions
lazy val root = (project in file("."))
  .settings(defaultSettings: _*)
  .settings(libraryDependencies ++= Seq(
    "com.databricks" %% "spark-csv" % "0.1",
    // Spark itself, configured as provided, since it shouldn't go to the assembly jar
    "org.apache.spark" %% "spark-core"      % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-sql"       % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-hive"      % SPARK_VERSION % "provided",
    ("org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION).
      exclude("org.apache.spark", "spark-core_2.10").
      exclude("org.apache.spark", "spark-streaming_2.10").
      exclude("org.apache.spark", "spark-sql_2.10").
      exclude("javax.jms", "jms"),
    "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "test" classifier "tests",
    "com.typesafe"       % "config"    % "1.2.1",
    "com.typesafe.play" %% "play-json" % "2.3.4"
  ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ commonDeps)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html" => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

Am I by chance missing an exclude that is bringing an older version of Spark into the assembly?
Hmm, I need to go look at that.

I am using the SNAPSHOT build of elasticsearch-hadoop as it is built against Spark 1.2.1.
Per the elasticsearch-hadoop gradle.properties, the Spark version is set to:

sparkVersion = 1.2.1

Other than possibly missing an exclude that is bringing in an older version of Spark from somewhere,
I do see that I am referencing "org.apache.hadoop" % "hadoop-client" % "2.6.0"
% "provided", but I don't think that is the issue.

Any other thoughts?

-Todd

On Wed, Mar 18, 2015 at 8:27 PM, Cheng, Hao <hao.cheng@intel.com> wrote:
It seems the elasticsearch-hadoop project was built with an old version of Spark, and then you
upgraded the Spark version in the execution environment. As far as I know, StructField changed its definition
in Spark 1.2. Can you confirm the version problem first?
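
To be concrete, my recollection of the change is roughly the following (a sketch from memory, not checked against the exact source tags):

// Spark 1.1.x -- what an older connector build would have been compiled against:
//   case class StructField(name: String, dataType: DataType, nullable: Boolean)
//
// Spark 1.2.x -- what is on your runtime classpath:
//   case class StructField(name: String, dataType: DataType, nullable: Boolean = true,
//                          metadata: Metadata = Metadata.empty)
//
// Bytecode compiled against the 3-argument constructor then fails at runtime with
// NoSuchMethodError, because only the 4-argument constructor exists in 1.2.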

From: Todd Nist [mailto:tsindotg@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose its data through SparkSQL using the
elasticsearch-hadoop project.  I am encountering the following exception when trying to create
a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s

Create Temporary Table for querying

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch cluster.
The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{
  "bank": {
    "mappings": {
      "account": {
        "properties": {
          "account_number": { "type": "long" },
          "address":        { "type": "string" },
          "age":            { "type": "long" },
          "balance":        { "type": "long" },
          "city":           { "type": "string" },
          "email":          { "type": "string" },
          "employer":       { "type": "string" },
          "firstname":      { "type": "string" },
          "gender":         { "type": "string" },
          "lastname":       { "type": "string" },
          "state":          { "type": "string" }
        }
      }
    }
  }
}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")
This works fine and prints the content of esData as one would expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s
(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, email -> rodriquezflores@tourmania.com, firstname -> Rodriquez, account_number -> 4))
(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> opalmeadows@cedward.com, firstname -> Opal, account_number -> 9))
...

As does creating a new index and type like this:

    println("read json in and store in ES")

    // read in JSON and store in ES

    val path = "document.json"

    val rdd : SchemaRDD = sqlContext.jsonFile(path)



    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the table via the sqlContext like this, I get the exception
shown above:

    println("Create Temporary Table for querying")



    val schemaRDD: SchemaRDD = sqlContext.sql(

          "CREATE TEMPORARY TABLE account    " +

          "USING org.elasticsearch.spark.sql " +

          "OPTIONS (resource 'bank/account')  " )

  }

}
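
For reference, once the temporary table registers I would expect to query it like any other table, along these lines (column names per the mapping above):

    val accounts = sqlContext.sql(
      "SELECT firstname, lastname, balance FROM account WHERE balance > 20000")
    accounts.collect().foreach(println)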

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and elasticsearch-hadoop:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.

-Todd
