spark-user mailing list archives

From eugen.wintersber...@gmail.com
Subject Re: Kafka structured streaming - how to read headers
Date Thu, 03 Dec 2020 22:11:17 GMT
Hi German,
  unfortunately the documentation does not help. 

On Thu, 2020-12-03 at 18:41 +0100, German Schiavon wrote:
> Hello, 
> see if this works, from the documentation:
> 
> 
> // Subscribe to 1 topic, with headers
> val df = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
> .option("subscribe", "topic1")
> .option("includeHeaders", "true")
> .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
> "headers")
> .as[(String, String, Map)]
I am actually wondering what this code is supposed to do, since using
Map without key and value types is rather strange. And this is actually
what I get when I try it:


The error messages make total sense. Now I can try to add the type
parameters to Map, assuming that the key is a String and the value a
byte array. In this case I get:

What seems to work is this.

I will try to continue in this direction. I'll let you know about my
findings. The documentation is in any case wrong.
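
For reference, here is a minimal sketch of one typing that matches the
schema the Kafka source documents for the headers column when
includeHeaders is set, namely an array of (key: string, value: binary)
structs. It reuses the placeholder servers and topic from the quoted
snippet. The object name, the decodeHeaders helper, the UTF-8 decoding
of header values and the console sink are assumptions made for
illustration only, so treat it as a sketch rather than a definitive
implementation.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.SparkSession

object KafkaHeadersTyped {
  // Hypothetical helper: decode header values as UTF-8 text (an assumption,
  // header values are raw bytes). A null array (no headers) gives an empty
  // map, null values are skipped, and toMap keeps the last value of a
  // repeated key.
  def decodeHeaders(headers: Array[(String, Array[Byte])]): Map[String, String] =
    Option(headers).getOrElse(Array.empty[(String, Array[Byte])])
      .collect { case (k, v) if v != null => k -> new String(v, UTF_8) }
      .toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-headers-typed")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Same source options as in the quoted snippet (placeholder hosts/topic).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .option("includeHeaders", "true")
      .load()

    // headers is typed as an array of (key, value) pairs instead of a raw Map.
    val typed = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
      .as[(String, String, Array[(String, Array[Byte])])]

    val decoded = typed.map(t => (t._1, t._2, decodeHeaders(t._3)))

    // Console sink chosen only to make the sketch observable.
    decoded.writeStream.format("console").start().awaitTermination()
  }
}
```

Running this needs a reachable Kafka broker and the spark-sql-kafka-0-10
package on the classpath.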


> On Thu, 3 Dec 2020 at 18:22, <eugen.wintersberger@gmail.com> wrote:
> > Hi folks,
> >   I am trying to read the message headers from a Kafka structured
> > stream which should be stored in a column named ``headers``. 
> > I try something like this:
> > 
> > val stream = sparkSession.readStream.format("kafka")......load()
> > stream.map(row => { 
> >  ...
> >  val headers = row.getAs[HeaderT]("headers")
> > ....
> > })
> > 
> > My question is: what would be HeaderT?
> > 
> > Thanks in advance
> > 
> >  Eugen
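
Regarding the HeaderT question in the original message: a minimal
sketch of Row-based access, assuming the headers column arrives as an
array of (key: string, value: binary) structs, so that each header is
itself a nested Row. The object and function names are hypothetical.

```scala
import org.apache.spark.sql.Row

object KafkaHeadersFromRow {
  // Hypothetical helper: reads the "headers" column of a Kafka source row
  // as a sequence of (key, value) pairs. The row must carry a schema,
  // as rows of a DataFrame do, for the lookup by field name to work.
  def headersOf(row: Row): Seq[(String, Array[Byte])] = {
    val idx = row.fieldIndex("headers")
    if (row.isNullAt(idx)) Seq.empty // assumed: null means no headers
    else row.getSeq[Row](idx).map(h => (h.getString(0), h.getAs[Array[Byte]](1)))
  }
}
```

Inside stream.map(row => ...) this could be called as
KafkaHeadersFromRow.headersOf(row). Note that map on a DataFrame still
needs an implicit Encoder for whatever the function returns.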

