Hi,
I am using a Kafka + Spark cluster for a real-time aggregation analytics
use case in production.
Cluster details:
6 nodes, each running Spark and Kafka processes:
Node 1 -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Nodes 2-6 -> 1 Worker process and 1 Kafka process each
Spark version 1.3.0
Kafka version 0.8.1
I am using the Kafka DirectStream for the Kafka-Spark integration.
The analytics code is written using the Spark Java API.
Problem statement:
I want to accept a parameter as a command line argument and pass it on to
the executors (I want to use the parameter inside an rdd.foreach/filter
function that is executed on the executors).
I understand that when the driver is started, only the jar is shipped to
all the Workers, but I need to use the dynamically passed command line
argument inside the operations that run on the executors.
Code snippets to better illustrate my problem:
import org.apache.log4j.Logger;

public class KafkaReconcilationJob {

    private static Logger logger =
            Logger.getLogger(KafkaReconcilationJob.class);

    public static void main(String[] args) throws Exception {
        // ---> This is the command line argument I want to use on the executors
        CommandLineArguments.CLICK_THRESHOLD = Integer.parseInt(args[12]);
    }
}
JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
        adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
            @Override
            public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
                return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                        || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
            }
        });
The filter operation above gets executed on the executors, where the static
field CommandLineArguments.CLICK_THRESHOLD still has its default value of 0
(the assignment in main only happens in the driver JVM; static state is not
serialized with the closure, so each executor JVM sees the freshly
initialized class).
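For reference, this is the kind of fix I am considering (a minimal,
untested sketch): parse the arguments into final local variables on the
driver and let the anonymous Function capture them, so they are serialized
to the executors together with the closure. AggregatedAdeStats and
adeAudGeoAggDataRdd are the same type/RDD as in the snippets above; the
helper method name and the index of the impression threshold argument are
just for illustration.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class KafkaReconcilationJob {

    public static void main(String[] args) throws Exception {
        // Parse on the driver into final locals instead of static fields.
        final int clickThreshold = Integer.parseInt(args[12]);
        final int imprThreshold = Integer.parseInt(args[11]); // illustrative index

        // ... build adeAudGeoAggDataRdd as in the existing job, then:
        // JavaRDD<AggregatedAdeStats> filtered =
        //         filterByThresholds(adeAudGeoAggDataRdd, imprThreshold, clickThreshold);
    }

    private static JavaRDD<AggregatedAdeStats> filterByThresholds(
            JavaRDD<AggregatedAdeStats> rdd,
            final int imprThreshold,
            final int clickThreshold) {
        return rdd.filter(new Function<AggregatedAdeStats, Boolean>() {
            @Override
            public Boolean call(AggregatedAdeStats adeAggregatedObj) {
                // The final locals are captured by this anonymous class and
                // shipped to the executors as part of the serialized closure.
                return adeAggregatedObj.getImpr() > imprThreshold
                        || adeAggregatedObj.getClick() > clickThreshold;
            }
        });
    }
}

If the values were anything bigger than a couple of ints, I believe
JavaSparkContext.broadcast(...) on the driver and reading .value() inside
call(...) would achieve the same thing.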
Regards,
Gaurav