mahout-user mailing list archives

From: Jaume Galí <jg...@konodrac.com>
Subject: Error spark-mahout when spark-submit mode cluster
Date: Wed, 01 Aug 2018 13:54:11 GMT
Hi everybody,

I'm trying to build a basic recommender with Spark and Mahout in Scala. I used the following Mahout fork to compile Mahout against Scala 2.11 and Spark 2.1.2: https://github.com/actionml/mahout/tree/sparse-speedup-13.0

I submit my code with spark-submit. It runs fine with --master local, but when I run it on a cluster with --master spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.

Command (runs fine):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master local \
target/scala-2.11/recomender-0.0.1.jar
Command (fails):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master spark://vagrant-ubuntu-trusty-64:7077 \
target/scala-2.11/recomender-0.0.1.jar
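
Since I set addMahoutJars = false in the code below, I suspect the Mahout jars never reach the executors in cluster mode. Is passing them explicitly the expected fix? A sketch of what I mean (the jar paths are placeholders, not my actual layout):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master spark://vagrant-ubuntu-trusty-64:7077 \
--jars /path/to/mahout-spark_2.11-0.13.0.jar,/path/to/mahout-math-scala_2.11-0.13.0.jar,/path/to/mahout-math-0.13.0.jar,/path/to/mahout-hdfs-0.13.0.jar \
target/scala-2.11/recomender-0.0.1.jar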
Dependencies in build.sbt:

name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"

libraryDependencies ++= {
  Seq(
    "org.apache.spark"        %% "spark-core" % sparkVersion % "provided" ,
    "org.apache.spark"        %% "spark-sql" % sparkVersion % "provided" ,
    "org.apache.spark"        %% "spark-mllib" % sparkVersion % "provided",
    /* Mahout */
    "org.apache.mahout" %% "mahout-spark" % mahoutVersion
      exclude("org.apache.spark", "spark-core_2.11")
      exclude("org.apache.spark", "spark-sql_2.11"),
    "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
    "org.apache.mahout" % "mahout-math" % mahoutVersion,
    "org.apache.mahout" % "mahout-hdfs" % mahoutVersion
      exclude("com.thoughtworks.xstream", "xstream")
      exclude("org.apache.hadoop", "hadoop-client")
  )
}

resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal

…
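
I also considered shipping everything in a fat jar instead; a minimal sketch of what I'd try, assuming the sbt-assembly plugin (untested, version is a guess):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt additions; the "provided" Spark deps stay out of the assembly by default
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

and then submit target/scala-2.11/recomender-assembly-0.0.1.jar instead of the plain jar.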
Main class:

package com.reco

import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GenerateIndicator {

  def main(args: Array[String]) {
    try {

      // Create spark-conf
      val sparkConf = new SparkConf().setAppName("recomender")

      implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
        masterUrl = sparkConf.get("spark.master"),
        appName = "recomender",
        sparkConf = sparkConf,
        // addMahoutJars = true,
        addMahoutJars = false
      )

      implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)

      val sparkSession = SparkSession
        .builder()
        .appName("recomender")
        .config(sparkConf)
        .getOrCreate()

      val lines = returnData()

      val linesRdd = sdc.sc.parallelize(lines)

      println("...Collecting...")

      linesRdd.collect().foreach( item => {  // ERROR HERE! on collect()
        println(item)
      })

      // Destroy Spark Session
      sparkSession.stop()
      sparkSession.close()

    } catch {
      case e: Exception =>
        println(e)
        throw new Exception(e)

    }

  }

  def returnData() : Array[String] = {
    val lines = Array(
      "17,Action",
      "17,Comedy",
      "17,Crime",
      "17,Horror",
      "17,Thriller",
      "12,Crime",
      "12,Thriller",
      "16,Comedy",
      "16,Romance",
      "20,Drama",
      "20,Romance",
      "7,Drama",
      "7,Sci-Fi",
      // ... more lines in array ...
      "1680,Drama",
      "1680,Romance",
      "1681,Comedy"
    )
    lines
  }

}
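
I also looked at the commented-out addMahoutJars = true path. My understanding (possibly wrong) is that it makes mahoutSparkContext locate and ship Mahout's own jars, which would need MAHOUT_HOME pointing at the compiled fork, something like:

// Assumption: export MAHOUT_HOME=/path/to/compiled/mahout before submitting,
// so mahoutSparkContext can find the jars to ship to the executors.
implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
  masterUrl = sparkConf.get("spark.master"),
  appName = "recomender",
  sparkConf = sparkConf,
  addMahoutJars = true
)

Is that the recommended way to run on a standalone cluster?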
Error:

18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at GenerateIndicator.scala:38)
failed in 3.551 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times,
most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException:
unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on 10.0.2.15, executor
0: java.lang.IllegalStateException (unread block data) [duplicate 7]
18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed,
from pool 
18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at GenerateIndicator.scala:38,
took 5.265593 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed
4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException:
unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
From what I've found so far, "unread block data" often points to a classpath or version mismatch between the driver and the executors, which is why I suspect the Mahout jars, but I haven't been able to confirm it.

Thanks a lot for your time.
Cheers.