spark-user mailing list archives

From Holden Karau <hol...@pigscanfly.ca>
Subject Re: how to extract/combine elements of an Array in DStream element?
Date Wed, 29 Oct 2014 22:36:14 GMT
On Wed, Oct 29, 2014 at 3:29 PM, spr <spr@yarcdata.com> wrote:

> I am processing a log file, from each line of which I want to extract the
> zeroth and 4th elements (and an integer 1 for counting) into a tuple.  I had
> hoped to be able to index the Array for elements 0 and 4, but Arrays appear
> not to support vector indexing.  I'm not finding a way to extract and
> combine the elements properly, perhaps due to being a SparkStreaming/Scala
> newbie.
>
> My code so far looks like:
>
> 1]    var lines = ssc.textFileStream(dirArg)
> 2]    var linesArray = lines.map( line => (line.split("\t")))
> 3]    var respH = linesArray.map( lineArray => lineArray(4) )
> 4a]  var time  = linesArray.map( lineArray => lineArray(0) )
> 4b]  var time  = linesArray.map( lineArray => (lineArray(0), 1))
> 5]    var newState = respH.union(time)
>
> If I use line 4a and not 4b, it compiles properly.  (I still have issues
> getting my update function to updateStateByKey working, so don't know if it
> _works_ properly.)
>
> If I use line 4b and not 4a, it fails at compile time with
>
> [error]  foo.scala:82: type mismatch;
> [error]  found   : org.apache.spark.streaming.dstream.DStream[(String, Int)]
> [error]  required: org.apache.spark.streaming.dstream.DStream[String]
> [error]     var newState = respH.union(time)
>
> This implies that the DStreams being union()ed have to be of identical
> per-element type.  Can anyone confirm that's true?
>
Yes. As shown in the scaladoc/javadoc they have to be the same type. (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream
)
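
To illustrate why the compiler complains: DStream[T] declares union as union(that: DStream[T]): DStream[T], so both sides must share the element type T. The sketch below does not use Spark at all. MiniStream is a hypothetical stand-in class, invented here only to mirror that signature, so the type rule can be tried in a plain Scala REPL or script.

```scala
// Hypothetical stand-in for DStream[T]; NOT a Spark class.
// It only mirrors the shape of union(that: DStream[T]): DStream[T].
final case class MiniStream[T](items: Seq[T]) {
  def union(that: MiniStream[T]): MiniStream[T] = MiniStream(items ++ that.items)
  def map[U](f: T => U): MiniStream[U] = MiniStream(items.map(f))
}

val respH = MiniStream(Seq("r1", "r2"))                  // MiniStream[String]
val time  = MiniStream(Seq("t1", "t2")).map(t => (t, 1)) // MiniStream[(String, Int)]

// respH.union(time) would be rejected at compile time (type mismatch),
// because the element types differ.

// Bringing both sides to the same element type first makes union legal.
val respHCounted = respH.map(r => (r, 1))                // MiniStream[(String, Int)]
val combined     = respHCounted.union(time)
```

Whether a union is what you want here is a separate question; it concatenates the two streams rather than pairing up fields from the same line.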

>
> If so, is there a way to extract the needed elements and build the new
> DStream?
>
Maybe you can say what you want your new DStream to look like?  If you just
want to extract the zeroth and fourth elements and keep them together, I'd do
the extraction in a single map, e.g. something like:

val iLikeCoffeeDStream = linesArrayDStream.map(lineArray => (lineArray(0), lineArray(4)))
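
As a rough sketch of the single-map idea, here is the per-line extraction written as an ordinary function and tried on a plain Seq, so it runs without a Spark cluster. The name extractFields and the sample lines are made up for illustration, and the sketch assumes every line has at least five tab-separated fields.

```scala
// Hypothetical helper: pull fields 0 and 4 out of one tab-separated line.
// Assumes at least five fields; a shorter line would throw
// ArrayIndexOutOfBoundsException.
def extractFields(line: String): (String, String) = {
  val fields = line.split("\t")
  (fields(0), fields(4))
}

// Made-up sample input standing in for the contents of the log file.
val sampleLines = Seq("t1\ta\tb\tc\tr1", "t2\ta\tb\tc\tr2")

// One map yields both elements together as a tuple.
val pairs = sampleLines.map(extractFields)

// If a count is also wanted, a second map can attach the integer 1,
// using the (time, respH) pair as the key.
val counted = pairs.map(pair => (pair, 1))
```

In the streaming job, the same function could presumably be passed straight to the DStream, e.g. lines.map(extractFields), since DStream.map takes an ordinary function over the element type.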




