spark-user mailing list archives

From mithril <twinmeg...@gmail.com>
Subject How to make pyspark use custom python?
Date Thu, 06 Sep 2018 06:21:36 GMT
For better formatting, please see
https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python
 

----------------------


I am using Zeppelin to connect to a remote Spark cluster.

The remote Spark cluster is using the system Python 2.7.

I want to switch to miniconda3 and install the pyarrow library.
What I did:

1. Downloaded miniconda3, installed some libraries, and copied the miniconda3
folder to the Spark master and slaves with scp.
2. Added `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
`spark-env.sh` on the Spark master and slaves.
3. Restarted Spark and Zeppelin.
4. Ran this code:

    %spark.pyspark
        
        import pandas as pd
        from pyspark.sql.functions import pandas_udf,PandasUDFType
        
        
        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def process_order_items(pdf):
        
            pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']
            
            d = {'has_discount':'count',
                'clearance':'count',
                'count': ['count', 'sum'],
                'price_guide':'max',
                'total_price': 'sum'
                
            }
            
            pdf1 = pdf.groupby('day').agg(d)
            pdf1.columns = pdf1.columns.map('_'.join)
            d1 = {'has_discount_count':'discount_order_count',
                'clearance_count':'clearance_order_count',
                'count_count':'order_count',
                'count_sum':'sale_count',
                'price_guide_max':'price_guide',
                'total_price_sum': 'total_price'
            }
                
            pdf2 = pdf1.rename(columns=d1)
        
            pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount>0, 'count'].resample(freq).sum()
            pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance>0, 'count'].resample(freq).sum()
            pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count
        
            pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)
            
            return pdf2
        
        
        results = df.groupby("store_id", "product_id").apply(process_order_items)
        
        results.select(['store_id', 'price']).show(5)


I got this error:

    Py4JJavaError: An error occurred while calling o172.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 143, 10.104.33.18, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
        process()
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
        func = lambda _, it: map(mapper, it)
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
        import pyarrow as pa
    ImportError: No module named pyarrow


`10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
correctly.

I logged in to the master and the slaves, ran the `pyspark` interpreter on
each, and found that `import pyarrow` does not throw an exception.
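
A check that runs inside the executors, rather than in an interactive shell on each machine, might show which interpreter the failing tasks actually use. The following is only a sketch: `probe_python` and `report_worker_pythons` are hypothetical helper names, and `sc` is assumed to be the SparkContext available in a `%spark.pyspark` paragraph.

```python
import socket
import sys


def probe_python(_):
    """Describe the Python interpreter that executes this function."""
    try:
        import pyarrow
        pyarrow_state = "pyarrow " + pyarrow.__version__
    except ImportError:
        pyarrow_state = "pyarrow not importable"
    # Host name, interpreter path, (major, minor, micro) version, pyarrow status.
    return (socket.gethostname(), sys.executable,
            tuple(sys.version_info[:3]), pyarrow_state)


def report_worker_pythons(sc, num_tasks=20):
    """Run probe_python in num_tasks Spark tasks and return the distinct results.

    `sc` is assumed to be an existing SparkContext; num_tasks is an arbitrary
    value meant to give every executor a chance to run at least one task.
    """
    rdd = sc.parallelize(range(num_tasks), num_tasks)
    return sorted(set(rdd.map(probe_python).collect()))


# Hypothetical usage in a Zeppelin %spark.pyspark paragraph:
#     for row in report_worker_pythons(sc):
#         print(row)
```

If the rows show the system Python rather than `/usr/local/miniconda3/bin/python`, the executors are not picking up the intended interpreter, whatever the interactive `pyspark` shell on each machine reports.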


PS: `pyarrow` is also installed on the machine that runs Zeppelin.

--------------

More info:


1. The Spark cluster is installed on A, B, and C; Zeppelin is installed on D.
2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, and C.
3. `import pyarrow` works in `/usr/local/spark/bin/pyspark` on A, B, and C.
4. `import pyarrow` works in the custom Python (miniconda3) on A, B, and C.
5. `import pyarrow` works in D's default Python (miniconda3; the path is
different from A, B, and C, but that shouldn't matter).
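
The checks above cover the cluster machines and D's default Python, but not what the driver process started by Zeppelin sees. A sketch of such a check follows; `describe_driver_python` is a hypothetical helper name, and `sc` is again assumed to be the SparkContext in the `%spark.pyspark` paragraph. `SparkContext.pythonExec` and the `spark.pyspark.python` setting exist in PySpark, but whether either explains the problem here is only a guess.

```python
import os
import sys


def describe_driver_python(sc=None):
    """Collect the Python-related settings visible to the driver process."""
    info = {
        "driver_executable": sys.executable,
        "env_PYSPARK_PYTHON": os.environ.get("PYSPARK_PYTHON"),
        "env_PYSPARK_DRIVER_PYTHON": os.environ.get("PYSPARK_DRIVER_PYTHON"),
    }
    if sc is not None:
        # pythonExec is the executable name the driver records for Python workers.
        info["sc_pythonExec"] = getattr(sc, "pythonExec", None)
        # spark.pyspark.python is the configuration counterpart of PYSPARK_PYTHON.
        info["conf_spark.pyspark.python"] = sc.getConf().get(
            "spark.pyspark.python", None)
    return info


# Hypothetical usage in a Zeppelin %spark.pyspark paragraph:
#     for key, value in sorted(describe_driver_python(sc).items()):
#         print(key, value)
```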



So I really can't understand why it doesn't work.







