spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jacek Laskowski <ja...@japila.pl>
Subject Re: how to increase threads per executor
Date Fri, 03 Jun 2016 08:10:06 GMT
--executor-cores 1 to be exact.


Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> interesting. a vm with one core!
>
> one simple test
>
> can you try running with
>
> --executor-cores=1
>
> and see it works ok please
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 2 June 2016 at 23:15, Andres M Jimenez T <adrz1@hotmail.com> wrote:
>
>> Mich, thanks for your time,
>>
>>
>> i am launching spark-submit as follows:
>>
>>
>> bin/spark-submit --class com.example.SparkStreamingImpl --master
>> spark://dev1.dev:7077 --verbose --driver-memory 1g --executor-memory 1g
>> --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote
>> -Dcom.sun.management.jmxremote.port=8090
>> -Dcom.sun.management.jmxremote.rmi.port=8091
>> -Dcom.sun.management.jmxremote.authenticate=false
>> -Dcom.sun.management.jmxremote.ssl=false" --conf
>> "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote
>> -Dcom.sun.management.jmxremote.port=8092
>> -Dcom.sun.management.jmxremote.rmi.port=8093
>> -Dcom.sun.management.jmxremote.authenticate=false
>> -Dcom.sun.management.jmxremote.ssl=false" --conf
>> "spark.scheduler.mode=FAIR" --conf /home/Processing.jar
>>
>>
>> When i use --executor-cores=12 i get "Initial job has not accepted any
>> resources; check your cluster UI to ensure that workers are registered and
>> have sufficient resources".
>>
>>
>> This, because my nodes are single core, but i want to use more than one
>> thread per core, is this possible?
>>
>>
>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>> Architecture:          x86_64
>> CPU op-mode(s):        32-bit, 64-bit
>> Byte Order:            Little Endian
>> CPU(s):                1
>> On-line CPU(s) list:   0
>> Thread(s) per core:    1
>> Core(s) per socket:    1
>> Socket(s):             1
>> NUMA node(s):          1
>> Vendor ID:             GenuineIntel
>> CPU family:            6
>> Model:                 58
>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>> Stepping:              0
>> CPU MHz:               2999.999
>> BogoMIPS:              5999.99
>> Hypervisor vendor:     VMware
>> Virtualization type:   full
>> L1d cache:             32K
>> L1i cache:             32K
>> L2 cache:              256K
>> L3 cache:              25600K
>> NUMA node0 CPU(s):     0
>>
>>
>> Thanks
>>
>>
>>
>> ------------------------------
>> *From:* Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> *Sent:* Thursday, June 2, 2016 5:00 PM
>> *To:* Andres M Jimenez T
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: how to increase threads per executor
>>
>> What are passing as parameters to Spark-submit?
>>
>>
>> ${SPARK_HOME}/bin/spark-submit \
>>                 --executor-cores=12 \
>>
>> Also check
>>
>> http://spark.apache.org/docs/latest/configuration.html
>> Configuration - Spark 1.6.1 Documentation
>> <http://spark.apache.org/docs/latest/configuration.html>
>> spark.apache.org
>> Spark Configuration. Spark Properties. Dynamically Loading Spark
>> Properties; Viewing Spark Properties; Available Properties. Application
>> Properties; Runtime Environment
>>
>>
>> Execution Behavior/spark.executor.cores
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 2 June 2016 at 17:29, Andres M Jimenez T <adrz1@hotmail.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> I am working with Spark 1.6.1, using kafka direct connect for streaming
>>> data.
>>>
>>> Using spark scheduler and 3 slaves.
>>>
>>> Kafka topic is partitioned with a value of 10.
>>>
>>>
>>> The problem i have is, there is only one thread per executor running my
>>> function (logic implementation).
>>>
>>>
>>> Can anybody tell me how can i increase threads per executor to get
>>> better use of CPUs?
>>>
>>>
>>> Thanks
>>>
>>>
>>> Here is the code i have implemented:
>>>
>>>
>>> *Driver*:
>>>
>>>
>>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new
>>> Duration(10000));
>>>
>>> //prepare streaming from kafka
>>>
>>> Set<String> topicsSet = new
>>> HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>>>
>>> Map<String, String> kafkaParams = new HashMap<>();
>>>
>>> kafkaParams.put("metadata.broker.list", kafkaBrokers);
>>>
>>> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>>>
>>>
>>> JavaPairInputDStream<String, String> inputMessages =
>>> KafkaUtils.createDirectStream(
>>>
>>> ssc,
>>>
>>> String.class,
>>>
>>> String.class,
>>>
>>> StringDecoder.class,
>>>
>>> StringDecoder.class,
>>>
>>> kafkaParams,
>>>
>>> topicsSet
>>>
>>> );
>>>
>>>
>>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>>
>>>
>>> *ForeachFunction*:
>>>
>>>
>>> class ForeachFunction implements VoidFunction<Tuple2<String, String>>
{
>>>
>>> private static final Counter foreachConcurrent =
>>> ProcessingMetrics.metrics.counter( "foreach-concurrency" );
>>>
>>> public ForeachFunction() {
>>>
>>> LOG.info("Creating a new ForeachFunction");
>>>
>>> }
>>>
>>>
>>> public void call(Tuple2<String, String> t) throws Exception {
>>>
>>> foreachConcurrent.inc();
>>>
>>> LOG.info("processing message [" + t._1() + "]");
>>>
>>> try {
>>>
>>> Thread.sleep(1000);
>>>
>>> } catch (Exception e) { }
>>>
>>> foreachConcurrent.dec();
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> *ForeachRDDFunction*:
>>>
>>>
>>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String,
>>> String>> {
>>>
>>> private static final Counter foreachRDDConcurrent =
>>> ProcessingMetrics.metrics.counter( "foreachRDD-concurrency" );
>>>
>>> private ForeachFunction foreachFunction = new ForeachFunction();
>>>
>>> public ForeachRDDFunction() {
>>>
>>> LOG.info("Creating a new ForeachRDDFunction");
>>>
>>> }
>>>
>>>
>>> public void call(JavaPairRDD<String, String> t) throws Exception {
>>>
>>> foreachRDDConcurrent.inc();
>>>
>>> LOG.info("call from inputMessages.foreachRDD with [" +
>>> t.partitions().size() + "] partitions");
>>>
>>> for (Partition p : t.partitions()) {
>>>
>>> if (p instanceof KafkaRDDPartition){
>>>
>>> LOG.info("partition [" + p.index() + "] with count [" +
>>> ((KafkaRDDPartition) p).count() + "]");
>>>
>>> }
>>>
>>> }
>>>
>>> t.foreachAsync(foreachFunction);
>>>
>>> foreachRDDConcurrent.dec();
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> *The log from driver that tells me my RDD is partitioned to process in
>>> parallel*:
>>>
>>>
>>> [Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0
+ 0)
>>> / 20]16/06/02 08:32:10 INFO SparkStreamingImpl: call from
>>> inputMessages.foreachRDD with [20] partitions
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>>>
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>>
>>>
>>> *The log from one of executors showing exactly one message per second
>>> was processed (only by one thread)*:
>>>
>>>
>>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message
>>> [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>>>
>>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message
>>> [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>>>
>>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message
>>> [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>>>
>>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message
>>> [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>>>
>>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message
>>> [1bc0a141-b910-45fe-9881-e2066928fbc6]
>>>
>>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message
>>> [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>>>
>>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message
>>> [de7d5934-bab2-4019-917e-c339d864ba18]
>>>
>>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message
>>> [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>>>
>>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message
>>> [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>>>
>>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message
>>> [5367f3c1-d66c-4647-bb44-f5eab719031d]
>>>
>>>
>>
>

Mime
View raw message