spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: example of non-line oriented input data?
Date Wed, 19 Mar 2014 00:14:37 GMT
BTW one other thing — in your experience, Diana, which non-text InputFormats would be most
useful to support in Python first? Would it be Parquet or Avro, simple SequenceFiles with
the Hadoop Writable types, or something else? I think a per-file text input format that does
the stuff we did here would also be good.

Matei


On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:

> Hi Diana,
> 
> This seems to work without the iter() in front if you just return treeiterator. What happened when you didn’t include that? Treeiterator should return an iterator.
> 
> Anyway, this is a good example of mapPartitions. It’s one where you want to view the whole file as one object (one XML document here), so you couldn’t implement this using a flatMap, but you still want to return multiple values. The MLlib example you saw needs Python 2.7 because unfortunately that is a requirement for our Python MLlib support (see http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries). We’d like to relax this later but we’re using some newer features of NumPy and Python. The rest of PySpark works on 2.6.
> 
> In terms of the size in memory, both the string s and the XML tree constructed from it need to fit, so you can’t work on very large individual XML files. You may be able to use a streaming XML parser instead to extract elements from the data in a streaming fashion, without ever materializing the whole tree. http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader is one example.
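A minimal sketch of that streaming approach, using xml.sax's incremental parser; it assumes the same <country name="..."> elements and the mydata RDD from Diana's example further down the thread, and the function name is just illustrative:

import xml.sax

class CountryHandler(xml.sax.ContentHandler):
    # Collects the "name" attribute of each <country> element as the parser
    # streams through the document, without ever building a full tree.
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.names = []
    def startElement(self, tag, attrs):
        if tag == "country":
            self.names.append(attrs.get("name"))

def parsefile_streaming(iterator):
    handler = CountryHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    for line in iterator:
        # Re-append the newline that textFile strips, and feed the parser
        # incrementally so the whole file is never held as a single string.
        parser.feed(line + "\n")
    parser.close()
    return handler.names

mydata.mapPartitions(parsefile_streaming).collect()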
> 
> Matei
> 
> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
> 
>> Well, if anyone is still following this, I've gotten the following code working which in theory should allow me to parse whole XML files: (the problem was that I can't return the tree iterator directly.  I have to call iter().  Why?)
>> 
>> import xml.etree.ElementTree as ET
>> 
>> # two source files, format <data> <country name="...">...</country>...</data>
>> mydata=sc.textFile("file:/home/training/countries*.xml") 
>> 
>> def parsefile(iterator):
>>     s = ''
>>     for i in iterator: s = s + str(i)
>>     tree = ET.fromstring(s)
>>     treeiterator = tree.getiterator("country")
>>     # why do I have to convert an iterator to an iterator?  not sure but required
>>     return iter(treeiterator)
>> 
>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element: element.attrib).collect()
>> 
>> The output is what I expect:
>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>> 
>> BUT I'm a bit concerned about the construction of the string "s".  How big can my file be before converting it to a string becomes problematic?
>> 
>> 
>> 
>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
>> Thanks, Matei.
>> 
>> In the context of this discussion, it would seem mapPartitions is essential, because it's the only way I'm going to be able to process each file as a whole, in our example of a large number of small XML files which need to be parsed as whole files because records are not required to be on a single line.
>> 
>> The theory makes sense but I'm still utterly lost as to how to implement it.  Unfortunately there's only a single example of the use of mapPartitions in any of the Python example programs, which is the logistic regression example, which I can't run because it requires Python 2.7 and I'm on Python 2.6.  (Aside: I couldn't find any statement that Python 2.6 is unsupported...is it?)
>> 
>> I'd really really love to see a real life example of a Python use of mapPartitions.  I do appreciate the very simple examples you provided, but (perhaps because of my novice status on Python) I can't figure out how to translate those to a real world situation in which I'm building RDDs from files, not inline collections like [(1,2),(2,3)].
>> 
>> Also, you say that the function called in mapPartitions can return a collection OR an iterator.  I tried returning an iterator by calling ElementTree's getiterator function, but still got the error telling me my object was not an iterator.
>> 
>> If anyone has a real life example of mapPartitions returning a Python iterator, that would be fabulous.
>> 
>> Diana
>> 
>> 
>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>> Oh, I see, the problem is that the function you pass to mapPartitions must itself return an iterator or a collection. This is used so that you can return multiple output records for each input record. You can implement most of the existing map-like operations in Spark, such as map, filter, flatMap, etc., with mapPartitions, as well as new ones that might, for example, do a sliding window over each partition, or accumulate data across elements (e.g. to compute a sum).
>> 
>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will work:
>> 
>> >>> data.mapPartitions(lambda x: x).collect()
>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>> 
>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a single list (like glom)
>> 
>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>> [3, 7]   # Sum each partition separately
>> 
>> However something like data.mapPartitions(lambda x: sum(x)).collect() will *not* work, because sum returns a number, not an iterator. That’s why I put sum(x) inside a list above.
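To illustrate the earlier point that map-like operations can be expressed this way, here is a minimal sketch (reusing the same data RDD as above) of a filter and a per-partition count; a generator function works because calling it returns an iterator:

data = sc.parallelize([1, 2, 3, 4], 2)

# filter() expressed with mapPartitions: yield only the elements to keep.
def keep_even(iterator):
    for x in iterator:
        if x % 2 == 0:
            yield x

data.mapPartitions(keep_even).collect()        # [2, 4]

# A per-partition aggregate: count the elements in each partition.
def count_elements(iterator):
    return [sum(1 for _ in iterator)]

data.mapPartitions(count_elements).collect()   # [2, 2]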
>> 
>> In practice mapPartitions is most useful if you want to share some data or work across the elements. For example maybe you want to load a lookup table once from an external file and then check each element against it, or sum up a bunch of elements without allocating a lot of vector objects.
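A minimal sketch of that lookup-table pattern; the file path, its tab-separated two-column format, and the codes RDD are made up for illustration, and the file would have to be readable on every worker node:

def translate_partition(iterator):
    # Load the (hypothetical) code-to-name table once per partition,
    # not once per element.
    table = {}
    for line in open("/path/to/countries.tsv"):
        code, name = line.rstrip("\n").split("\t")
        table[code] = name
    for code in iterator:
        yield table.get(code, "unknown")

codes = sc.parallelize(["li", "sg", "pa"], 2)
codes.mapPartitions(translate_partition).collect()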
>> 
>> Matei
>> 
>> 
>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
>> 
>> > "There’s also mapPartitions, which gives you an iterator for each partition
instead of an array. You can then return an iterator or list of objects to produce from that."
>> >
>> > I confess, I was hoping for an example of just that, because I've not yet been able to figure out how to use mapPartitions.  No doubt this is because I'm a rank newcomer to Python, and haven't fully wrapped my head around iterators.  All I get so far in my attempts to use mapPartitions is the darned "suchnsuch is not an iterator" error.
>> >
>> > def myfunction(iterator): return [1,2,3]
>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>> >
>> >
>> >
>> >
>> >
>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>> > Here’s an example of getting together all lines in a file as one string:
>> >
>> > $ cat dir/a.txt
>> > Hello
>> > world!
>> >
>> > $ cat dir/b.txt
>> > What's
>> > up??
>> >
>> > $ bin/pyspark
>> > >>> files = sc.textFile("dir")
>> >
>> > >>> files.collect()
>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not what we want
>> >
>> > >>> files.glom().collect()
>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, which is an array of lines
>> >
>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single string
>> >
>> > The glom() method groups all the elements of each partition of an RDD into an array, giving you an RDD of arrays of objects. If your input is small files, you always have one partition per file.
>> >
>> > There’s also mapPartitions, which gives you an iterator for each partition instead of an array. You can then return an iterator or list of objects to produce from that.
>> >
>> > Matei
>> >
>> >
>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
>> >
>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many smallish XML files, so using mapPartitions that way would make sense.  I'd love to see a code example, though...it's not as obvious to me how to do that as it probably should be.
>> > >
>> > > Thanks,
>> > > Diana
>> > >
>> > >
>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>> > > Hi Diana,
>> > >
>> > > Non-text input formats are only supported in Java and Scala right now, where you can use sparkContext.hadoopFile or .hadoopDataset to load data with any InputFormat that Hadoop MapReduce supports. In Python, you unfortunately only have textFile, which gives you one record per line. For JSON, you’d have to fit the whole JSON object on one line as you said. Hopefully we’ll also have some other forms of input soon.
>> > >
>> > > If your input is a collection of separate files (say many .xml files), you can also use mapPartitions on it to group together the lines, because each input file will end up being a single dataset partition (or map task). This will let you concatenate the lines in each file and parse them as one XML object.
>> > >
>> > > Matei
>> > >
>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarroll@cloudera.com> wrote:
>> > >
>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like you are assuming that each line in foo.log contains a complete json object?  (That is, that the data doesn't contain any records that are split into multiple lines.)  If so, is that because you know that to be true of your data?  Or did you do as Nicholas suggests and have some preprocessing on the text input to flatten the data in that way?
>> > >>
>> > >> Thanks,
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+sys@gmail.com> wrote:
>> > >> Katrina,
>> > >>
>> > >> Not sure if this is what you had in mind, but here's some simple pyspark code that I recently wrote to deal with JSON files.
>> > >>
>> > >> from pyspark import SparkContext, SparkConf
>> > >> from operator import add
>> > >> import json
>> > >> import random
>> > >> import numpy as np
>> > >>
>> > >> def concatenate_paragraphs(sentence_array):
>> > >>     return ' '.join(sentence_array).split(' ')
>> > >>
>> > >> logFile = 'foo.json'
>> > >> conf = SparkConf()
>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> > >> sc = SparkContext(conf=conf)
>> > >>
>> > >> logData = sc.textFile(logFile).cache()
>> > >> num_lines = logData.count()
>> > >> print 'Number of lines: %d' % num_lines
>> > >>
>> > >> # JSON object has the structure: {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>> > >> tm = logData.map(lambda s: (json.loads(s)['key'], len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>> > >> tm = tm.reduceByKey(lambda a, b: a + b)
>> > >>
>> > >> op = tm.collect()
>> > >> for key, num_words in op:
>> > >>     print 'key: %s, num_words: %d' % (key, num_words)
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User List] <[hidden email]> wrote:
>> > >> I don't actually have any data.  I'm writing a course that teaches students how to do this sort of thing and am interested in looking at a variety of real life examples of people doing things like that.  I'd love to see some working code implementing the "obvious work-around" you mention...do you have any to share?  It's an approach that makes a lot of sense, and as I said, I'd love to not have to re-invent the wheel if someone else has already written that code.  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>> > >> There was a previous discussion about this here:
>> > >>
>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>> > >>
>> > >> How big are the XML or JSON files you're looking to deal with?
>> > >>
>> > >> It may not be practical to deserialize the entire document at once. In that case an obvious work-around would be to have some kind of pre-processing step that separates XML nodes/JSON objects with newlines so that you can analyze the data with Spark in a "line-oriented format". Your preprocessor wouldn't have to parse/deserialize the massive document; it would just have to track open/closed tags/braces to know when to insert a newline.
>> > >>
>> > >> Then you'd just open the line-delimited result and deserialize the individual objects/nodes with map().
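A minimal sketch of such a preprocessor for JSON; it assumes top-level objects only, with no braces inside string values, and the file names are placeholders:

def split_objects(stream, chunk_size=64 * 1024):
    # Yield one complete top-level {...} object at a time by tracking brace
    # depth, reading the input in fixed-size chunks rather than all at once.
    depth, buf = 0, []
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        for ch in chunk:
            if ch == '{':
                depth += 1
            if depth > 0 and ch not in '\r\n':
                # Raw newlines only appear between tokens in valid JSON,
                # so dropping them keeps each object on a single line.
                buf.append(ch)
            if ch == '}':
                depth -= 1
                if depth == 0:
                    yield ''.join(buf)
                    buf = []

src = open('big.json')
dst = open('one_object_per_line.json', 'w')
for obj in split_objects(src):
    dst.write(obj + '\n')
src.close()
dst.close()

# The result is line-oriented, so each line parses on its own:
# records = sc.textFile('one_object_per_line.json').map(json.loads)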
>> > >>
>> > >> Nick
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>> > >> Has anyone got a working example of a Spark application that analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like to do this without re-inventing the wheel...anyone care to share?  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >
>> > >
>> >
>> >
>> 
>> 
>> 
> 

