spark-user mailing list archives

From "K. Shankari" <shank...@eecs.berkeley.edu>
Subject Re: How to map each line to (line number, line)?
Date Wed, 01 Jan 2014 09:35:52 GMT
Why not use a zipped RDD?
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD

I have used something like this in the past.

> val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> val rddWithIndex = rdd.zip(index)

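If that doesn't work (zip requires both RDDs to have the same number of
partitions and the same number of elements in each partition), you could
try zipPartitions instead, which only requires the same number of partitions.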
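
When the partitions hold different numbers of lines, one alternative raised further down this thread is to count the lines in each partition first and then offset each partition's local line numbers by the running total. Below is a minimal sketch of that idea only. It uses plain Scala collections as a stand-in for an RDD (each inner Seq models one partition), and the names LineNumbering, partitionCounts, startOffsets and withGlobalIndex are hypothetical, not Spark API. In Spark the two passes would presumably be done with something like mapPartitionsWithIndex, with the small list of counts collected to the driver in between.

```scala
object LineNumbering {
  // Hypothetical stand-in for an RDD: each inner Seq models one partition.
  type Partitions = Seq[Seq[String]]

  // Pass 1: count the lines in each partition (one small number per partition).
  def partitionCounts(parts: Partitions): Seq[Long] =
    parts.map(_.size.toLong)

  // Running sum of the counts: element i is the global index of the
  // first line of partition i.
  def startOffsets(counts: Seq[Long]): Seq[Long] =
    counts.scanLeft(0L)(_ + _).init

  // Pass 2: number each line as (partition offset + position in partition).
  def withGlobalIndex(parts: Partitions): Seq[Seq[(Long, String)]] = {
    val offsets = startOffsets(partitionCounts(parts))
    parts.zip(offsets).map { case (lines, offset) =>
      lines.zipWithIndex.map { case (line, i) => (offset + i, line) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Deliberately uneven partitions, including an empty one.
    val parts: Partitions = Seq(Seq("a", "b", "c"), Seq(), Seq("d"), Seq("e", "f"))
    withGlobalIndex(parts).flatten.foreach(println)
  }
}
```

Only the per-partition counts need to be gathered in one place, so the extra work stays small even when the data itself is large. The cost is one additional pass over the data to obtain the counts.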

Thanks,
Shankari


On Tue, Dec 31, 2013 at 11:39 AM, Christopher Nguyen <ctn@adatao.com> wrote:

> It's a reasonable ask (row indices) in some interactive use cases we've
> come across. We're working on providing support for this at a higher level
> of abstraction.
>
> Sent while mobile. Pls excuse typos etc.
> On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <buendia360@gmail.com>
> wrote:
>
>>
>>
>>
>> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>
>>> Hi Aureliano,
>>>
>>> It's very easy to get lines into (start byte number, line) using
>>> Hadoop's TextInputFormat.  See how SparkContext's textFile() method does it
>>> here:
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>>
>>
>> Thanks for pointing this out. While the start byte number provides a
>> globally unique index for each line, my application needs the line number.
>>
>> It seems best to go with the source file containing the line numbers,
>> instead of recreating this in Hadoop/Spark.
>>
>>
>>>
>>> What is the use case where you must have the global line number in the
>>> file, vs a global ordered unique identifier (my suggestion above) or a
>>> partition-local line number (discussed extensively below)?
>>>
>>> Also if you have any way to do this in plain Hadoop, Spark can use that
>>> as well.
>>>
>>> The fundamental difficulty is that knowing the global line number breaks
>>> the assumption Hadoop makes everywhere that each record is independent of
>>> all the others.  Maybe you should consider adding a line number to the
>>> beginning of every line at import time into HDFS instead of doing it
>>> afterwards in Spark.
>>>
>>> Cheers!
>>> Andrew
>>>
>>>
>>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <
>>> buendia360@gmail.com> wrote:
>>>
>>>> I assumed that the number of lines in each partition, except the last
>>>> partition, is equal. Isn't this the case? In that case Guillaume's approach
>>>> makes sense.
>>>>
>>>> All of these methods are inefficient. Spark needs to support this
>>>> feature at a lower level, as Michael suggested.
>>>>
>>>>
>>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
>>>> guillaume.pitel@exensa.com> wrote:
>>>>
>>>>>  You're assuming each partition has the same line count. I don't
>>>>> think it's true (actually, I'm almost certain it's false). And anyway,
>>>>> your code also requires two maps.
>>>>>
>>>>> In my code, the sorting as well as the other operations are performed
>>>>> on a very small dataset: one element per partition.
>>>>>
>>>>> Guillaume
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>>>> wrong direction, so change it to use -i instead of i.
>>>>>>
>>>>>
>>>>>  I'm confused why we would need in-memory sorting. We just use a loop
>>>>> like any other loop in Spark. Why shouldn't this solve the problem?
>>>>>
>>>>>     val count = lines.count() // lines is the rdd
>>>>>     // assumes every partition holds the same number of lines
>>>>>     val partitionLinesCount = count / lines.partitions.length
>>>>>     val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>>       var i = pi * partitionLinesCount
>>>>>       it.map { line =>
>>>>>         val indexed = (i, line) // pair the line with its index
>>>>>         i += 1
>>>>>         indexed
>>>>>       }
>>>>>     }
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>  *Guillaume PITEL, Président*
>>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>>
>>>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>>>
>>>>
>>>>
>>>
>>
