spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-5066) Can not get all key that has same hashcode when reading key ordered from different Streaming.
Date Thu, 26 Feb 2015 23:52:04 GMT

     [ https://issues.apache.org/jira/browse/SPARK-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-5066.
------------------------------
    Resolution: Not a Problem

> Can not get all key that has same hashcode when reading key ordered from different Streaming.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5066
>                 URL: https://issues.apache.org/jira/browse/SPARK-5066
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: DoingDone9
>            Priority: Critical
>
> When spilling is enabled, data ordered by hashCode is spilled to disk. When merging values,
> we need to get all keys that have the same hashCode from the different tmp files, but the
> code reads only the keys that have the minimum hashCode within each tmp file, so it cannot
> read all of the keys.
> Example:
> Suppose file1 has [k1, k2, k3] and file2 has [k4, k5, k1], and
> hashCode of k4 < hashCode of k5 < hashCode of k1 < hashCode of k2 < hashCode of k3.
> Then we read only k1 from file1 and k4 from file2, so we cannot read all of the k1 entries.
> Code:
> private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
> inputStreams.foreach { it =>
>   val kcPairs = new ArrayBuffer[(K, C)]
>   readNextHashCode(it, kcPairs)
>   if (kcPairs.length > 0) {
>     mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
>   }
> }
>
> private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
>   if (it.hasNext) {
>     var kc = it.next()
>     buf += kc
>     val minHash = hashKey(kc)
>     while (it.hasNext && it.head._1.hashCode() == minHash) {
>       kc = it.next()
>       buf += kc
>     }
>   }
> }
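
The report above concerns only the initial read from each stream. As a rough illustration of how a hash-ordered merge can still combine every k1, here is a minimal, self-contained sketch. It is not Spark's implementation and is not taken from the issue: `Key`, `mergeByHash`, `refill`, the `Int` values, and the summing combiner are all hypothetical names and choices made for this example. It assumes that a stream which contributed to a merge step is read again and put back on the heap before the next step.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object HashMergeSketch {
  // Hypothetical key type with an explicit hash code, so the ordering in the example is visible.
  final case class Key(name: String, hash: Int) {
    override def hashCode(): Int = hash
  }

  // One hash-sorted input stream plus the batch of pairs that share its current minimum hash.
  private final class StreamBuffer(
      val it: BufferedIterator[(Key, Int)],
      val pairs: ArrayBuffer[(Key, Int)]) {
    def minHash: Int = pairs.head._1.hashCode()
  }

  // Read the leading pairs of a stream that share the hash code of its first pair.
  private def readNextHashCode(
      it: BufferedIterator[(Key, Int)],
      buf: ArrayBuffer[(Key, Int)]): Unit = {
    if (it.hasNext) {
      val first = it.next()
      buf += first
      val minHash = first._1.hashCode()
      while (it.hasNext && it.head._1.hashCode() == minHash) {
        buf += it.next()
      }
    }
  }

  // Merge hash-sorted "files", summing the values of equal keys (assumed combiner).
  def mergeByHash(files: Seq[Seq[(Key, Int)]]): Seq[(Key, Int)] = {
    // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest hash first.
    val heap = mutable.PriorityQueue.empty[StreamBuffer](
      Ordering.by[StreamBuffer, Int](_.minHash).reverse)

    // Read the next batch from a stream and enqueue it if the stream is not exhausted.
    def refill(it: BufferedIterator[(Key, Int)]): Unit = {
      val buf = new ArrayBuffer[(Key, Int)]
      readNextHashCode(it, buf)
      if (buf.nonEmpty) heap.enqueue(new StreamBuffer(it, buf))
    }

    files.foreach(f => refill(f.iterator.buffered))

    val out = new ArrayBuffer[(Key, Int)]
    while (heap.nonEmpty) {
      val minHash = heap.head.minHash
      val merged = mutable.LinkedHashMap.empty[Key, Int]
      val drained = new ArrayBuffer[StreamBuffer]
      // Drain every stream whose current batch has the global minimum hash.
      while (heap.nonEmpty && heap.head.minHash == minHash) {
        val sb = heap.dequeue()
        sb.pairs.foreach { case (k, v) => merged(k) = merged.getOrElse(k, 0) + v }
        drained += sb
      }
      out ++= merged
      // Advance only the drained streams; the others keep their pending batch.
      drained.foreach(sb => refill(sb.it))
    }
    out.toSeq
  }
}
```

With the files from the example (hash codes chosen so that k4 < k5 < k1 < k2 < k3, and every value set to 1), the sketch emits k4 and k5 first. Both streams then hold k1 as their pending batch and are drained in the same step, so k1 comes out once with the combined value 2.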



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


