spark-user mailing list archives

From Patrick McCarthy <pmccar...@dstillery.com.INVALID>
Subject Re: How to make pyspark use custom python?
Date Thu, 06 Sep 2018 14:07:27 GMT
It looks like, for whatever reason, your cluster isn't using the Python you
distributed, or that distribution doesn't contain what you think it does.
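
A quick way to check is to ask the executors which interpreter they are
actually running and compare it with the driver. A minimal sketch (it assumes
the `sc` that Zeppelin injects into the %spark.pyspark interpreter):

    %spark.pyspark

    import sys

    # Compare the driver's interpreter with the ones the executors report.
    # `sc` is the SparkContext provided by Zeppelin.
    executor_pythons = (sc.range(0, 4, numSlices=4)
                          .map(lambda _: sys.executable)
                          .distinct()
                          .collect())
    print("driver:   ", sys.executable)
    print("executors:", executor_pythons)

If the executors don't report /usr/local/miniconda3/bin/python, your
spark-env.sh setting isn't reaching the worker processes.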

I've used the following with success to deploy a conda environment to my
cluster at runtime:
https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/
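
The gist of that approach, as I remember it (a rough sketch rather than a
drop-in recipe: it assumes YARN and a relocatable conda env, the archive name,
alias and HDFS path below are placeholders, and with Zeppelin you would set
these properties in the interpreter settings since they must be in place
before the SparkContext starts):

    from pyspark.sql import SparkSession

    # Sketch only: getOrCreate() is here just to show which properties are
    # involved; in practice they go in interpreter settings,
    # spark-defaults.conf, or spark-submit --conf, before any context exists.
    spark = (SparkSession.builder
        # distribute the zipped env to every node; '#MINICONDA' is the unpack alias
        .config("spark.yarn.dist.archives", "hdfs:///path/to/miniconda3.zip#MINICONDA")
        # point the YARN AM and the executors' Python workers at the bundled interpreter
        .config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./MINICONDA/miniconda3/bin/python")
        .config("spark.executorEnv.PYSPARK_PYTHON", "./MINICONDA/miniconda3/bin/python")
        .getOrCreate())

The upside over editing spark-env.sh on every node is that the environment
travels with the job.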

On Thu, Sep 6, 2018 at 2:58 AM, Hyukjin Kwon <gurwls223@gmail.com> wrote:

> Are you quite sure this is an issue in Spark? I have used a custom Python
> several times by setting it in PYSPARK_PYTHON, and it was never a
> problem.
>
> On Thu, Sep 6, 2018 at 2:21 PM, mithril <twinmegami@gmail.com> wrote:
>
>> For better formatting, please see
>> https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python
>>
>> ----------------------
>>
>>
>> I am using Zeppelin to connect to a remote Spark cluster.
>>
>> The remote Spark cluster uses the system Python 2.7.
>>
>> I want to switch to miniconda3 and install the pyarrow library.
>> What I did was:
>>
>> 1. Download miniconda3, install some libs, and scp the miniconda3 folder
>> to the Spark master and slaves.
>> 2. Add `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
>> `spark-env.sh` on the Spark master and slaves.
>> 3. Restart Spark and Zeppelin.
>> 4. Run the code:
>>
>>     %spark.pyspark
>>
>>         import pandas as pd
>>         from pyspark.sql.functions import pandas_udf, PandasUDFType
>>
>>
>>         @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
>>         def process_order_items(pdf):
>>
>>             pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
>>
>>             d = {'has_discount':'count',
>>                 'clearance':'count',
>>                 'count': ['count', 'sum'],
>>                 'price_guide':'max',
>>                 'total_price': 'sum'
>>             }
>>
>>             pdf1 = pdf.groupby('day').agg(d)
>>             pdf1.columns = pdf1.columns.map('_'.join)
>>             d1 = {'has_discount_count':'discount_order_count',
>>                 'clearance_count':'clearance_order_count',
>>                 'count_count':'order_count',
>>                 'count_sum':'sale_count',
>>                 'price_guide_max':'price_guide',
>>                 'total_price_sum': 'total_price'
>>             }
>>
>>             pdf2 = pdf1.rename(columns=d1)
>>
>>             pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount>0, 'count'].resample(freq).sum()
>>             pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0, 'count'].resample(freq).sum()
>>             pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
>>
>>             pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
>>
>>             return pdf2
>>
>>
>>         results = df.groupby("store_id", "product_id").apply(process_order_items)
>>
>>         results.select(['store_id', 'price']).show(5)
>>
>>
>> Got error :
>>
>>     Py4JJavaError: An error occurred while calling o172.showString.
>>     : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 143, 10.104.33.18, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
>>         process()
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
>>         serializer.dump_stream(func(split_index, iterator), outfile)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
>>         func = lambda _, it: map(mapper, it)
>>       File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
>>         import pyarrow as pa
>>     ImportError: No module named pyarrow
>>
>>
>> `10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not
>> set correctly.
>>
>> I logged in to the master and slaves, ran the `pyspark` interpreter on
>> each, and found that `import pyarrow` does not throw an exception.
>>
>> PS: `pyarrow` is also installed on the machine running Zeppelin.
>>
>> --------------
>>
>> More info:
>>
>> 1. The Spark cluster is installed on A, B, C; Zeppelin is installed on D.
>> 2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, C.
>> 3. `import pyarrow` works in `/usr/local/spark/bin/pyspark` on A, B, C.
>> 4. `import pyarrow` works in the custom Python (miniconda3) on A, B, C.
>> 5. `import pyarrow` works in D's default Python (miniconda3; the path is
>> different from the one on A, B, C, but that shouldn't matter).
>>
>> So I completely cannot understand why it doesn't work.
