Here is another transformation that might be causing the error. It has to be one of these two, since I only have two transformations.

jsonMessagesDStream
    .window(new Duration(60000), new Duration(1000))
    .mapToPair(new PairFunction<String, String, Long>() {
        @Override
        public Tuple2<String, Long> call(String s) throws Exception {
            //System.out.println(s + " *****************************");
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(s).getAsJsonObject();

            if (jsonObj != null && jsonObj.has("var1")) {
                JsonObject jsonObject = jsonObj.get("var1").getAsJsonObject();
                if (jsonObject != null && jsonObject.has("var2") && jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3")) {
                    long num = jsonObject.get("var3").getAsLong();
                    return new Tuple2<String, Long>("var3", num);
                }
            }

            return new Tuple2<String, Long>("var3", 0L);
        }
    }).reduceByKey(new Function2<Long, Long, Long>() {
        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
        @Override
        public void call(JavaPairRDD<String, Long> stringIntegerJavaPairRDD) throws Exception {
            Map<String, Long> map = new HashMap<>();
            Gson gson = new Gson();
            stringIntegerJavaPairRDD
                .collect()
                .forEach((Tuple2<String, Long> KV) -> {
                    String status = KV._1();
                    Long count = KV._2();
                    map.put(status, count);
                });
            NSQReceiver.send(producer, "dashboard", gson.toJson(map).getBytes());
        }
    });

On Wed, Nov 30, 2016 at 10:40 PM, kant kodali <kanth909@gmail.com> wrote:
Hi Marco,


Here is what my code looks like:

Config config = new Config("hello");
SparkConf sparkConf = config.buildSparkConfig();
sparkConf.setJars(JavaSparkContext.jarOfClass(Driver.class));
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(config.getSparkStremingBatchInterval()));
ssc.sparkContext().setLogLevel("ERROR");

NSQReceiver sparkStreamingReceiver = new NSQReceiver(config, "input_test");
JavaReceiverInputDStream<String> jsonMessagesDStream = ssc.receiverStream(sparkStreamingReceiver);

NSQProducer producer = new NSQProducer()
    .addAddress(config.getServerConfig().getProperty("NSQD_IP"),
                Integer.parseInt(config.getServerConfig().getProperty("NSQD_PORT")))
    .start();

jsonMessagesDStream
    .mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(s).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("var1")) {
                JsonObject transactionObject = jsonObj.get("var1").getAsJsonObject();
                if (transactionObject != null && transactionObject.has("var2")) {
                    String key = transactionObject.get("var2").getAsString();
                    return new Tuple2<>(key, 1);
                }
            }
            return new Tuple2<>("", 0);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    }).foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
        @Override
        public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
            Map<String, Integer> map = new HashMap<>();
            Gson gson = new Gson();
            stringIntegerJavaPairRDD
                .collect()
                .forEach((Tuple2<String, Integer> KV) -> {
                    String status = KV._1();
                    Integer count = KV._2();
                    map.put(status, count);
                });
            NSQReceiver.send(producer, "output_777", gson.toJson(map).getBytes());
        }
    });

Thanks,
kant

On Wed, Nov 30, 2016 at 2:11 PM, Marco Mistroni <mmistroni@gmail.com> wrote:

Could you paste a reproducible code snippet?
Kr


On 30 Nov 2016 9:08 pm, "kant kodali" <kanth909@gmail.com> wrote:
I am getting a lot of these exceptions:

java.lang.Exception: Could not compute split, block input-0-1480539568000 not found


Any ideas what this could be?