spark-dev mailing list archives

From Josh Rosen <rosenvi...@gmail.com>
Subject Re: [PySpark]: reading arbitrary Hadoop InputFormats
Date Thu, 31 Oct 2013 18:11:05 GMT
Hi Nick,

This is a nice start.  I'd prefer to keep the Java sequenceFileAsText() and
newHadoopFileAsText() methods inside PythonRDD instead of adding them to
JavaSparkContext, since I think these methods are unlikely to be used
directly by Java users.  You can add them to the PythonRDD companion object,
which is how readRDDFromPickleFile is implemented:
https://github.com/apache/incubator-spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L255

For MsgPack, the UnpicklingError occurs because the Python worker expects to
receive its input in a pickled format.  In my prototype of custom
serializers, I modified the PySpark worker to receive its
serialization/deserialization function as input (
https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/worker.py#L41)
and added logic to pass the appropriate serializers based on each stage's
input and output formats (
https://github.com/JoshRosen/spark/blob/59b6b43916dc84fc8b83f22eb9ce13a27bc51ec0/python/pyspark/rdd.py#L42
).
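
To make the idea concrete, here is a rough, self-contained sketch in plain
Python.  It is not the code from the prototype linked above: the
length-prefixed framing and the names write_frames, read_frames, run_worker
and json_loads are assumptions made up for this illustration, and JSON
stands in for MsgPack so that nothing outside the standard library is needed.

```python
import io
import json
import pickle
import struct


def write_frames(stream, payloads):
    """Write each payload as a 4-byte big-endian length followed by its bytes."""
    for payload in payloads:
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)


def read_frames(stream):
    """Yield raw payloads until the stream is exhausted."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack(">i", header)
        yield stream.read(length)


def run_worker(stream, deserialize, func):
    """Hypothetical worker loop: the deserializer is passed in, not hard-coded."""
    return [func(deserialize(frame)) for frame in read_frames(stream)]


def json_loads(frame):
    """Stand-in for a MsgPack decoder (assumption: any bytes -> object function)."""
    return json.loads(frame.decode("utf-8"))


# A stage whose input is pickled, which is what a pickle-only worker assumes.
pickled = io.BytesIO()
write_frames(pickled, [pickle.dumps(("k1", "v1")), pickle.dumps(("k2", "v2"))])
pickled.seek(0)
pickled_result = run_worker(pickled, pickle.loads, lambda kv: kv[0])

# A stage whose input is in another format; the matching deserializer is supplied.
other = io.BytesIO()
write_frames(other, [json.dumps(["k1", "v1"]).encode("utf-8")])
other.seek(0)
other_result = run_worker(other, json_loads, lambda kv: kv[0])

# Feeding the non-pickled bytes to pickle.loads fails with an UnpicklingError.
other.seek(0)
try:
    run_worker(other, pickle.loads, lambda kv: kv[0])
    mismatch_error = None
except pickle.UnpicklingError as exc:
    mismatch_error = exc
```

The point of the sketch is only that the worker stays format-agnostic once
the deserialization function is an input; the bookkeeping that picks the
right function for each stage is the part that needs real design work.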

At some point, I'd like to port my custom serializers code to PySpark; if
anyone's interested in helping, I'd be glad to write up some additional
notes on how this should work.

- Josh

On Wed, Oct 30, 2013 at 2:25 PM, Nick Pentreath <nick.pentreath@gmail.com> wrote:

> Thanks Josh, Patrick for the feedback.
>
> Based on Josh's pointers I have something working for JavaPairRDD ->
> PySpark RDD[(String, String)]. This just calls the toString method on each
> key and value as before, but without the need for a delimiter. For
> SequenceFile, it uses SequenceFileAsTextInputFormat which itself calls
> toString to convert to Text for keys and values. We then call toString
> (again) ourselves to get Strings to feed to writeAsPickle.
>
> Details here: https://gist.github.com/MLnick/7230588
>
> This also illustrates where the "wrapper function" api would fit in. All
> that is required is to define a T => String for key and value.
>
> I started playing around with MsgPack and can sort of get things to work in
> Scala, but am struggling with getting the raw bytes to be written properly
> in PythonRDD (I think it is treating them as pickled byte arrays when they
> are not, but when I removed the 'stripPickle' calls and amended the length
> (-6) I got "UnpicklingError: invalid load key, ' '. ").
>
> Another issue is that MsgPack does well at writing "structures" - like Java
> classes with public fields that are fairly simple - but for example the
> Writables have private fields so you end up with nothing being written.
> This looks like it would require custom "Templates" (serialization
> functions effectively) for many classes, which means a lot of custom code
> for a user to write to use it. Fortunately for most of the common Writables
> a toString does the job. Will keep looking into it though.
>
> Anyway, Josh if you have ideas or examples on the "Wrapper API from Python"
> that you mentioned, I'd be interested to hear them.
>
> If you think this is worth working up as a Pull Request covering
> SequenceFiles and custom InputFormats with default toString conversions and
> the ability to specify Wrapper functions, I can clean things up more, add
> some functionality and tests, and also test to see if common things like
> the "normal" Writables and reading from things like HBase and Cassandra can
> be made to work nicely (any other common use cases that you think make
> sense?).
>
> Thoughts, comments etc welcome.
>
> Nick
>
>
>
> On Fri, Oct 25, 2013 at 11:03 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
> > As a starting point, a version where people just write their own
> > "wrapper" functions to convert various HadoopFiles into String <K, V>
> > files could go a long way. We could even have a few built-in versions,
> > such as dealing with Sequence files that are <String, String>. Basically,
> > the user needs to write a translator in Java/Scala that produces textual
> > records from whatever format they want. Then, they make sure this is
> > included in the classpath when running PySpark.
> >
> > As Josh is saying, I'm pretty sure this is already possible, but we may
> > want to document it for users. In many organizations they might have 1-2
> > people who can write the Java/Scala to do this but then many more people
> > who are comfortable using Python once it's set up.
> >
> > - Patrick
> >
> > On Fri, Oct 25, 2013 at 11:00 AM, Josh Rosen <rosenville@gmail.com> wrote:
> >
> > > Hi Nick,
> > >
> > > I've seen several requests for SequenceFile support in PySpark, so
> > > there's definitely demand for this feature.
> > >
> > > I like the idea of passing MsgPack'ed data (or some other structured
> > > format) from Java to the Python workers.  My early prototype of custom
> > > serializers (described at
> > > https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals#PySparkInternals-customserializers
> > > ) might be useful for implementing this.  Proper custom serializer
> > > support would handle the bookkeeping for tracking each stage's input
> > > and output formats and supplying the appropriate deserialization
> > > functions to the Python worker, so the Python worker would be able to
> > > directly read the MsgPack'd data that's sent to it.
> > >
> > > Regarding a wrapper API, it's actually possible to initially transform
> > > data using Scala/Java and perform the remainder of the processing in
> > > PySpark.  This involves adding the appropriate compiled classes to the
> > > Java classpath and a bit of work in Py4J to create the Java/Scala RDD
> > > and wrap it for use by PySpark.  I can hack together a rough example of
> > > this if anyone's interested, but it would need some work to be
> > > developed into a user-friendly API.
> > >
> > > If you wanted to extend your proof-of-concept to handle the cases where
> > > keys and values have parseable toString() values, I think you could
> > > remove the need for a delimiter by creating a PythonRDD from the
> > > newHadoopFile JavaPairRDD and adding a new method to writeAsPickle (
> > > https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L224
> > > ) to dump its contents as a pickled pair of strings.  (Aside: most of
> > > writeAsPickle() would probably need to be eliminated or refactored when
> > > adding general custom serializer support.)
> > >
> > > - Josh
> > >
> > > On Thu, Oct 24, 2013 at 11:18 PM, Nick Pentreath
> > > <nick.pentreath@gmail.com> wrote:
> > >
> > > > Hi Spark Devs
> > > >
> > > > I was wondering what appetite there may be to add the ability for
> > > > PySpark users to create RDDs from (somewhat) arbitrary Hadoop
> > > > InputFormats.
> > > >
> > > > In my data pipeline for example, I'm currently just using Scala
> > > > (partly because I love it but also because I am heavily reliant on
> > > > quite custom Hadoop InputFormats for reading data). However, many
> > > > users may prefer to use PySpark as much as possible (if not for
> > > > everything). Reasons might include the need to use some Python
> > > > library. While I don't do it yet, I can certainly see an attractive
> > > > use case for using say scikit-learn / numpy to do data analysis &
> > > > machine learning in Python. Added to this, my cofounder knows Python
> > > > well but not Scala, so it can be very beneficial to do a lot of stuff
> > > > in Python.
> > > >
> > > > For text-based data this is fine, but reading data in from more
> > > > complex Hadoop formats is an issue.
> > > >
> > > > The current approach would of course be to write an ETL-style
> > > > Java/Scala job and then process in Python. Nothing wrong with this,
> > > > but I was thinking about ways to allow Python to access arbitrary
> > > > Hadoop InputFormats.
> > > >
> > > > Here is a quick proof of concept:
> > > > https://gist.github.com/MLnick/7150058
> > > >
> > > > This works for simple stuff like SequenceFile with simple Writable
> > > > key/values.
> > > >
> > > > To work with more complex files, perhaps an approach is to manipulate
> > > > Hadoop JobConf via Python and pass that in. The one downside is of
> > > > course that the InputFormat (well actually the Key/Value classes) must
> > > > have a toString that makes sense so very custom stuff might not work.
> > > >
> > > > I wonder if it would be possible to take the objects that are yielded
> > > > via the InputFormat and convert them into some representation like
> > > > ProtoBuf, MsgPack, Avro, JSON, that can be read relatively more easily
> > > > from Python?
> > > >
> > > > Another approach could be to allow a simple "wrapper API" such that
> > > > one can write a wrapper function T => String and pass that into an
> > > > InputFormatWrapper that takes an arbitrary InputFormat and yields
> > > > Strings for the keys and values. Then all that is required is to
> > > > compile that function and add it to the SPARK_CLASSPATH and away you
> > > > go!
> > > >
> > > > Thoughts?
> > > >
> > > > Nick
> > > >
> > >
> >
>
