spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Stadin, Benjamin" <Benjamin.Sta...@heidelberg-mobil.com>
Subject Re: ETL process design
Date Wed, 28 Jan 2015 09:47:34 GMT
Hi Danny,

What you describe sounds like you may also consider to use Spring XD instead, at least for
the file-centric stuff.

Regards
Ben

Von meinem iPad gesendet

> Am 28.01.2015 um 10:42 schrieb Danny Yates <danny@codeaholics.org>:
> 
> Hi,
> 
> My apologies for what has ended up as quite a long email with a lot of open-ended questions,
but, as you can see, I'm really struggling to get started and would appreciate some guidance
from people with more experience. I'm new to Spark and "big data" in general, and I'm struggling
with what I suspect is actually a fairly simple problem.
> 
> For background, this process will run on an EMR cluster in AWS. My files are all in S3,
but the S3 access is pretty straightforward in that environment, so I'm not overly concerned
about that at the moment.
> 
> I have a process (or rather, a number of processes) which drop "JSON" events into files
in directories in S3 structured by the date the events arrived. I say "JSON" because they're
one JSON message per line, rather than one per file. That is, they are amenable to being loaded
with sc.jsonFile(). The directory structure is s3://bucket/path/yyyy-mm-dd/many-files-here,
where yyyy-mm-dd is the received date of the events.
> 
> Depending on the environment, there could be 4,000 - 5,000 files in each directory, each
having up to 3,000 lines (events) in. So plenty of scope for parallelism. In general, there
will be something like 2,000,000 events per day initially.
> 
> The incoming events are of different types (page views, item purchases, etc.) but are
currently all bundled into the same set of input files. So the JSON is not uniform across
different lines within each file. I'm amenable to changing this if that's helpful and having
the events broken out into different files by event type.
> 
> Oh, and there could be duplicates too, which will need removing. :-)
> 
> My challenge is to take these files and transfer them into a more long-term storage format
suitable for both overnight analytics and also ad-hoc querying. I'm happy for this process
to just happen once a day - say, at 1am and process the whole of the previous day's received
data.
> 
> I'm thing that having Parquet files stored in Hive-like partitions would be a sensible
way forward: s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here, yyyy,
mm and dd represent the time the event happened, rather than the time it arrived. Does that
sound sensible? Do you have any other recommendations?
> 
> So I need to read each line, parse the JSON, deduplicate the data, decided which event
type it is, and output it to the right file in the right directory.
> 
> I'm struggling with... well... most of it, if I'm honest. Here's what I have so far.
> 
> val data = sc.textFile("s3://..../yyyy-mm-dd/*")  // load all files for given received
date
> 
> // Deduplicate
> val dedupe = data.map(line => {
>     val json = new com.fasterxml.jackson.databind.ObjectMapper().readTree(line);
>     val _id = json.get("_id").asText();   // _id is a key that can be used to dedupe
>     val event = json.get("event").asText();    // event is the event type
>     val ts = json.get("timestamp").asText();    // timestamp is the when the event happened
> 
>     (_id, (event, ts, line))   // I figure having event, ts and line at this point will
save time later
> }).reduceByKey((a, b) => a)   // For any given pair of lines with the same _id, pick
one arbitrarily
> 
> At this point, I guess I'm going to have to split this apart by event type (I'm happy
to have a priori knowledge of the event types) and "formally" parse each line using a schema
to get a SchemaRDD so I can write out Parquet files. I have exactly zero idea how to approach
this part.
> 
> The other wrinkle here is that Spark seems to want to "own" the directory it writes to.
But it's possible that on any given run we might pick up a few left-over events for a previous
day, so we need to be able to handle the situation where we're adding events for a day we've
already processed.
> 
> Many thanks,
> 
> Danny.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message