spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anton Okolnychyi <>
Subject Expression Encoder for Map[Int, String] in a custom Aggregator on a Dataset
Date Thu, 20 Oct 2016 11:12:33 GMT
Hi all,

I am trying to use my custom Aggregator on a GroupedDataset of case classes
to create a hash map using Spark SQL 1.6.2.
My Encoder[Map[Int, String]] is not capable to reconstruct the reduced
values if I define it via ExpressionEncoder().
However, everything works fine if I define it as Encoders.kryo[Map[Int,
I would like to know if I am doing anything wrong.

I have the following use case:

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] =

  val sparkContext = ...
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  case class StopPoint(line: String, sequenceNumber: Int, id: String)

  val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2,

  val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String],
Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: StopPoint) = {
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String])
= {
      map ++ anotherMap
    override def finish(reduction: Map[Int, String]) = reduction

  val resultMap = stopPointDS
In spark.sql.execution.TypedAggregateExpression.scala, I see that each
entry is inserted into the initial map correctly (i.e. reduce() method
works properly).
However, my encoder cannot reconstruct the map from the reduce phase in the
merge phase and I get an empty Map as a result of the merge method.
If I replace my expression-based encoder with
org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I will get the
correct result.
(33, Map(1 -> 1, 2 -> 2))

Any ideas/suggestions are more than welcome.

Anton Okolnychyi

View raw message