spark-user mailing list archives

From Jungtaek Lim <kabhwan.opensou...@gmail.com>
Subject Re: Kafka structured streaming - how to read headers
Date Fri, 04 Dec 2020 08:18:18 GMT
The doc fix is probably missing - the type was Map in the initial proposal
and was changed to Array during review. It looks like we missed updating the
doc to match.
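
For anyone reading this later, here is a minimal sketch of what the Array
shape means in practice. It is plain Scala and does not touch Spark. It
assumes each header is a string key plus a binary value, as in the Header
case class from the quoted reply below. The HeaderDecoding object and its
method names are made up for illustration, and treating the bytes as UTF-8
is an assumption about the payload.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// One Kafka header: a string key and a binary value
// (same shape as the Header case class quoted in this thread).
final case class Header(key: String, value: Array[Byte])

// Hypothetical helper object, for illustration only.
object HeaderDecoding {
  // Headers are an array rather than a map, so a key may occur more than once.
  // Collect every non-null value stored under `key`, decoded as UTF-8
  // (assumption: the producer wrote text).
  def valuesFor(headers: Seq[Header], key: String): Seq[String] =
    headers.collect {
      case Header(k, v) if k == key && v != null => new String(v, UTF_8)
    }

  // Pick the last occurrence when only one value is wanted.
  def lastValueFor(headers: Seq[Header], key: String): Option[String] =
    valuesFor(headers, key).lastOption
}
```

Because duplicates are allowed, converting the array to a Map would silently
drop repeated keys, which is one reason to keep it as a sequence.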

I'd love to see a volunteer to fix the doc. If there's no volunteer I might
be able to take a look.

On Fri, Dec 4, 2020 at 4:09 PM German Schiavon <gschiavonspark@gmail.com>
wrote:

> oh, I didn't see that Map, it is weird :/
>
> I did this and it works :
>
> case class Header(key: String, value: Array[Byte])
>
> import spark.implicits._
>
> val kafkaDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("includeHeaders", "true")
>   .option("startingOffsets", "earliest")
>   .load()
>
>
> kafkaDF
>   .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
>   .writeStream
>   .format("console")
>   .option("truncate", false)
>   .start()
>   .awaitTermination()
>
>
> On Thu, 3 Dec 2020 at 23:11, <eugen.wintersberger@gmail.com> wrote:
>
>> Hi German,
>>   unfortunately the documentation does not help.
>>
>> On Thu, 2020-12-03 at 18:41 +0100, German Schiavon wrote:
>>
>> Hello,
>>
>> see if this works, from the documentation:
>>
>>
>> // Subscribe to 1 topic, with headers
>> val df = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .option("includeHeaders", "true")
>>   .load()
>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>   .as[(String, String, Map)]
>>
>> I am actually wondering what this code is doing since using Map without
>> key and value type is rather strange. And this is actually what I get when
>> I am trying to do this
>>
>>
>> The error messages make total sense. Now I can try to add the type
>> parameters to Map, assuming that the key is a String and the value a
>> byte array; in this case I get
>>
>> What seems to work is this.
>>
>> I will try to continue in this direction. I'll let you know about my
>> findings. The documentation is in any case wrong.
>>
>>
>> On Thu, 3 Dec 2020 at 18:22, <eugen.wintersberger@gmail.com> wrote:
>>
>> Hi folks,
>>   I am trying to read the message headers from a Kafka structured stream
>> which should be stored in a column named ``headers``.
>> I try something like this:
>>
>> val stream = sparkSession.readStream.format("kafka")......load()
>>
>> stream.map(row => {
>>
>>  ...
>>
>>  val headers = row.getAs[HeaderT]("headers")
>>
>> ....
>>
>> })
>>
>>
>> My question is: what would be *HeaderT*?
>>
>> Thanks in advance
>>
>>  Eugen
>>
>>
>>
