spark-user mailing list archives

From gaurav sharma <>
Subject How to pass arguments dynamically that need to be used in executors
Date Thu, 11 Jun 2015 12:23:26 GMT

I am using a Kafka + Spark cluster for a real-time aggregation analytics use case
in production.

Cluster details
6 nodes, each node running 1 Spark process and 1 Kafka process.
Node 1            -> 1 Master, 1 Worker, 1 Driver, 1 Kafka process
Nodes 2,3,4,5,6   -> 1 Worker process each, 1 Kafka process each

Spark version 1.3.0
Kafka version 0.8.1

I am using the Kafka direct stream for the Kafka-Spark integration.
The analytics code is written using the Spark Java API.

Problem Statement :

      I want to accept a parameter as a command line argument and pass it on to
the executors.
      (I want to use the parameter in an rdd.foreach method, which is executed on
the executors.)

      I understand that when the driver is started, only the jar is shipped
to all the Workers.
      But I need to use the dynamically passed command line argument in the
reduce operation executed on the executors.

Code snippets for a better understanding of my problem :

public class KafkaReconcilationJob {

    private static Logger logger = Logger.getLogger(KafkaReconcilationJob.class);

    public static void main(String[] args) throws Exception {
        CommandLineArguments.CLICK_THRESHOLD = Long.parseLong(args[0]);
        // ---------------------------------------------------> I want to use this
        // command line argument

        JavaRDD<AggregatedAdeStats> adeAggregatedFilteredData =
            adeAudGeoAggDataRdd.filter(new Function<AggregatedAdeStats, Boolean>() {
                public Boolean call(AggregatedAdeStats adeAggregatedObj) throws Exception {
                    return adeAggregatedObj.getImpr() > CommandLineArguments.IMPR_THRESHOLD
                        || adeAggregatedObj.getClick() > CommandLineArguments.CLICK_THRESHOLD;
                }
            });
    }
}
The filter operation above gets executed on an executor, where the static field
CommandLineArguments.CLICK_THRESHOLD still has its default value of 0.
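To illustrate my understanding of why the executor sees 0, here is a minimal plain-Java sketch (no Spark; StaticFieldDemo, FilterTask and CLICK_THRESHOLD are stand-in names for the classes in my job): Java serialization ships only the instance fields of the closure object, never static fields, so an executor JVM where main() never ran keeps the static's default value.

```java
import java.io.*;

// Simulates driver -> executor shipping of a task closure via Java serialization.
public class StaticFieldDemo {

    // Stand-in for CommandLineArguments.CLICK_THRESHOLD; set only in the "driver" JVM.
    static long CLICK_THRESHOLD = 0;

    // Stand-in for the anonymous Function that Spark serializes and ships.
    static class FilterTask implements Serializable {
        final long capturedThreshold; // instance field: included in serialized bytes
        FilterTask(long t) { this.capturedThreshold = t; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // "Driver" sets the static, serializes the task, then we simulate a fresh
    // executor JVM by resetting the static (main() never runs on executors).
    static long roundTrip(long threshold) throws Exception {
        CLICK_THRESHOLD = threshold;
        byte[] shipped = serialize(new FilterTask(CLICK_THRESHOLD));
        CLICK_THRESHOLD = 0; // executor JVM: static has its default value
        FilterTask onExecutor = (FilterTask) deserialize(shipped);
        return onExecutor.capturedThreshold;
    }

    public static void main(String[] args) throws Exception {
        long captured = roundTrip(100);
        System.out.println("static on executor   = " + CLICK_THRESHOLD);   // 0
        System.out.println("captured on executor = " + captured);          // 100
    }
}
```

The instance field survives the round trip while the static does not, which matches the 0 I observe in the filter; so the value has to travel inside the serialized closure (or a broadcast), not via a static.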

