spark-user mailing list archives

From Will Briggs <wrbri...@gmail.com>
Subject Re: SparkContext & Threading
Date Fri, 05 Jun 2015 21:05:54 GMT
Your lambda expressions on the RDDs in the SecondRollup class are closing over the context (via the enclosing instance), and Spark's closure cleaner checks that everything captured by a closure used on an RDD is serializable - I hate linking to Quora, but there's a good explanation here: http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark
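
A minimal sketch of the pattern (hypothetical class and names, not the code from this thread): referencing any field of the enclosing class inside an RDD lambda captures `this`, so Spark ends up trying to serialize the whole instance, SparkContext included. Copying the field into a local val first keeps the context out of the closure:

```
import org.apache.spark.SparkContext

// Hypothetical class, for illustration only.
class Rollup(sc: SparkContext) extends Serializable {
  val factor = 2

  def run(): Unit = {
    val rdd = sc.parallelize(1 to 10)

    // Fails with "Task not serializable": `factor` is a field, so the lambda
    // captures `this`, and `this` also holds the non-serializable SparkContext.
    // rdd.map(x => x * factor).collect()

    // Works: copy the field into a local val; the closure then captures only
    // that value, not the enclosing instance.
    val localFactor = factor
    rdd.map(x => x * localFactor).collect()
  }
}
```

The same applies to any constructor parameter (like `connector` below) that a lambda touches.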


On June 5, 2015, at 4:14 PM, Lee McFadden <spleeman@gmail.com> wrote:



On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin <vanzin@cloudera.com> wrote:

You didn't show the error, so the only thing we can do is speculate. You're probably sending
the object that's holding the SparkContext reference over the network at some point (e.g.
it's used by a task run in an executor), and that's why you were getting that exception.


Apologies - the full error is as follows.  All I did here was remove the @transient annotation
from the sc variable in my class constructor.  In addition, the full code for the classes
and launching process is included below.


Error traceback:

```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more
```


Code:

```
// Imports added for completeness; application-specific types (Json, AffectedIp,
// ServiceSummary, System, TimeRanger, Granularity, MinuteRollup) are defined
// elsewhere in the project.
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime)
    extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("yyyyMMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms <start> <end>
        <start>     DateTime to start processing at. Format: yyyyMMddHH
        <end>       DateTime to end processing at.  Format: yyyyMMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks.
    while (!pool.isTerminated()) pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```
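
For reference, a rough, untested sketch of one way to keep the SparkContext out of the task closures in a class like SecondRollup above (trimmed to the first pipeline; application types such as AffectedIp and Json are assumed to be the same ones used in the code above): copy any constructor parameter a lambda needs into a local val inside run, so the closures capture that value instead of `this`.

```
// Sketch only, not a tested patch; relies on the same imports and
// application types (AffectedIp, Json, ...) as the code above.
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime)
    extends Runnable {

  def run {
    // Copy the one field the lambdas need into a local val. The closures then
    // capture this serializable value instead of `this` (which holds the
    // non-serializable SparkContext), so the class no longer needs to be
    // Serializable and sc never has to be marked @transient.
    val conn = connector

    val readings = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    readings.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(conn, a))
      .saveToCassandra("alpha_test", "affected_hosts")
  }
}
```

Since no closure captures `this`, the instance is never shipped to executors; the alternative is to restore the @transient annotation on sc, as in the earlier version that worked.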
