spark-user mailing list archives

From lec ssmi <>
Subject Re: Spark Streaming with mapGroupsWithState
Date Tue, 03 Mar 2020 01:11:54 GMT
Maybe you can combine the fields you want to group by into one field.
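A minimal sketch of that suggestion, assuming Spark 2.x/3.x with `spark.implicits._` in scope: a Scala tuple is a single value that Spark can encode, so it can serve as the combined key. `Event`, `CompositeKey`, and `countPerKey` are hypothetical names. Against the question's code this would look like `.groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2))`, assuming both getters return encodable types such as `String`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical flat record; the question's MyReport nests its keys
// under getKeys, but the grouping idea is the same.
case class Event(key1: String, key2: String, value: Long)

object CompositeKey {
  // Count events per (key1, key2) pair.
  def countPerKey(spark: SparkSession, events: Seq[Event]): Map[(String, String), Long] = {
    import spark.implicits._
    events.toDS()
      // The tuple is the single "combined field" used as the grouping key;
      // spark.implicits._ supplies the Encoder[(String, String)].
      .groupByKey(e => (e.key1, e.key2))
      .count()
      .collect()
      .toMap
  }
}
```

The same tuple key works unchanged with `mapGroupsWithState`; the key parameter of the update function then has type `(String, String)`.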

On Tue, Mar 3, 2020 at 6:37 AM, Something Something <> wrote:

> I am writing a stateful streaming application in which I use
> mapGroupsWithState to create aggregates for groups, but I need to create *groups
> based on more than one column in the input row*. All the examples in
> 'Spark: The Definitive Guide' use only one column, such as 'User' or
> 'Device'. I am using code similar to the code given below. *How do I
> specify more than one field in 'groupByKey'?*
> There are other challenges as well. The book says we can use
> 'updateAcrossEvents' as shown below, but I get a compile-time error
> saying:
> *Error:(43, 65) missing argument list for method updateAcrossEvents in
> object Main. Unapplied methods are only converted to functions when a
> function type is expected. You can make this conversion explicit by writing
> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
> `updateAcrossEvents`.
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
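`mapGroupsWithState` expects its second argument list to be a function of shape `(K, Iterator[V], GroupState[S]) => U`, that is, exactly three parameters. The compiler's hint `updateAcrossEvents(_,_,_,_,_)` suggests the method in the question takes five, which may itself be part of the problem. A minimal sketch, with hypothetical `Reading`, `RunningTotal`, and `KeyTotal` types, that gives the state and output types explicitly and eta-expands the method with `_`. It runs on a batch Dataset with `NoTimeout` so it can be tried locally; for a batch Dataset Spark invokes the function once per group.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input, state, and output types for illustration.
case class Reading(key1: String, key2: String, value: Long)
case class RunningTotal(total: Long)
case class KeyTotal(key1: String, key2: String, total: Long)

object UpdateFunction {
  // Exactly three parameters: (key, values for that key, state handle).
  def updateAcrossEvents(
      key: (String, String),
      readings: Iterator[Reading],
      state: GroupState[RunningTotal]): KeyTotal = {
    val previous = state.getOption.map(_.total).getOrElse(0L)
    val updated = previous + readings.map(_.value).sum
    state.update(RunningTotal(updated))
    KeyTotal(key._1, key._2, updated)
  }

  def totals(spark: SparkSession, readings: Seq[Reading]): Set[KeyTotal] = {
    import spark.implicits._
    readings.toDS()
      .groupByKey(r => (r.key1, r.key2))
      // Explicit [S, U] type arguments plus `_` turn the method into the
      // function value that mapGroupsWithState expects.
      .mapGroupsWithState[RunningTotal, KeyTotal](GroupStateTimeout.NoTimeout())(updateAcrossEvents _)
      .collect()
      .toSet
  }
}
```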
> Another challenge: the compiler also complains about my *MyReport* class:
> *Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._ Support for serializing other types will be
> added in future releases.*
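The encoder error means Spark cannot derive an `Encoder` for `MyReport`: `spark.implicits._` covers primitives, tuples, and case classes (Products), but not arbitrary classes. If `MyReport` is a Java- or protobuf-style class, as the `getKeys.getKey1` getters hint, one option is to map the DataFrame onto case classes instead. A minimal sketch, assuming a nested `keys` struct column; `ReportKeys`, `Report`, `ReportEncoding`, and the column names are hypothetical. `.as[T]` matches columns to case-class fields by name, including fields inside a struct. For a genuine Java bean, `Encoders.bean(classOf[...])` is another option.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.struct

// Hypothetical case classes mirroring report.keys.key1 / report.keys.key2.
case class ReportKeys(key1: String, key2: String)
case class Report(keys: ReportKeys, value: Long)

object ReportEncoding {
  def toReports(spark: SparkSession): Dataset[Report] = {
    import spark.implicits._
    // Flat sample rows, then nest key1/key2 under a struct column named "keys".
    Seq(("a", "x", 1L), ("a", "y", 2L))
      .toDF("key1", "key2", "value")
      .select(struct($"key1", $"key2").as("keys"), $"value")
      // Encoder[Report] is derived because Report is a case class (a Product).
      .as[Report]
  }
}
```

With a `Dataset[Report]`, the composite key becomes `.groupByKey(r => (r.keys.key1, r.keys.key2))`.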
> Help in resolving these errors would be greatly appreciated. Thanks in
> advance.
> withEventTime
>   .as[MyReport]
>   .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
>   .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>   .writeStream
>   .queryName("test_query")
>   .format("memory")
>   .outputMode("update")
>   .start()
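Putting the pieces together for the streaming query, as a hedged sketch rather than a tested fix. Beyond the question's code, it relies on two Spark rules: `GroupStateTimeout.EventTimeTimeout()` is rejected unless the query defines a watermark with `withWatermark`, and `mapGroupsWithState` supports only the `update` output mode. The input is assumed to have columns `key1`, `key2`, `eventTime` (a timestamp), and `value`; those columns, the case classes, the 10-minute watermark, and the one-hour timeout are all hypothetical.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQuery}

// Hypothetical typed view of the input rows, plus state and output types.
case class TimedReading(key1: String, key2: String, eventTime: Timestamp, value: Long)
case class Total(total: Long)
case class KeyedTotal(key1: String, key2: String, total: Long)

object StreamingTotals {
  def updateAcrossEvents(
      key: (String, String),
      rows: Iterator[TimedReading],
      state: GroupState[Total]): KeyedTotal = {
    if (state.hasTimedOut) {
      // A timeout call carries no rows: emit the last total and drop the state.
      val last = state.getOption.map(_.total).getOrElse(0L)
      state.remove()
      KeyedTotal(key._1, key._2, last)
    } else {
      val batch = rows.toSeq
      val updated = state.getOption.map(_.total).getOrElse(0L) + batch.map(_.value).sum
      state.update(Total(updated))
      // Expire the key one hour of event time after its latest row in this batch.
      state.setTimeoutTimestamp(batch.map(_.eventTime.getTime).max + 60 * 60 * 1000L)
      KeyedTotal(key._1, key._2, updated)
    }
  }

  def start(spark: SparkSession, withEventTime: DataFrame): StreamingQuery = {
    import spark.implicits._
    withEventTime
      .withWatermark("eventTime", "10 minutes") // needed for EventTimeTimeout
      .as[TimedReading]
      .groupByKey(r => (r.key1, r.key2))
      .mapGroupsWithState[Total, KeyedTotal](GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents _)
      .writeStream
      .queryName("test_query")
      .format("memory")
      .outputMode("update")
      .start()
  }
}
```

If a per-key timeout is not actually needed, `GroupStateTimeout.NoTimeout()` avoids the watermark requirement and the `hasTimedOut` branch.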
