spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: JavaStreamingContextFactory checkpoint directory NotSerializableException
Date Thu, 06 Nov 2014 14:08:24 GMT
Erm, you are trying to do all the work in the create() method. This is
definitely not what you want to do. It is just supposed to make the
JavaStreamingContext. A further problem is that you're using
anonymous inner classes, which are non-static and contain a reference
to the outer class. The closure cleaner can sometimes get rid of that,
but perhaps not here. Consider a static nested class if you can't
resolve it some other way. However, there is probably at least one
other issue in this code ...
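
To illustrate the point about anonymous inner classes, here is a minimal
sketch in plain Java, with no Spark on the classpath. Every name in it
(ClosureCaptureDemo, LineFunction, Tagger, canSerialize, the "events" table
name) is hypothetical and only stands in for the Spark function types and
the code quoted below. It compares an anonymous inner class that reads a
field of its non-serializable enclosing object with a static nested class
that receives the same value through its constructor.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical driver class; deliberately NOT Serializable.
final class ClosureCaptureDemo {

    // Hypothetical stand-in for a Spark function interface (those are Serializable).
    interface LineFunction extends Serializable {
        String call(String line);
    }

    private final String tableName;

    ClosureCaptureDemo(String tableName) {
        this.tableName = tableName;
    }

    // Anonymous inner class: it reads an outer field, so it keeps a hidden
    // reference to the enclosing ClosureCaptureDemo instance.
    LineFunction anonymousFunction() {
        return new LineFunction() {
            @Override
            public String call(String line) {
                return tableName + ":" + line;
            }
        };
    }

    // Static nested class: no hidden outer reference; the value it needs
    // is passed in explicitly and stored in its own field.
    static final class Tagger implements LineFunction {
        private final String tableName;

        Tagger(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public String call(String line) {
            return tableName + ":" + line;
        }
    }

    // Returns true if Java serialization accepts the object,
    // false if it throws NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClosureCaptureDemo outer = new ClosureCaptureDemo("events");
        System.out.println("anonymous inner class: " + canSerialize(outer.anonymousFunction()));
        System.out.println("static nested class:   " + canSerialize(new Tagger("events")));
    }
}
```

Run as-is, this should print false for the anonymous inner class and true
for the static nested class. Whether the same change alone is enough for
the checkpointing code below is a separate question; the sketch only shows
how the reference to the enclosing object gets severed.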

On Thu, Nov 6, 2014 at 1:43 PM, Vasu C <vasuc.bigdata@gmail.com> wrote:
> Hi Sean,
>
> Below is my Java code. I am using Spark 1.1.0 and still getting the same error.
> The Bean class is serializable. I'm not sure where exactly the problem is.
> What am I doing wrong here?
>
> public class StreamingJson {
> public static void main(String[] args) throws Exception {
> final String HDFS_FILE_LOC = args[0];
> final String IMPALA_TABLE_LOC = args[1];
> final String TEMP_TABLE_NAME = args[2];
> final String HDFS_CHECKPOINT_DIR = args[3];
>
> JavaStreamingContextFactory contextFactory = new
> JavaStreamingContextFactory() {
> public JavaStreamingContext create() {
> SparkConf sparkConf = new SparkConf().setAppName(
> "test").set("spark.cores.max", "3");
>
> final JavaStreamingContext jssc = new JavaStreamingContext(
> sparkConf, new Duration(500));
>
> final JavaHiveContext javahiveContext = new JavaHiveContext(
> jssc.sc());
>
> javahiveContext.createParquetFile(Bean.class,
> IMPALA_TABLE_LOC, true, new Configuration())
> .registerTempTable(TEMP_TABLE_NAME);
>
> final JavaDStream<String> textFileStream = jssc
> .textFileStream(HDFS_FILE_LOC);
>
> textFileStream
> .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>
> @Override
> public Void call(JavaRDD<String> rdd, Time time)
> throws Exception {
> if (rdd != null) {
> if (rdd.count() > 0) {
> JavaSchemaRDD schRdd = javahiveContext
> .jsonRDD(rdd);
> schRdd.insertInto(TEMP_TABLE_NAME);
> }
> }
> return null;
> }
> });
> jssc.checkpoint(HDFS_CHECKPOINT_DIR);
> return jssc;
> }
> };
> JavaStreamingContext context = JavaStreamingContext.getOrCreate(
> HDFS_CHECKPOINT_DIR, contextFactory);
> context.start(); // Start the computation
> context.awaitTermination();
> }
> }
>
>
>
> Regards,
>    Vasu C
>
> On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen <sowen@cloudera.com> wrote:
>>
>> No, not the same thing then. This just means you accidentally have a
>> reference to the unserializable enclosing test class in your code.
>> Just make sure the reference is severed.
>>
>> On Thu, Nov 6, 2014 at 8:00 AM, Vasu C <vasuc.bigdata@gmail.com> wrote:
>> > Thanks for pointing to the issue.
>> >
>> > Yes, I think it's the same issue. Below is the exception:
>> >
>> >
>> > ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
>> > java.io.NotSerializableException: TestCheckpointStreamingJson
>
>
