spark-user mailing list archives

From Pedro Rodriguez <ski.rodrig...@gmail.com>
Subject Re: Call Scala API from PySpark
Date Thu, 30 Jun 2016 21:43:00 GMT
That was indeed the case: using UTF8Deserializer instead of the default
pickle-based RDD serializer makes everything work correctly.
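
For anyone following along, here is a minimal sketch of what the Python side
can look like. It mirrors how PySpark's own sc.textFile wraps a JVM RDD of
strings with UTF8Deserializer rather than the default pickle-based
serializer. The package com.company.mylibrary and the customTextFile method
are the placeholder names from the original question quoted below; calling
the JVM class as a constructor and importing pyspark lazily are assumptions
made for illustration, not the actual library code.

```python
class CustomContext(object):
    """Thin Python wrapper around the (placeholder) Scala CustomContext."""

    def __init__(self, sc):
        self._sc = sc
        # Assumption: the Scala class has a public constructor that takes a
        # Scala SparkContext. If only a companion apply() exists, calling
        # jvm_cls.apply(...) instead may be needed.
        jvm_cls = sc._jvm.com.company.mylibrary.CustomContext
        # sc._jsc is the JavaSparkContext; .sc() returns the Scala SparkContext.
        self._jcontext = jvm_cls(sc._jsc.sc())

    def customTextFile(self, path):
        # Imported lazily so this module can be imported without Spark
        # being installed (an illustration choice, not a requirement).
        from pyspark.rdd import RDD
        from pyspark.serializers import UTF8Deserializer

        # The Scala method returns an RDD[String]; PySpark's RDD class
        # expects a JavaRDD handle, so convert it first.
        jrdd = self._jcontext.customTextFile(path).toJavaRDD()
        # The JVM sends plain UTF-8 strings, not pickled objects, so the
        # default pickle-based deserializer must be replaced.
        return RDD(jrdd, self._sc, UTF8Deserializer())
```

With a running SparkContext and the jar visible to the JVM gateway, usage
would be rdd = CustomContext(sc).customTextFile("path").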

Thanks for the tips!
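
The classpath point from my earlier message (quoted below) can be sketched
as a small helper that builds spark-submit --conf arguments, so the jars are
listed in spark.jars and also in the driver and executor extraClassPath. The
helper name and the jar path are hypothetical, and the sketch assumes each
jar exists at the same path on the driver and on every executor.

```python
import os


def gateway_jar_args(jar_paths):
    """Build spark-submit arguments that expose jars to Spark and to the
    Py4J gateway JVM (hypothetical helper, for illustration only)."""
    jars = list(jar_paths)
    # spark.jars takes a comma-separated list of jars.
    comma_list = ",".join(jars)
    # extraClassPath entries use the platform classpath separator
    # (":" on Linux and macOS).
    class_path = os.pathsep.join(jars)
    settings = [
        ("spark.jars", comma_list),
        ("spark.driver.extraClassPath", class_path),
        ("spark.executor.extraClassPath", class_path),
    ]
    args = []
    for key, value in settings:
        args.extend(["--conf", "{}={}".format(key, value)])
    return args


if __name__ == "__main__":
    # Hypothetical jar location; substitute the real path of the library.
    print(" ".join(gateway_jar_args(["/opt/libs/mylibrary.jar"])))
```

The resulting arguments can be appended to a spark-submit or pyspark command
line; the same three settings could instead go into spark-defaults.conf.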

On Thu, Jun 30, 2016 at 3:32 PM, Pedro Rodriguez <ski.rodriguez@gmail.com>
wrote:

> Quick update: I was able to get most of the plumbing to work thanks to the
> code Holden posted and browsing more source code.
>
> I am running into the error below, which makes me think that I shouldn't
> leave the default Python RDD serializer/pickler in place
> (https://github.com/apache/spark/blob/v1.6.2/python/pyspark/rdd.py#L182)
> and should use something else:
> _pickle.UnpicklingError: A load persistent id instruction was encountered,
> but no persistent_load function was specified.
>
>
> On Thu, Jun 30, 2016 at 2:13 PM, Pedro Rodriguez <ski.rodriguez@gmail.com>
> wrote:
>
>> Thanks Jeff and Holden,
>>
>> A little more context here probably helps. I am working on implementing
>> the idea from this article to make reads from S3 faster:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> (although my name is Pedro, I am not the author of the article). The reason
>> for wrapping SparkContext is so that the code change is from sc.textFile to
>> sc.s3TextFile in addition to configuring AWS keys correctly (seeing if we
>> can open source our library, but depends on company). Overall, it's a very
>> light wrapper and perhaps calling it a context is not quite the right name
>> because of that.
>>
>> At the end of the day I make a sc.parallelize call and return an
>> RDD[String] as described in that blog post. I found a post from the Py4J
>> mailing list that reminded me that the JVM gateway needs the jars in
>> spark.driver/executor.extraClassPath in addition to the spark.jars option.
>> With that, I can see the classes now. Looks like I need to do as you
>> suggest and wrap it using Java in order to go the last mile to calling the
>> method/constructor. I don't know yet how to get the RDD back to pyspark
>> though so any pointers on that would be great.
>>
>> Thanks for the tip on code Holden, I will take a look to see if that can
>> give me some insight on how to write the Python code part.
>>
>> Thanks!
>> Pedro
>>
>> On Thu, Jun 30, 2016 at 12:23 PM, Holden Karau <holden@pigscanfly.ca>
>> wrote:
>>
>>> So I'm a little biased - I think the best bridge between the two is using
>>> DataFrames. I've got some examples in my talk and on the high performance
>>> spark GitHub;
>>> https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/simple_perf_test.py
>>> calls some custom Scala code.
>>>
>>> Using a custom context is a bit tricky though because of how the
>>> launching is done. As Jeff Zhang points out, you would need to wrap it in a
>>> JavaSparkContext, and then you could override the _initialize_context
>>> function in context.py.
>>>
>>> On Thu, Jun 30, 2016 at 11:06 AM, Jeff Zhang <zjffdu@gmail.com> wrote:
>>>
>>>> Hi Pedro,
>>>>
>>>> Your use case is interesting. I think launching the Java gateway is the
>>>> same as for the native SparkContext; the only difference is that you
>>>> create your custom SparkContext instead of the native one. You might
>>>> also need to wrap it using Java.
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py#L172
>>>>
>>>>
>>>>
>>>> On Thu, Jun 30, 2016 at 9:53 AM, Pedro Rodriguez <
>>>> ski.rodriguez@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have written a Scala package which essentially wraps the
>>>>> SparkContext in a custom class that adds some functionality specific
>>>>> to our internal use case. I am trying to figure out the best way to
>>>>> call this from PySpark.
>>>>>
>>>>> I would like to do this similarly to how Spark itself calls the JVM
>>>>> SparkContext as in:
>>>>> https://github.com/apache/spark/blob/v1.6.2/python/pyspark/context.py
>>>>>
>>>>> My goal would be something like this:
>>>>>
>>>>> Scala Code (this is done):
>>>>> >>> import com.company.mylibrary.CustomContext
>>>>> >>> val myContext = CustomContext(sc)
>>>>> >>> val rdd: RDD[String] = myContext.customTextFile("path")
>>>>>
>>>>> Python Code (I want to be able to do this):
>>>>> >>> from company.mylibrary import CustomContext
>>>>> >>> myContext = CustomContext(sc)
>>>>> >>> rdd = myContext.customTextFile("path")
>>>>>
>>>>> At the end of each snippet, I should be working with an ordinary
>>>>> RDD[String].
>>>>>
>>>>> I am trying to access my Scala class through sc._jvm as below, but not
>>>>> having any luck so far.
>>>>>
>>>>> My attempts:
>>>>> >>> a = sc._jvm.com.company.mylibrary.CustomContext
>>>>> >>> dir(a)
>>>>> ['<package or class name>']
>>>>>
>>>>> Example of what I want:
>>>>> >>> a = sc._jvm.PythonRDD
>>>>> >>> dir(a)
>>>>> ['anonfun$6', 'anonfun$8', 'collectAndServe',
>>>>> 'doubleRDDToDoubleRDDFunctions', 'getWorkerBroadcasts', 'hadoopFile',
>>>>> 'hadoopRDD', 'newAPIHadoopFile', 'newAPIHadoopRDD',
>>>>> 'numericRDDToDoubleRDDFunctions', 'rddToAsyncRDDActions',
>>>>> 'rddToOrderedRDDFunctions', 'rddToPairRDDFunctions',
>>>>> 'rddToPairRDDFunctions$default$4', 'rddToSequenceFileRDDFunctions',
>>>>> 'readBroadcastFromFile', 'readRDDFromFile', 'runJob',
>>>>> 'saveAsHadoopDataset', 'saveAsHadoopFile', 'saveAsNewAPIHadoopFile',
>>>>> 'saveAsSequenceFile', 'sequenceFile', 'serveIterator', 'valueOfPair',
>>>>> 'writeIteratorToStream', 'writeUTF']
>>>>>
>>>>> The next thing I would run into is converting the JVM RDD[String] back
>>>>> to a Python RDD. What is the easiest way to do this?
>>>>>
>>>>> Overall, is this a good approach to calling the same API in Scala and
>>>>> Python?
>>>>>
>>>>> --
>>>>> Pedro Rodriguez
>>>>> PhD Student in Distributed Machine Learning | CU Boulder
>>>>> UC Berkeley AMPLab Alumni
>>>>>
>>>>> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
>>>>> Github: github.com/EntilZha | LinkedIn:
>>>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>
>


