spark-user mailing list archives

From Something Something <mailinglist...@gmail.com>
Subject Spark Streaming with mapGroupsWithState
Date Mon, 02 Mar 2020 22:37:12 GMT
I am writing a stateful streaming application that uses
mapGroupsWithState to build aggregates per group, but I need to create *groups
based on more than one column in the input row*. All the examples in
'Spark: The Definitive Guide' use only one column, such as 'User' or
'Device'. I am using code similar to what's given below. *How do I specify
more than one field in the 'groupByKey'?*

There are other challenges as well. The book says we can pass
'updateAcrossEvents' the way shown below, but I get a compile-time error
saying:


*Error:(43, 65) missing argument list for method updateAcrossEvents in
object Main
Unapplied methods are only converted to functions when a function type is
expected.
You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
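
The hint lists five placeholders, which suggests that updateAcrossEvents is declared with five parameters, whereas mapGroupsWithState takes a function of three (the key, an iterator over the group's rows, and the GroupState). A minimal Spark-free sketch of the explicit conversion the compiler mentions; `State` and `applyUpdate` are hypothetical stand-ins for GroupState and for the function parameter of mapGroupsWithState:

```scala
object EtaExpansionSketch {
  // Hypothetical stand-in for Spark's GroupState: just the previous total, if any.
  type State = Option[Long]

  // A method with exactly three parameters: key, values for that key, state.
  def updateAcrossEvents(key: (String, String), values: Iterator[Long], state: State): Long =
    state.getOrElse(0L) + values.sum

  // Hypothetical stand-in for an API that expects a three-argument function.
  def applyUpdate(func: ((String, String), Iterator[Long], State) => Long): Long =
    func(("k1", "k2"), Iterator(1L, 2L, 3L), Some(4L))

  // `updateAcrossEvents _` converts the method into a function value explicitly.
  val total: Long = applyUpdate(updateAcrossEvents _)
}
```

If the real method does take five parameters, the conversion alone would probably not be enough; its parameter list would also have to match the three-argument shape.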

Another challenge: the compiler also complains about *MyReport*:
*Error:(41, 12) Unable to find encoder for type stored in a Dataset.  Primitive
types (Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._  Support for serializing other types will be
added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in
advance.


withEventTime
  .as[MyReport]
  .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
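
One possible shape for the pipeline above, offered as a sketch rather than a confirmed fix. It groups on a tuple of both keys, uses case classes so that spark.implicits._ can supply the encoders, and passes the update method with an explicit `_`. The names `Report`, `ReportTotal`, `merge` and `totals` are hypothetical stand-ins (the real MyReport is not shown in the message), and NoTimeout replaces EventTimeTimeout only to keep the example short. It assumes Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical stand-ins for MyReport and its aggregate. Top-level case
// classes get their encoders from spark.implicits._ without extra work.
case class Report(key1: String, key2: String, amount: Long)
case class ReportTotal(key1: String, key2: String, total: Long)

object CompositeKeySketch {
  // Pure merge step, kept apart from Spark so it can be checked in isolation.
  def merge(previous: Long, amounts: Iterator[Long]): Long = previous + amounts.sum

  // Exactly three parameters: the key, the rows for that key, and the state.
  def updateAcrossEvents(
      key: (String, String),
      reports: Iterator[Report],
      state: GroupState[ReportTotal]): ReportTotal = {
    val previous = state.getOption.map(_.total).getOrElse(0L)
    val updated = ReportTotal(key._1, key._2, merge(previous, reports.map(_.amount)))
    state.update(updated) // keep the running total for later triggers
    updated
  }

  def totals(reports: Dataset[Report]): Dataset[ReportTotal] = {
    val spark = reports.sparkSession
    import spark.implicits._ // encoders for the tuple key and both case classes
    reports
      .groupByKey(r => (r.key1, r.key2)) // composite key expressed as a tuple
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateAcrossEvents _)
  }
}
```

If MyReport is a Java-style class with getters (as `getKeys.getKey1` hints), mapping it to a case class before `.as[...]`, or supplying an encoder such as `Encoders.bean(classOf[MyReport])` or `Encoders.kryo[MyReport]`, are options worth trying for the encoder error.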
