spark-user mailing list archives

From vetal king <greenve...@gmail.com>
Subject Re: Problem using saveAsNewAPIHadoopFile API
Date Fri, 25 Mar 2016 15:06:48 GMT
Hi Sebastian,

Yes... my mistake... you are right. Every partition will create a different
file.

Shridhar

On Fri, Mar 25, 2016 at 6:58 PM, Sebastian Piu <sebastian.piu@gmail.com>
wrote:

> I don't understand the race condition comment you mention.
> Have you seen this somewhere? That timestamp will be the same on each
> worker for that RDD, and each worker is handling a different partition,
> which will be reflected in the filename, so no data will be overwritten. In
> fact this is what saveAsNewHadoopFile on a DStream does, as far as I
> recall.
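
The point above — a shared batch timestamp plus a per-partition index yields distinct file names, so concurrent writers never collide — can be sketched with a hypothetical naming helper (illustrative only, not Spark's actual internal code):

```java
public class PartFileNames {
    // Hypothetical sketch of the naming scheme: every worker sees the same
    // batch timestamp for a given RDD, but each handles a different
    // partition, so the partition index keeps the file names distinct.
    static String partFileName(long batchTimestampMillis, int partitionIndex) {
        return String.format("%d/part-r-%05d", batchTimestampMillis, partitionIndex);
    }

    public static void main(String[] args) {
        // Two partitions of the same batch write to different files.
        System.out.println(partFileName(1458907200000L, 0));
        System.out.println(partFileName(1458907200000L, 1));
    }
}
```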
>
> On Fri, 25 Mar 2016, 11:22 vetal king, <greenvetal@gmail.com> wrote:
>
>> Hi Surendra,
>>
>> Thanks for your suggestion. I tried MultipleOutputFormat and
>> MultipleTextOutputFormat, but the result was the same: the folder would
>> always contain a single file, part-r-00000, and this file gets overwritten
>> every time.
>>
>> This is how I am invoking the API
>>             dataToWrite.saveAsHadoopFile(directory, String.class,
>> String.class, MultipleOutputFormat.class);
>> Am I missing something?
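
For MultipleTextOutputFormat (the old `mapred` API) to split output by key, `generateFileNameForKeyValue` usually has to be overridden so that the key selects the output file; otherwise everything still lands in one part file. The routing logic itself is plain string manipulation, sketched here without the Hadoop dependency (names are illustrative assumptions):

```java
public class KeyedFileName {
    // Sketch of what an overridden generateFileNameForKeyValue(key, value,
    // name) could return: prefix the default leaf name with the key so that
    // each key's records go to their own file under their own subdirectory.
    static String fileNameForKey(String key, String leafName) {
        return key + "/" + leafName;
    }

    public static void main(String[] args) {
        System.out.println(fileNameForKey("1200", "part-r-00000"));
    }
}
```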
>>
>> Regards,
>> Shridhar
>>
>>
>> On Wed, Mar 23, 2016 at 11:55 AM, Surendra , Manchikanti <
>> surendra.manchikanti@gmail.com> wrote:
>>
>>> Hi Vetal,
>>>
>>> You may try MultipleOutputFormat instead of TextOutputFormat in
>>> saveAsNewAPIHadoopFile().
>>>
>>> Regards,
>>> Surendra M
>>>
>>> -- Surendra Manchikanti
>>>
>>> On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenvetal@gmail.com>
>>> wrote:
>>>
>>>> We are using Spark 1.4 for Spark Streaming. Kafka is the data source
>>>> for the Spark stream.
>>>>
>>>> Records are published to Kafka every second. Our requirement is to
>>>> store the records published to Kafka in a single folder per minute. The
>>>> stream reads records every five seconds. For instance, records published
>>>> between 12:00 PM and 12:01 PM are stored in folder "1200"; those between
>>>> 12:01 PM and 12:02 PM in folder "1201", and so on.
>>>>
>>>> The code I wrote is as follows
>>>>
>>>> // First, group records in the RDD by target directory (date)
>>>> stream.foreachRDD(rddWithinStream -> {
>>>>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>>>         rddWithinStream
>>>>             .mapToPair(t -> new Tuple2<String, String>(targetHadoopFolder, t._2()))
>>>>             .groupByKey();
>>>>     // All records are now grouped by the folders they will be stored in
>>>>
>>>>     // Create an RDD for each target folder.
>>>>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>>>         JavaPairRDD<String, Iterable<String>> rddByKey =
>>>>             rddGroupedByDirectory.filter(
>>>>                 groupedTuples -> groupedTuples._1().equals(hadoopFolder));
>>>>
>>>>         // And store it in Hadoop
>>>>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>>>             String.class, TextOutputFormat.class);
>>>>     }
>>>> });
>>>>
>>>> Since the stream processes data every five seconds,
>>>> saveAsNewAPIHadoopFile gets invoked multiple times per minute. This causes
>>>> the "part-00000" file to be overwritten every time.
>>>>
>>>> I was expecting that in the directory specified by the "directory"
>>>> parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
>>>> even though I have a single worker node.
>>>>
>>>> Any help/alternatives are greatly appreciated.
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
