spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michał Zieliński <zielinski.mich...@gmail.com>
Subject Re: Do not wrap result of a UDAF in an Struct
Date Tue, 29 Mar 2016 11:15:00 GMT
Matthias,

You don't need StructType, you can have ArrayType directly

def bufferSchema: StructType = StructType(StructField("vals",
DataTypes.createArrayType(StringType)) :: Nil)
def dataType: DataType = DataTypes.createArrayType(StringType)
def evaluate(buffer: Row): Any = buffer.getSeq[String](0)


On 29 March 2016 at 11:32, Matthias Niehoff <matthias.niehoff@codecentric.de
> wrote:

> Hi,
>
> given is a simple DF:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: string (nullable = true)
>  |-- val: string (nullable = true)
>
> I run an UDAF on this DF with groupBy($“id1“,$“id2“).agg(udaf($“val“) as
> „valsStruct“).
> The aggregates simply stores all val in Set.
>
> The result is:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: integer (nullable = true)
>  |-- valsStruct: struct (nullable = true)
>  |    |-- vals: array (nullable = true)
>  |    |    |-- element: string (containsNull = true)
>
> But i would expect:
>
> root
>  |-- id1: string (nullable = true)
>  |-- id2: integer (nullable = true)
>  |-- vals: array (nullable = true)
>  |    |— element: string (containsNull = true)
>
> What I’m doing right now is to add a new columns val with valsStruct.vals
> as a value and drop valsStruct afterwards, but i’m quite sure there is a
> more elegant way. I tried various implementations of the evaluate method,
> but none of those worked for me. Can you tell me what I am missing here?
>
> The implementation of the UDAF:
>
> class AggregateVals extends UserDefinedAggregateFunction {
>
>   def inputSchema: StructType = StructType(Array(
>     StructField("val", StringType, true)
>   ))
>
>   def bufferSchema: StructType = StructType(Array(
>     StructField("vals", ArrayType(StringType, true))
>   ))
>
>   def dataType: DataType = bufferSchema
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = Seq[String]()
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     val existing: Seq[String] = buffer.getSeq[String](0)
>     val newBuffer = existing :+ input.getAs[String](0)
>     buffer(0) = newBuffer
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
>   }
>
>   def evaluate(buffer: Row): Any = {
>     buffer
>   }
> }
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
> evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet
>

Mime
View raw message