spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Issue while calling foreach in Pyspark
Date Fri, 07 May 2021 20:08:31 GMT
I suspect this may be caused by your cluster, as it appears that you are
running this in YARN mode, like below:

spark-submit --master yarn --deploy-mode client xyx.py

What happens if you try running it in local mode?

spark-submit --master local[2] xyx.py

Is this running on a managed cluster like GCP Dataproc?
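
Also, looking at the worker traceback further down, the executor dies inside
read_data.py, which calls SparkSession.builder...getOrCreate() at module
level (its line 10). When the foreach function is unpickled on an executor,
that module is re-imported there, so each worker tries to launch its own Java
gateway, which is exactly the "Java gateway process exited before sending its
port number" error. A minimal sketch of the usual fix, keeping session
creation out of the module's import path (function names like run() are
hypothetical, not your actual code):

```python
# Hypothetical rewrite of read_data.py: nothing Spark-related runs at import time.

def process_logs(record):
    # Executor-side work: plain Python only (download, process, save).
    # Never create a SparkSession/SparkContext here or at module level --
    # the worker has no driver JVM to attach to.
    return record

def run():
    # Driver-side entry point: the session is created here, once,
    # so executors that re-import this module never touch this code.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("test").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    df.foreach(process_logs)
    spark.stop()
```

Then spark-submit a driver script that imports the module and calls run();
the executors that unpickle process_logs never execute the session-creation
code.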

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 7 May 2021 at 19:17, rajat kumar <kumar.rajat20del@gmail.com> wrote:

> Thanks Mich and Sean for the response. Yes, Sean is right. This is a batch
> job.
>
>   I have only 10 records in the dataframe, but it is still giving this
> exception.
>
> Following are the full logs.
>
> File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line
> 584, in foreach
>     self.rdd.foreach(f)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in
> foreach
>     self.mapPartitions(processPartition).count()  # Force evaluation
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in
> count
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in
> sum
>     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in
> fold
>     vals = self.mapPartitions(func).collect()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in
> collect
>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63,
> in deco
>     return f(*a, **kw)
>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
> 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 3.0 (TID 10, 10.244.158.5, executor 1):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in
> main
>     func, profiler, deserializer, serializer = read_command(pickleSer,
> infile)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
> read_command
>     command = serializer._read_with_length(file)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
> 172, in _read_with_length
>     return self.loads(obj)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
> 580, in loads
>     return pickle.loads(obj, encoding=encoding)
>   File
> "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py",
> line 10, in <module>
>     spark = SparkSession.builder.appName("test").getOrCreate()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line
> 173, in getOrCreate
>     sc = SparkContext.getOrCreate(sparkConf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367,
> in getOrCreate
>     SparkContext(conf=conf or SparkConf())
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133,
> in __init__
>     SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316,
> in _ensure_initialized
>     SparkContext._gateway = gateway or launch_gateway(conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
> 46, in launch_gateway
>     return _launch_gateway(conf)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
> 108, in _launch_gateway
>     raise Exception("Java gateway process exited before sending its port
> number")
> Exception: Java gateway process exited before sending its port number
>
> On Fri, May 7, 2021 at 9:35 PM Sean Owen <srowen@gmail.com> wrote:
>
>> foreach definitely works :)
>> This is not a streaming question.
>> The error says that the JVM worker died for some reason. You'd have to
>> look at its logs to see why.
>>
>> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am not convinced foreach works, even in 3.1.1.
>>> Try doing the same with foreachBatch:
>>>
>>>                      foreachBatch(sendToSink). \
>>>                     trigger(processingTime='2 seconds'). \
>>>
>>> and see if it works
>>>
>>> HTH
>>>
>>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20del@gmail.com>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am using Spark 2.4.4 with Python
>>>>
>>>> While using the below line:
>>>>
>>>> dataframe.foreach(lambda record : process_logs(record))
>>>>
>>>>
>>>> My use case is: process_logs will download a file from cloud storage
>>>> using Python code and then save the processed data.
>>>>
>>>> I am getting the following error:
>>>>
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py",
>>>> line 46, in launch_gateway
>>>>     return _launch_gateway(conf)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py",
>>>> line 108, in _launch_gateway
>>>>     raise Exception("Java gateway process exited before sending its
>>>> port number")
>>>> Exception: Java gateway process exited before sending its port number
>>>>
>>>> Can anyone please suggest what can be done?
>>>>
>>>> Thanks
>>>> Rajat
>>>>
>>>
