spark-user mailing list archives

From Pankaj Wahane <pankaj.wah...@qiotec.com>
Subject Question on take function - Spark Java API
Date Wed, 26 Aug 2015 02:55:13 GMT
Hi community members,


> Apache Spark is fantastic and very easy to learn. Awesome work!
> 
> Question:
> 
> I have multiple files in a folder, and the first line in each file is the name of the
asset that the file belongs to. The second line is a CSV header row, and data starts from the third row.
> 
> Ex: File 1
> 
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
> 
> Ex: File 2
> 
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
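Since every file follows the same layout (asset name on line 1, CSV header on line 2, data from line 3), the per-file split can be sketched in plain Java. The `ParsedFile` class and its field names below are illustrative only, not part of the actual code:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative holder for one parsed file: asset name, column headers, data rows.
class ParsedFile {
    final String asset;
    final String[] headers;
    final List<String> dataRows;

    ParsedFile(String asset, String[] headers, List<String> dataRows) {
        this.asset = asset;
        this.headers = headers;
        this.dataRows = dataRows;
    }

    // Line 1 is the asset name, line 2 the CSV header, the rest are data rows.
    static ParsedFile parse(String fileContents) {
        String[] lines = fileContents.split("\n");
        String asset = lines[0].trim();
        String[] headers = lines[1].split(",");
        List<String> rows = Arrays.asList(lines).subList(2, lines.length);
        return new ParsedFile(asset, headers, rows);
    }
}
```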
> 
> I have nearly 1000 files in each folder, totalling ~10 GB.
> 
> I am using the Apache Spark Java API to read all these files.
> 
> The following is the code extract that I am using:
> 
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
>     // Read all files in the folder
>     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>     // Get the asset name (first line)
>     String asset = data.take(1).get(0);
>     // Keep only delimited lines (header + time-series rows)
>     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMITER));
>     // Strip the header
>     String header = actualData.take(1).get(0);
>     String[] headers = header.split(DELIMITER);
>     // Extract the actual data rows
>     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>     // Keep only valid records
>     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>     // Find the granularity
>     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>     // Transform to TimeSeriesData objects
>     JavaRDD<TimeSeriesData> tsdFlatMap =
>             transformToTimeSeries(validated, asset, readingTypeMap, headers, granularity);
> 
>     // Save to Cassandra
>     javaFunctions(tsdFlatMap)
>             .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>                     "time_series_data", mapToRow(TimeSeriesData.class))
>             .saveToCassandra();
> 
>     System.out.println("Total Records: " + timeSeriesLines.count());
>     System.out.println("Valid Records: " + validated.count());
> }
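The `validate` helper is not shown above. A minimal sketch of such a per-line check, assuming each row must have the same column count as the header and a parseable dd-MM-yyyy HH:mm:ss timestamp in the first column (a hypothetical illustration, not the exact implementation):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

class LineValidator {
    private static final String DELIMITER = ",";
    private static final int EXPECTED_COLUMNS = 4; // Time + dp_1..dp_3

    // Accept a row only if it has the expected column count and a
    // parseable dd-MM-yyyy HH:mm:ss timestamp in the first column.
    static boolean validate(String line) {
        String[] fields = line.split(DELIMITER);
        if (fields.length != EXPECTED_COLUMNS) {
            return false;
        }
        SimpleDateFormat fmt = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        fmt.setLenient(false);
        try {
            fmt.parse(fields[0]);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }
}
```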
> Within the TimeSeriesData object I need to set the asset name for each reading, so I
need the output of data.take(1) to differ from file to file.
> 
> 
> Thank You.
> 
> Best Regards,
> Pankaj
> 
> 


