spark-user mailing list archives

From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor
Date: Fri, 11 Mar 2016 19:35:52 GMT
OK, some more information (and presumably a workaround).

When I initially read in my file, I use the following code:

JavaRDD<String> keyFileRDD = sc.textFile(keyFile);

Looking at the UI, this file has 2 partitions (both on the same executor).

I then repartition this RDD (to 16):

JavaRDD<String> partKeyFileRDD = keyFileRDD.repartition(16);

Looking again at the UI, this RDD now has 16 partitions (all on the same executor). When
the foreachPartition runs, it uses these 16 partitions (all on the same executor). I think
this is really the problem. I'm not sure why the repartition didn't spread the partitions
across both executors.
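
For anyone who wants to check this without digging through the UI, here is a minimal
diagnostic sketch (not from my job) that logs which host evaluates each partition. It
only uses standard JavaRDD methods (mapPartitionsWithIndex, count); the println output
lands in each executor's stdout log. Written as a Java 8 lambda; with Java 7 you'd use
an anonymous Function2 instead.

import java.net.InetAddress;
import java.util.Iterator;

// Log the host that evaluates each partition, then force evaluation with count().
partKeyFileRDD.mapPartitionsWithIndex(
    (Integer index, Iterator<String> it) -> {
        System.out.println("partition " + index + " evaluated on "
            + InetAddress.getLocalHost().getHostName());
        return it;
    }, true).count();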

When the mapToPair subsequently runs (below), both executors are used, and things start falling
apart because none of the initialization logic was performed on one of the executors.

However, if I modify the code above to

JavaRDD<String> keyFileRDD = sc.textFile(keyFile, 16);

then the initial keyFileRDD will be in 16 partitions spread across both executors. When I execute
my foreachPartition directly on keyFileRDD (since there is no need to repartition), both executors
will now be used (and initialized).
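
Pulling the pieces from this thread together, the workaround sketch looks roughly like this
(same class and variable names as in the code quoted below; the second argument to textFile
is minPartitions):

// Ask textFile for 16 input splits up front instead of repartitioning afterwards.
JavaRDD<String> keyFileRDD = sc.textFile(keyFile, 16);

// Init S3 (workers) so we can read the assets -- now runs on both executors.
keyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));

// Get the assets. Key = asset id, value = the rec.
JavaPairRDD<String, String> seqFileRDD =
    keyFileRDD.mapToPair(new SimpleStorageServiceAsset());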

Anyway, I don't know whether this is my lack of understanding of how repartition should work
or whether it's a bug. Thanks, Jacek, for starting to dig into this.

Darin.



----- Original Message -----
From: Darin McBeath <ddmcbeath@yahoo.com.INVALID>
To: Jacek Laskowski <jacek@japila.pl>
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor

My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value is the rec.
JavaPairRDD<String,String> seqFileRDD = partKeyFileRDD.mapToPair(new SimpleStorageServiceAsset());

The worker then has the following. The issue, I believe, is that the log.info statements below
only appear in the log file for one of my executors (and not both). In other words, when
executing the foreachPartition above, Spark appears to think all of the partitions are on
one executor (at least that is the impression I'm left with). But when I get to the mapToPair,
Spark suddenly begins to use both executors. I have verified that there are 16 partitions
for partKeyFileRDD.



import java.util.Iterator;

import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.VoidFunction;

public class SimpleStorageServiceInit implements VoidFunction<Iterator<String>> {

    // Logger declaration assumed (the original snippet omitted it); static so it
    // isn't dragged into the serialized closure.
    private static final Logger log = Logger.getLogger(SimpleStorageServiceInit.class);

    private String arg1;
    private String arg2;
    private String arg3;

    public SimpleStorageServiceInit(String arg1, String arg2, String arg3) {
        this.arg1 = arg1;
        this.arg2 = arg2;
        this.arg3 = arg3;
        log.info("SimpleStorageServiceInit constructor");
        log.info("SimpleStorageServiceInit constructor arg1: " + arg1);
        log.info("SimpleStorageServiceInit constructor arg2: " + arg2);
        log.info("SimpleStorageServiceInit constructor arg3: " + arg3);
    }

    @Override
    public void call(Iterator<String> arg) throws Exception {
        log.info("SimpleStorageServiceInit call");
        log.info("SimpleStorageServiceInit call arg1: " + arg1);
        log.info("SimpleStorageServiceInit call arg2: " + arg2);
        log.info("SimpleStorageServiceInit call arg3: " + arg3);
        SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
    }
}
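
As an aside, a common way to sidestep this whole class of problem is to initialize lazily
inside each task, rather than in a separate foreachPartition pass, so whichever executor
ends up running a task initializes itself on demand. A minimal sketch (SimpleStorageServiceHolder
is a hypothetical name; SimpleStorageService.init is the same call as above):

public final class SimpleStorageServiceHolder {

    private static boolean initialized = false;

    // Call this at the top of each task (e.g., in SimpleStorageServiceAsset.call).
    // Only the first call per executor JVM actually runs the init.
    public static synchronized void ensureInit(String arg1, String arg2, String arg3) {
        if (!initialized) {
            SimpleStorageService.init(arg1, arg2, arg3);
            initialized = true;
        }
    }
}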

________________________________
From: Jacek Laskowski <jacek@japila.pl>
To: Darin McBeath <ddmcbeath@yahoo.com> 
Cc: user <user@spark.apache.org>
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
On 11.03.2016 at 7:33 PM, "Darin McBeath" <ddmcbeath@yahoo.com> wrote:


>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the foreachPartition, I can
>see the statements being logged. Oddly, these output statements only occur in one executor
>(and not the other). They occur 16 times in this executor, since there are 16 partitions.
>This seems odd, as there are only 8 cores on the executor and the other executor doesn't
>appear to be called at all in the foreachPartition call. But when I go to do a map function
>on this same RDD, things start blowing up on the other executor as it starts doing work for
>some partitions (even though it would appear that all partitions were only initialized on
>the first executor). The executor that was used in the foreachPartition call works fine and
>doesn't experience issues. But because the other executor is failing on every request, the
>job dies.
>
>Darin.
>
>
>________________________________
>From: Jacek Laskowski <jacek@japila.pl>
>To: Darin McBeath <ddmcbeath@yahoo.com>
>Cc: user <user@spark.apache.org>
>Sent: Friday, March 11, 2016 1:24 PM
>Subject: Re: spark 1.6 foreachPartition only appears to be running on one executor
>
>
>
>Hi,
>How do you check which executor is used? Can you include a screenshot of the master's
>webUI with workers?
>Jacek
>On 11.03.2016 at 6:57 PM, "Darin McBeath" <ddmcbeath@yahoo.com.invalid> wrote:
>
>>I've run into a situation where it would appear that foreachPartition is only running
>>on one of my executors.
>>
>>I have a small cluster (2 executors with 8 cores each).
>>
>>When I run a job with a small file (with 16 partitions), I can see that the 16 partitions
>>are initialized, but they all appear to be initialized on only one executor. All of the work
>>then runs on this one executor (even though the number of partitions is 16). This seems odd,
>>but at least it works. I'm not sure why the other executor was not used.
>>
>>However, when I run a larger file (once again with 16 partitions), I can see that the
>>16 partitions are initialized once again (but all on the same executor). But this time,
>>subsequent work is spread across the 2 executors. This of course results in problems, because
>>the other executor was never initialized; all of the partitions were only initialized on the
>>first executor.
>>
>>Does anyone have any suggestions for where I might want to investigate? Has anyone
>>else seen something like this before? Any thoughts/insights would be appreciated. I'm using
>>the standalone cluster manager, with the cluster started by the spark-ec2 scripts, and I
>>submit my job using spark-submit.
>>
>>Thanks.
>>
>>Darin.
>>

