spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Standalone cluster node utilization
Date Fri, 15 Jul 2016 07:28:28 GMT
In a standalone cluster you have one driver on the edge node (where you are
running spark-submit) and multiple executors, each on a different node,
assuming you have started your slaves with enough workers.

One test would be to try a different piece of code and see whether the
processing is parallelised, with each executor running the same code on a
different subset of the data.

Or try to do away with the temp table and do it through functional programming?

// HiveContext is assumed to be an instance, e.g. new org.apache.spark.sql.hive.HiveContext(sc)
import org.apache.spark.sql.functions.sum

val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")
val rs = s.join(t, s("time_id") === t("time_id"), "fullouter").
  join(c, "channel_id").
  groupBy("calendar_month_desc", "channel_desc").
  agg(sum("amount_sold").as("TotalSales"))

See whether that uses all executors?
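
A side note on the JDBC read quoted below: sqlContext.read.jdbc(url, table,
prop) with no partitioning arguments loads the table as a single partition,
which would be consistent with one executor holding all the blocks.
DataFrameReader also has an overload that takes an Array[String] of
predicates and creates one partition per predicate. A minimal sketch of
building such predicates is below. The helper rangePredicates, the column
name "ID" and the bounds are hypothetical, and it assumes a numeric column
whose values are spread fairly evenly. Rows where the column is NULL or
outside [lower, upper] would not be read by these predicates.

```scala
// Hypothetical helper (not part of Spark): split the inclusive range
// [lower, upper] into up to numPartitions contiguous sub-ranges and
// render one SQL predicate per sub-range.
object JdbcRangePredicates {
  def rangePredicates(column: String, lower: Long, upper: Long,
                      numPartitions: Int): Array[String] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lower <= upper, "lower must not exceed upper")
    val total = upper - lower + 1
    // Never create more partitions than there are distinct values.
    val parts = math.min(numPartitions.toLong, total).toInt
    val base  = total / parts
    val extra = total % parts
    // Start of sub-range i; the first `extra` sub-ranges get one more value.
    def bound(i: Int): Long = lower + i * base + math.min(i.toLong, extra)
    Array.tabulate(parts) { i =>
      s"$column >= ${bound(i)} AND $column < ${bound(i + 1)}"
    }
  }
}

// Usage with Spark (assumes an existing sqlContext and the url, table and
// prop values from the thread; "ID" and the bounds are placeholders):
//   val preds = JdbcRangePredicates.rangePredicates("ID", 1L, 1000000L, 96)
//   val df = sqlContext.read.jdbc(
//     Configuration.dbUrl, Configuration.dbTable, preds, prop)
```

If the table has a suitable numeric key, the jdbc overload that takes
columnName, lowerBound, upperBound and numPartitions does the same kind of
split without a helper.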

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 06:28, Jakub Stransky <stransky.ja@gmail.com> wrote:

> Hi Mich,
>
> sorry, that was probably a typo or I messed it up. I am running a Spark
> cluster in standalone mode with a master + 6 worker machines. Each worker
> machine has one executor on it.
> I hope that clarifies it: it is not one machine with six executors.
>
> The problem is that when I try to read the dataset, it gets loaded by just
> one executor, which holds all the blocks. That effectively means the data
> is not distributed.
>
> This is how we read the data:
>     val df_init = sqlContext.read
>       .jdbc(
>         url = Configuration.dbUrl,
>         table = Configuration.dbTable,
>         prop
>       )
>
>     df_init.registerTempTable("df_init")
>
> That is essentially the first issue: the data is not distributed.
>
> The next strange behavior comes after that. Take it as a fact that all blocks
> are located on one node. After the column selection and filtering we try to
> repartition the data in the hope that this will redistribute it. But
> that doesn't happen either: all the RDD blocks still reside on the
> server where the data was loaded. Our code is as follows:
>
>     val df_init = sqlContext.read
>       .jdbc(
>         url = Configuration.dbUrl,
>         table = Configuration.dbTable,
>         prop
>       )
>
>     df_init.registerTempTable("df_init")
>
>     val df = (if (Configuration.dataSubset) {
>       // args(1) and args(2) are both read, so at least three arguments are needed
>       val (loadingCondition, typeId) = if (args.length > 2) {
>         (args(1), args(2))
>       }
>       else
>         (Configuration.dataCondition, Configuration.dataType)
>
>       sqlContext.sql(
>         s"""SELECT  statement ... Condition = '$loadingCondition'""".stripMargin)
>     } else {
>       df_init
>     }).repartition(Configuration.appPartitioning)
>
>     df.persist()
>
> We would expect df to be spread across the nodes, but that is not
> the case: all the data is still located on one server. I am not sure whether
> that is somehow related to the temp table usage or something else. I was
> convinced that repartition would cause a reshuffle across the nodes, but that
> is not true in this case. We are using 96 partitions, as we have 6 machines
> with 8 cores each.
>
> Anybody see any issue here?
>
> Thanks
> Jakub
>
>
> On 14 July 2016 at 23:07, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> Thanks for details
>>
>> You mentioned
>>
>> "I have a spark  cluster running in a single mode, master + 6 executors."
>>
>> Do you mean running in a single NODE?
>>
>> JDBC read
>>
>> This one reads from Oracle table
>>
>> val c = HiveContext.load("jdbc",
>>   Map("url" -> _ORACLEserver,
>>     "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
>>     "user" -> _username_ORACLE,
>>     "password" -> _password_ORACLE))
>>
>> c.registerTempTable("t_c")
>>
>> Then basically you run some SQL on the temp table df_init. BTW, personally,
>> to avoid ambiguity I would give the temp table a different name from its df,
>> something like t_df_init (just a suggestion)
>>
>> If you are running a single node, then what is the evidence that all those
>> executors are needed? Have you tried caching the temp table before running
>> SQL?
>>
>> //Cache the table
>> HiveContext.cacheTable("df_init")
>>
>> If you are running sql on temp table then you are just creating multiple
>> smaller result sets and logically depending on the size of the result set,
>> there may not be a need for much parallel processing. A serial scan may be
>> sufficient.
>>
>> Although with my tables cached I see all workers are doing something
>>
>> [image: Inline images 2]
>>
>> So really difficult to answer
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>> On 14 July 2016 at 18:31, Jakub Stransky <stransky.ja@gmail.com> wrote:
>>
>>> HI Talebzadeh,
>>>
>>> sorry, I forgot to answer the last part of your question:
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> There is one worker with one executor busy and the rest are almost idle:
>>>
>>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>>>  9305 spark     20   0 30.489g 5.075g  22256 S   0.3 18.5   0:36.25 java
>>>
>>> The only busy one is using all 8 cores of its machine:
>>>
>>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>>>  9580 zdata     20   0 29.664g 0.021t   6836 S 676.7 79.4  40:08.61 java
>>>
>>>
>>> Thanks
>>> Jakub
>>>
>>> On 14 July 2016 at 19:22, Jakub Stransky <stransky.ja@gmail.com> wrote:
>>>
>>>> HI Talebzadeh,
>>>>
>>>> we are using 6 worker machines - running.
>>>>
>>>> We are reading the data through sqlContext (data frame) as it is
>>>> suggested in the documentation over the JdbcRdd
>>>>
>>>> prop just specifies name, password, and driver class.
>>>>
>>>> Right after this data load we register it as a temp table
>>>>
>>>>     val df_init = sqlContext.read
>>>>       .jdbc(
>>>>         url = Configuration.dbUrl,
>>>>         table = Configuration.dbTable,
>>>>         prop
>>>>       )
>>>>
>>>>     df_init.registerTempTable("df_init")
>>>>
>>>> Afterwards we do some data filtering and column selection, filtering out
>>>> some rows with sqlContext.sql ("select statement here")
>>>>
>>>> After this selection we try to repartition the data in order to get
>>>> it distributed across the cluster, and that does not seem to be working.
>>>> Then we persist that filtered and selected DataFrame.
>>>>
>>>> The desired state is that the filtered DataFrame is
>>>> distributed across the nodes in the cluster.
>>>>
>>>> Jakub
>>>>
>>>>
>>>>
>>>> On 14 July 2016 at 19:03, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jakub,
>>>>>
>>>>> Sounds like one executor. Can you point out:
>>>>>
>>>>>
>>>>>    1. The number of slaves/workers you are running
>>>>>    2. Are you using JDBC to read data in?
>>>>>    3. Do you register DF as temp table and if so have you cached temp
>>>>>    table
>>>>>
>>>>> Sounds like only one executor is active and the rest are sitting idle.
>>>>>
>>>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>>>> jps each corresponding to one executor. Are they doing anything?
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 14 July 2016 at 17:18, Jakub Stransky <stransky.ja@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a spark  cluster running in a single mode, master + 6
>>>>>> executors.
>>>>>>
>>>>>> My application reads data from a database via DataFrame.read,
>>>>>> then there is a filtering of rows. After that I re-partition the data
>>>>>> and I wonder why on the executors page of the driver UI I see the RDD
>>>>>> blocks all still allocated on a single executor machine
>>>>>>
>>>>>> [image: Inline images 1]
>>>>>> As highlighted in the picture above. I expected that after
>>>>>> re-partition the data would be shuffled across the cluster, but that
>>>>>> is obviously not happening here.
>>>>>>
>>>>>> I can understand that database read is happening in non-parallel
>>>>>> fashion but re-partition  should fix it as far as I understand.
>>>>>>
>>>>>> Could someone experienced clarify that?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jakub Stransky
>>>> cz.linkedin.com/in/jakubstransky
