flink-user mailing list archives

From James Bucher <jbuc...@expedia.com>
Subject Scala case classes with a generic parameter
Date Wed, 01 Jun 2016 18:11:15 GMT
Hi,

I have been trying to get a case class with a generic parameter working with Flink 1.0.3 and
have been having some trouble. When I compile, I get the following error:
debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40: error:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
[ERROR]           .apply(new AggregateOrigins)

I am importing org.apache.flink.api.scala._ and the generic type is defined as [T: TypeInformation]
as suggested here: https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html

The full code for the program is as follows:

package com.example.flink.jobs

import java.util.{Properties}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
import org.apache.flink.api.scala._

object CaseClassWithGeneric {
  case class TestGen[T: TypeInformation](item: T) {}

  class AggregateOrigins extends WindowFunction[String, TestGen[String], String, TimeWindow] {
    def apply(key: String, win: TimeWindow, values: Iterable[String],
              col: Collector[TestGen[String]]): Unit = {
      values.foreach(x => { })
      col.collect(new TestGen[String]("Foo"))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    val messageStream = env.addSource(
      new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
          .keyBy(s => s)
          .timeWindow(Time.days(1))
          .apply(new AggregateOrigins)
    messageStream.print()
    env.execute("Simple Job")
  }
}

When I dug into the apply() function definition I found the following:

def apply[R: TypeInformation](
    function: WindowFunction[T, R, K, W]): DataStream[R] = {

  val cleanFunction = clean(function)
  val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
  asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
}

As far as I can tell, TestGen[String] should correspond to [R: TypeInformation] in apply. Am
I missing something, or is it not possible to define a case class with a generic parameter?
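
For reference, my understanding is that a context bound such as [R: TypeInformation] is
shorthand for an extra implicit evidence parameter, so the compiler has to find an implicit
TypeInformation[R] at the call site. Below is a minimal sketch of that desugaring in plain
Scala. It does not use Flink at all: TypeInfo, Box, boxInfo and ContextBoundSketch are
hypothetical names made up for illustration, with TypeInfo standing in for Flink's
TypeInformation. Flink derives its evidence differently, so this only illustrates the lookup
mechanism, not the cause of the error above.

```scala
// Hypothetical stand-in for Flink's TypeInformation; NOT the real class.
trait TypeInfo[T] { def describe: String }

object TypeInfo {
  // Evidence for String, found through the companion object's implicit scope.
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def describe: String = "String" }
}

object ContextBoundSketch {
  // Context-bound form, shaped like apply[R: TypeInformation].
  def describeBound[R: TypeInfo]: String = implicitly[TypeInfo[R]].describe

  // The same method with the implicit evidence parameter written out.
  def describeExplicit[R](implicit ev: TypeInfo[R]): String = ev.describe

  // Illustrative generic case class (no context bound on T here).
  case class Box[T](item: T)

  // Derives evidence for Box[T] from evidence for T.
  implicit def boxInfo[T](implicit inner: TypeInfo[T]): TypeInfo[Box[T]] =
    new TypeInfo[Box[T]] { def describe: String = s"Box[${inner.describe}]" }

  def main(args: Array[String]): Unit = {
    println(describeBound[String])          // prints "String"
    println(describeExplicit[Box[String]])  // prints "Box[String]"
  }
}
```

If boxInfo is removed, describeExplicit[Box[String]] fails to compile with a "could not find
implicit value" error of the same shape as the one I am seeing.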

Thanks,

James Bucher