spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: how to increase threads per executor
Date Fri, 03 Jun 2016 11:03:09 GMT
The general way of passing parameters to spark-submit is as follows (note
that I use a generic shell script to submit jobs). Replace ${FILE_NAME},
${JAR_FILE} and ${LOG_FILE} with appropriate values. In general, you can pass
driver-memory, executor-memory and the rest to the shell script as variables,
if you wish, instead of hard-coding them.

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --master spark://50.140.197.217:7077 \
                --driver-memory 4G \
                --num-executors 5 \
                --executor-memory 4G \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class ${FILE_NAME} \
                --conf "spark.ui.port=55556" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                >> ${LOG_FILE}
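For anyone launching from Java rather than from a shell script, the same "don't hard-code it" idea might look roughly like this minimal sketch. It only assembles the argument list. SubmitCommand is a hypothetical helper (not a Spark class), and the environment variable names DRIVER_MEMORY, EXECUTOR_MEMORY and EXECUTOR_CORES, as well as every default value, are assumptions of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spark): builds a spark-submit command line
// from a map of settings such as System.getenv(), falling back to defaults the
// way ${VAR:-default} does in bash. Variable names and defaults are assumptions.
final class SubmitCommand {

    // Value for key, or the fallback when the key is missing or empty.
    static String envOr(Map<String, String> env, String key, String fallback) {
        String v = env.get(key);
        return (v == null || v.isEmpty()) ? fallback : v;
    }

    static List<String> build(Map<String, String> env) {
        List<String> cmd = new ArrayList<>();
        cmd.add(envOr(env, "SPARK_HOME", "/usr/local/spark") + "/bin/spark-submit"); // assumed default
        cmd.add("--driver-memory");
        cmd.add(envOr(env, "DRIVER_MEMORY", "4G"));
        cmd.add("--executor-memory");
        cmd.add(envOr(env, "EXECUTOR_MEMORY", "4G"));
        cmd.add("--executor-cores");
        cmd.add(envOr(env, "EXECUTOR_CORES", "2"));
        cmd.add("--class");
        cmd.add(envOr(env, "FILE_NAME", "com.example.Main")); // hypothetical main class
        cmd.add(envOr(env, "JAR_FILE", "app.jar"));           // hypothetical application jar
        return cmd;
    }
}
```

Something like new ProcessBuilder(SubmitCommand.build(System.getenv())).redirectOutput(ProcessBuilder.Redirect.appendTo(new File(logFile))).start() would then mimic the >> ${LOG_FILE} redirection. Spark also ships org.apache.spark.launcher.SparkLauncher for launching applications programmatically, which may be the tidier route.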

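For Twitter jobs, pass the Twitter-specific parameters after ${JAR_FILE};
spark-submit hands everything that follows the application JAR to the main
class as its arguments.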

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --driver-memory 2G \
                --num-executors 1 \
                --executor-memory 2G \
                --master local[2] \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class "com.databricks.apps.twitter_classifier.${FILE_NAME}" \
                --conf "spark.ui.port=55555" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                ${OUTPUT_DIRECTORY:-/tmp/tweets} \
                ${NUM_TWEETS_TO_COLLECT:-10000} \
                ${OUTPUT_FILE_INTERVAL_IN_SECS:-10} \
                ${OUTPUT_FILE_PARTITIONS_EACH_INTERVAL:-1} \
                >> ${LOG_FILE}
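On the receiving side, those four trailing values arrive in the main class's String[] args in the same order. A minimal sketch of reading them, re-applying the script's defaults in case the class is launched without them; TweetJobArgs and its field names are hypothetical and not taken from the Databricks twitter_classifier code.

```java
// Hypothetical holder for the positional arguments that follow ${JAR_FILE}.
// Defaults mirror the ${VAR:-default} expansions in the script above.
final class TweetJobArgs {
    final String outputDirectory;
    final long numTweetsToCollect;
    final int outputFileIntervalSecs;
    final int outputFilePartitionsEachInterval;

    private TweetJobArgs(String outputDirectory, long numTweetsToCollect,
                         int outputFileIntervalSecs, int outputFilePartitionsEachInterval) {
        this.outputDirectory = outputDirectory;
        this.numTweetsToCollect = numTweetsToCollect;
        this.outputFileIntervalSecs = outputFileIntervalSecs;
        this.outputFilePartitionsEachInterval = outputFilePartitionsEachInterval;
    }

    // args[i] when it was supplied, otherwise the fallback value.
    private static String argOr(String[] args, int i, String fallback) {
        return i < args.length ? args[i] : fallback;
    }

    // Positional order matches the script: directory, count, interval, partitions.
    static TweetJobArgs parse(String[] args) {
        return new TweetJobArgs(
                argOr(args, 0, "/tmp/tweets"),
                Long.parseLong(argOr(args, 1, "10000")),
                Integer.parseInt(argOr(args, 2, "10")),
                Integer.parseInt(argOr(args, 3, "1")));
    }
}
```

A main method would then simply begin with TweetJobArgs.parse(args).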

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 3 June 2016 at 09:10, Jacek Laskowski <jacek@japila.pl> wrote:

> --executor-cores 1 to be exact.
>
>
> Regards,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>
>> Interesting. A VM with one core!
>>
>> One simple test: can you try running with
>>
>> --executor-cores=1
>>
>> and see if it works OK, please?
>>
>>
>> On 2 June 2016 at 23:15, Andres M Jimenez T <adrz1@hotmail.com> wrote:
>>
>>> Mich, thanks for your time,
>>>
>>>
>>> I am launching spark-submit as follows:
>>>
>>>
>>> bin/spark-submit --class com.example.SparkStreamingImpl \
>>>   --master spark://dev1.dev:7077 --verbose \
>>>   --driver-memory 1g --executor-memory 1g \
>>>   --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>>   --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>>   --conf "spark.scheduler.mode=FAIR" \
>>>   /home/Processing.jar
>>>
>>>
>>> When I use --executor-cores=12, I get "Initial job has not accepted any
>>> resources; check your cluster UI to ensure that workers are registered and
>>> have sufficient resources".
>>>
>>>
>>> This is because my nodes are single-core, but I want to use more than one
>>> thread per core. Is this possible?
>>>
>>>
>>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>>> Architecture:          x86_64
>>> CPU op-mode(s):        32-bit, 64-bit
>>> Byte Order:            Little Endian
>>> CPU(s):                1
>>> On-line CPU(s) list:   0
>>> Thread(s) per core:    1
>>> Core(s) per socket:    1
>>> Socket(s):             1
>>> NUMA node(s):          1
>>> Vendor ID:             GenuineIntel
>>> CPU family:            6
>>> Model:                 58
>>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>>> Stepping:              0
>>> CPU MHz:               2999.999
>>> BogoMIPS:              5999.99
>>> Hypervisor vendor:     VMware
>>> Virtualization type:   full
>>> L1d cache:             32K
>>> L1i cache:             32K
>>> L2 cache:              256K
>>> L3 cache:              25600K
>>> NUMA node0 CPU(s):     0
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> ------------------------------
>>> *From:* Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>> *Sent:* Thursday, June 2, 2016 5:00 PM
>>> *To:* Andres M Jimenez T
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: how to increase threads per executor
>>>
>>> What are you passing as parameters to spark-submit?
>>>
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>                 --executor-cores=12 \
>>>
>>> Also check
>>>
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>>
>>> Execution Behavior/spark.executor.cores
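For reference, the number of tasks an executor runs at once is spark.executor.cores divided by spark.task.cpus (which defaults to 1), and each task works on one partition. A back-of-the-envelope sketch; TaskSlots is a hypothetical helper, and "waves" is simply ceil(partitions / slots), ignoring locality and scheduling delays.

```java
// Hypothetical helper: task-slot arithmetic under the rule
// concurrent tasks per executor = floor(spark.executor.cores / spark.task.cpus).
final class TaskSlots {

    static int perExecutor(int executorCores, int taskCpus) {
        if (taskCpus <= 0) throw new IllegalArgumentException("spark.task.cpus must be positive");
        return executorCores / taskCpus; // integer division floors for non-negative values
    }

    static int total(int executors, int executorCores, int taskCpus) {
        return executors * perExecutor(executorCores, taskCpus);
    }

    // How many rounds of tasks are needed if every slot is kept busy.
    static int waves(int partitions, int slots) {
        if (slots <= 0) throw new IllegalArgumentException("no task slots available");
        return (partitions + slots - 1) / slots; // ceiling division
    }
}
```

With three single-core executors this gives 3 slots, so a 20-partition RDD would need about 7 waves.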
>>>
>>>
>>> HTH
>>>
>>> On 2 June 2016 at 17:29, Andres M Jimenez T <adrz1@hotmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> I am working with Spark 1.6.1, using the Kafka direct stream for streaming
>>>> data.
>>>>
>>>> I am using the Spark scheduler with 3 slaves.
>>>>
>>>> The Kafka topic has 10 partitions.
>>>>
>>>>
>>>> The problem I have is that only one thread per executor is running my
>>>> function (the logic implementation).
>>>>
>>>>
>>>> Can anybody tell me how I can increase the number of threads per executor
>>>> to make better use of the CPUs?
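The behaviour described here can be reproduced without Spark. In this toy model (plain Java, and only my assumption of a useful way to picture it) a fixed thread pool stands in for one executor's task slots, and each submitted task stands in for one partition whose records are handled one after another. With a single slot, at most one record is ever in progress, however many partitions are queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of one executor: `slots` threads, one submitted task per partition.
final class ExecutorModel {

    // Returns the highest number of records that were in progress at once.
    static int maxConcurrency(int slots, int[] recordsPerPartition, long millisPerRecord) {
        AtomicInteger current = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        try {
            List<Future<?>> tasks = new ArrayList<>();
            for (int records : recordsPerPartition) {
                tasks.add(pool.submit(() -> {
                    // Records of one partition are processed sequentially.
                    for (int r = 0; r < records; r++) {
                        max.accumulateAndGet(current.incrementAndGet(), Math::max);
                        try {
                            Thread.sleep(millisPerRecord); // stands in for per-message work
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        } finally {
                            current.decrementAndGet();
                        }
                    }
                }));
            }
            for (Future<?> f : tasks) {
                f.get(); // wait for every "partition" to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return max.get();
    }
}
```

Adding slots raises concurrency, but only up to the number of non-empty partitions; records inside a single partition remain sequential.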
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Here is the code I have implemented:
>>>>
>>>>
>>>> *Driver*:
>>>>
>>>>
>>>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>>>>
>>>> // prepare streaming from kafka
>>>> Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>>>> Map<String, String> kafkaParams = new HashMap<>();
>>>> kafkaParams.put("metadata.broker.list", kafkaBrokers);
>>>> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>>>>
>>>> JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
>>>>     ssc,
>>>>     String.class,
>>>>     String.class,
>>>>     StringDecoder.class,
>>>>     StringDecoder.class,
>>>>     kafkaParams,
>>>>     topicsSet
>>>> );
>>>>
>>>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>>>
>>>>
>>>> *ForeachFunction*:
>>>>
>>>>
>>>> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>>>>
>>>>     private static final Counter foreachConcurrent =
>>>>         ProcessingMetrics.metrics.counter("foreach-concurrency");
>>>>
>>>>     public ForeachFunction() {
>>>>         LOG.info("Creating a new ForeachFunction");
>>>>     }
>>>>
>>>>     public void call(Tuple2<String, String> t) throws Exception {
>>>>         foreachConcurrent.inc();
>>>>         LOG.info("processing message [" + t._1() + "]");
>>>>         try {
>>>>             Thread.sleep(1000);
>>>>         } catch (Exception e) { }
>>>>         foreachConcurrent.dec();
>>>>     }
>>>> }
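One technique not raised in this thread: because call() handles records one at a time inside a task, a task whose work mostly waits (as with the Thread.sleep(1000) here) could fan its partition's records out to a small local thread pool, for example from inside rdd.foreachPartition(...). A minimal plain-Java sketch of that fan-out; PartitionFanOut is hypothetical, and the work function it runs must be thread-safe.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical helper: process one partition's records on `threads` threads
// while the enclosing task still occupies a single Spark slot.
final class PartitionFanOut {

    static <T> void processConcurrently(Iterator<T> records, int threads, Consumer<T> work) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            while (records.hasNext()) {
                T record = records.next();
                pending.add(pool.submit(() -> work.accept(record)));
            }
            // Wait for every record so the enclosing task would not report
            // success while work is still in flight.
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

This keeps one future per record in memory and gives up ordering within the partition; for very large partitions or order-sensitive work, a bounded queue or a different design would be needed.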
>>>>
>>>>
>>>> *ForeachRDDFunction*:
>>>>
>>>>
>>>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>>>>
>>>>     private static final Counter foreachRDDConcurrent =
>>>>         ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>>>>
>>>>     private ForeachFunction foreachFunction = new ForeachFunction();
>>>>
>>>>     public ForeachRDDFunction() {
>>>>         LOG.info("Creating a new ForeachRDDFunction");
>>>>     }
>>>>
>>>>     public void call(JavaPairRDD<String, String> t) throws Exception {
>>>>         foreachRDDConcurrent.inc();
>>>>         LOG.info("call from inputMessages.foreachRDD with [" +
>>>>             t.partitions().size() + "] partitions");
>>>>         for (Partition p : t.partitions()) {
>>>>             if (p instanceof KafkaRDDPartition) {
>>>>                 LOG.info("partition [" + p.index() + "] with count [" +
>>>>                     ((KafkaRDDPartition) p).count() + "]");
>>>>             }
>>>>         }
>>>>         t.foreachAsync(foreachFunction);
>>>>         foreachRDDConcurrent.dec();
>>>>     }
>>>> }
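Note that foreachAsync returns a JavaFutureAction without waiting for the job, so foreachRDDConcurrent.dec() runs as soon as the job is submitted and the counter tracks submissions rather than running work. A plain-Java analogy, with CompletableFuture standing in for JavaFutureAction; AsyncGauge is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only: CompletableFuture plays the role of Spark's JavaFutureAction.
final class AsyncGauge {
    final AtomicInteger inFlight = new AtomicInteger();

    // Mirrors ForeachRDDFunction.call: increment, fire and forget, decrement.
    CompletableFuture<Void> submitLikeForeachAsync(Runnable job) {
        inFlight.incrementAndGet();
        CompletableFuture<Void> f = CompletableFuture.runAsync(job);
        inFlight.decrementAndGet(); // runs immediately, typically while the job is still running
        return f;
    }

    // Alternative: decrement only once the asynchronous work has completed.
    CompletableFuture<Void> submitTrackingCompletion(Runnable job) {
        inFlight.incrementAndGet();
        return CompletableFuture.runAsync(job)
                .whenComplete((ignored, error) -> inFlight.decrementAndGet());
    }
}
```

In the Spark code, the counterpart of the second method would be to wait on the returned future (it is a java.util.concurrent.Future) before decrementing, which amounts to calling the blocking foreach instead of foreachAsync.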
>>>>
>>>>
>>>> *The log from the driver, which tells me my RDD is partitioned for parallel
>>>> processing*:
>>>>
>>>>
>>>> [Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0 + 0) / 20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>>>
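These counts allow a rough estimate. They add up to 210 records in 20 partitions (two topics, so presumably 10 Kafka partitions each), and the progress bar shows 3 tasks running, which matches three single-core executors. At about 1 s per record (the Thread.sleep(1000) in ForeachFunction), no schedule can finish a batch in less than max(27, ceil(210 / 3)) = 70 s. The sketch below computes that bound plus a simple greedy schedule; it idealizes Spark's scheduler (no locality, no task start-up cost), and BatchEstimate is a hypothetical helper.

```java
import java.util.Arrays;

// Idealized estimate (assumptions: equal cost per record, instant task start,
// any slot can run any partition) of the time to process one batch.
final class BatchEstimate {

    // No schedule beats the largest partition (its records run sequentially)
    // or the total work spread evenly over all slots.
    static long lowerBoundSeconds(int[] counts, int slots, long secsPerRecord) {
        long total = Arrays.stream(counts).asLongStream().sum();
        long largest = Arrays.stream(counts).max().orElse(0);
        long evenShare = (total + slots - 1) / slots; // ceiling division
        return Math.max(largest, evenShare) * secsPerRecord;
    }

    // Greedy list scheduling: give each partition, in index order, to the slot
    // with the least work so far (lowest index on ties); report the latest finish.
    static long greedyMakespanSeconds(int[] counts, int slots, long secsPerRecord) {
        long[] loads = new long[slots];
        for (int c : counts) {
            int best = 0;
            for (int i = 1; i < slots; i++) {
                if (loads[i] < loads[best]) best = i;
            }
            loads[best] += c;
        }
        return Arrays.stream(loads).max().orElse(0) * secsPerRecord;
    }
}
```

For these counts the greedy schedule on 3 slots finishes at 76 s. Either figure is far above the 10-second batch interval (new Duration(10000)), which may explain why several stages appear queued in the progress bar. More slots help only until the largest partition (27 records) dominates.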
>>>>
>>>> *The log from one of the executors, showing that exactly one message per
>>>> second was processed (by only one thread)*:
>>>>
>>>>
>>>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>>>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>>>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>>>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>>>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
>>>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>>>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
>>>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>>>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>>>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
>>>>
>>>>
>>>
>>
>
