spark-user mailing list archives

From Gourav Sengupta <gourav.sengu...@gmail.com>
Subject Re: How to pass sparkSession from driver to executor
Date Tue, 03 Apr 2018 17:04:34 GMT
Hi,

I think that what you are facing is documented in the Spark programming guide:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
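
For what it is worth, the usual fix for that closure issue is to bring the file names back to the driver and loop there, so that spark.read.json is only ever called from the driver. A rough sketch in PySpark (the DataFrame df and its file_name column are assumptions based on the original question further down the thread):

```
# Sketch only: collect the (presumably small) list of file names to the driver,
# then read each file from the driver; the executors still do the actual I/O.
file_names = [row.file_name for row in df.select("file_name").collect()]
for file_name in file_names:
    file_df = spark.read.json(file_name)
    # ... per-file logic here ...
```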


May I ask what you are trying to achieve here? From what I understand, you
have a list of JSON files which you want to read separately, as they have
different schemas. Is that right?

Can you please let me know the following details as well:
1. SPARK Version
2. The full code
3. The path to the files


Regards,
Gourav Sengupta

On Thu, Sep 21, 2017 at 4:55 PM, ayan guha <guha.ayan@gmail.com> wrote:

> The point here is - spark session is not available in executors. So, you
> have to use appropriate storage clients.
>
> On Fri, Sep 22, 2017 at 1:44 AM, lucas.gary@gmail.com <
> lucas.gary@gmail.com> wrote:
>
>> I'm not sure exactly what you're doing, but in the past I have used Spark to
>> consume a manifest file and then execute a .mapPartitions on the result, like
>> this:
>>
>>
>> import json
>> import time
>>
>> import boto3
>>
>>
>> def map_key_to_event(s3_events_data_lake):
>>
>>     def _map_key_to_event(event_key_list, s3_client=None):
>>         # s3_client is injectable so a test stub can be passed in during testing.
>>         print("Events in list")
>>         start = time.time()
>>
>>         return_list = []
>>
>>         if s3_client is None:
>>             s3_client = boto3.Session().client('s3')
>>
>>         for event_key in event_key_list:
>>             contents = None
>>             try:
>>                 response = s3_client.get_object(Bucket=s3_events_data_lake, Key=event_key)
>>                 contents = response['Body'].read().decode('utf-8')
>>                 entity = json.loads(contents)
>>                 event_type = json.loads(entity["Message"])["type"]
>>                 entity["Message"] = json.loads(entity["Message"])
>>                 # json.dumps here because Spark doesn't have a good json datatype.
>>                 return_list.append((event_type, json.dumps(entity)))
>>             except Exception:
>>                 print("Key: {k} did not yield a valid object: {o}".format(k=event_key, o=contents))
>>
>>         end = time.time()
>>         print('time elapsed:')
>>         print(end - start)
>>
>>         return return_list
>>
>>     return _map_key_to_event
>>
>>
>> map_func = map_key_to_event(s3_events_data_lake)  # bind the bucket name into the closure
>> pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
>> print("partitions: ")
>> print(pkeys.getNumPartitions())
>> events = pkeys.mapPartitions(map_func)
>>
>>
>>
>>
>>
>> In this case I'm loading heterogeneous json files with wildly different
>> schemas, then saving them out as one parquet file per event type (i.e. turning
>> one big heterogeneous dump into numerous smaller homogeneous dumps).
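>>
>> As a rough sketch of that write step (the output path and the Row field
>> names here are placeholders, not the exact code I ran):
>>
>> ```
>> from pyspark.sql import Row
>>
>> # events is the RDD of (event_type, json_string) pairs produced above.
>> events_df = events.map(lambda kv: Row(event_type=kv[0], payload=kv[1])).toDF()
>>
>> # One Parquet "dump" per event type, via partitioned output.
>> events_df.write.partitionBy("event_type").parquet("s3://some-bucket/events_by_type/")
>> ```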
>>
>> I'm sure this isn't the only or even best way to do it.
>>
>> The underlying issue is that you're trying to violate the programming
>> model.  The model in this case consists of telling the driver what you want
>> and then having the executors go do it.
>>
>> SparkContext is a driver-level abstraction; it doesn't really make sense in
>> the executor context. The executor is acting on behalf of the driver and
>> shouldn't need a back reference to it. You'd end up with some interesting
>> execution graphs otherwise.
>>
>> This is a common pattern in spark as far as I can tell: i.e. calling a map
>> and then doing something with the items in the executor, either computing or
>> enriching. My case above is a bit weird, and I'm not certain it's the right
>> mechanism, in that I'm literally taking a manifest file and turning it into
>> 'n' actual records.
>>
>> Also, if you're going to be constructing a connection string / JDBC call /
>> S3 client, you really don't want to use a straight .map(func). You'll end up
>> instantiating a connection for every record rather than once per partition;
>> see the sketch below.
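>>
>> Roughly, the difference looks like this (fetch() and records are placeholders
>> for your own logic and input RDD; boto3 is only an example client):
>>
>> ```
>> import boto3
>>
>> # Anti-pattern: .map would build a new S3 client for every single record.
>> # results = records.map(lambda key: fetch(boto3.Session().client('s3'), key))
>>
>> # Better: build one client per partition and reuse it for every record in it.
>> def fetch_partition(keys):
>>     s3_client = boto3.Session().client('s3')   # created once per partition
>>     for key in keys:
>>         yield fetch(s3_client, key)
>>
>> results = records.mapPartitions(fetch_partition)
>> ```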
>>
>> Hope this is somewhat helpful.
>>
>> Gary
>>
>> On 21 September 2017 at 06:31, Weichen Xu <weichen.xu@databricks.com>
>> wrote:
>>
>>> Spark does not allow executor code to use the `sparkSession`.
>>> But I think you can move all the json files to one directory, and then run:
>>>
>>> ```
>>> spark.read.json("/path/to/jsonFileDir")
>>> ```
>>> But if you want to get the filename at the same time, you can use:
>>> ```
>>> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
>>> ```
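>>>
>>> For example, something along these lines (sketch only, assuming each file
>>> holds a single JSON document):
>>>
>>> ```
>>> import json
>>>
>>> # wholeTextFiles yields (filename, file_content) pairs, one per file, so each
>>> # file can be parsed on the executors without needing the SparkSession there.
>>> parsed = (spark.sparkContext
>>>           .wholeTextFiles("/path/to/jsonFileDir")
>>>           .map(lambda fc: (fc[0], json.loads(fc[1]))))
>>> ```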
>>>
>>> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <ferrarir@gmail.com>
>>> wrote:
>>>
>>>> Depending on your use case, however, broadcasting
>>>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>>>> could be a better option; see the example below.
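>>>>
>>>> For example (purely illustrative; the lookup table and RDD name are made up):
>>>>
>>>> ```
>>>> # Broadcast small, read-only data that the executors need,
>>>> # instead of trying to ship the SparkSession itself.
>>>> schema_hints = {"orders": "order-events", "clicks": "click-events"}
>>>> schema_hints_bc = spark.sparkContext.broadcast(schema_hints)
>>>>
>>>> def tag_file(file_name):
>>>>     hints = schema_hints_bc.value          # readable on every executor
>>>>     stem = file_name.split("/")[-1].split(".")[0]
>>>>     return (file_name, hints.get(stem))
>>>>
>>>> tagged = file_names_rdd.map(tag_file)      # file_names_rdd is assumed
>>>> ```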
>>>>
>>>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>>>> chaku.mitcs@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to know how to pass sparkSession from driver to executor.
>>>>>
>>>>> I have a spark program (batch job) which does the following:
>>>>>
>>>>> #################
>>>>>
>>>>> val spark = SparkSession.builder().appName("SampleJob").config("spark.master", "local").getOrCreate()
>>>>>
>>>>> val df = ... // dataframe which has the list of file names (hdfs)
>>>>>
>>>>> df.foreach { fileName =>
>>>>>
>>>>>       *spark.read.json(fileName)*
>>>>>
>>>>>       ...... some logic here....
>>>>> }
>>>>>
>>>>> #################
>>>>>
>>>>>
>>>>> *spark.read.json(fileName) --- this fails because it runs in an executor.
>>>>> When I put it outside the foreach, i.e. in the driver, it works.*
>>>>>
>>>>> I understand this is because I am trying to use spark (the sparkSession) in
>>>>> the executor, and it is not available outside the driver. But I want to read
>>>>> the hdfs files inside the foreach, so how do I do it?
>>>>>
>>>>> Can someone help me with how to do this?
>>>>>
>>>>> Thanks,
>>>>> Chackra
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
