spark-user mailing list archives

From German Schiavon <gschiavonsp...@gmail.com>
Subject Re: Kafka structured streaming - how to read headers
Date Fri, 04 Dec 2020 08:54:39 GMT
I could change it to something more generic, like this:

.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]
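
For illustration, a minimal sketch of consuming a header out of that
tuple-typed Dataset (the name ds, the header key "trace-id", and UTF-8-encoded
header values are all assumptions for the example):

import java.nio.charset.StandardCharsets
import spark.implicits._

// ds: Dataset[(String, String, Array[(String, Array[Byte])])]
val traceIds = ds.map { case (_, _, headers) =>
  // Kafka allows repeated header keys; collectFirst takes the first match
  headers.collectFirst {
    case ("trace-id", bytes) => new String(bytes, StandardCharsets.UTF_8)
  }
}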


On Fri, 4 Dec 2020 at 09:18, Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:

> Probably the doc was simply missed - the type was a Map in the initial
> proposal and was changed to an Array during review, and it looks like we
> failed to update the doc in sync during the review.
>
> I'd love to see a volunteer to fix the doc. If there's no volunteer I
> might be able to take a look.
>
> On Fri, Dec 4, 2020 at 4:09 PM German Schiavon <gschiavonspark@gmail.com>
> wrote:
>
>> oh, I didn't see that Map, it is weird :/
>>
>> I did this and it works:
>>
>> // Kafka headers are exposed as an array of (key, value) structs
>> case class Header(key: String, value: Array[Byte])
>>
>> import spark.implicits._
>>
>> val kafkaDF = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "test")
>>   .option("includeHeaders", "true") // ask the Kafka source for the headers column
>>   .option("startingOffsets", "earliest")
>>   .load()
>>
>> kafkaDF
>>   .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
>>   .writeStream
>>   .format("console")
>>   .option("truncate", false)
>>   .start()
>>   .awaitTermination()
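>>
>> Since headers is an array of (key, value) structs, it can also be turned
>> into a map column for direct lookup by key - a sketch, assuming Spark 2.4+
>> for map_from_entries:
>>
>> import org.apache.spark.sql.functions.map_from_entries
>>
>> // Note: Kafka allows duplicate header keys; in that case map construction
>> // throws or keeps the last value, depending on spark.sql.mapKeyDedupPolicy
>> val withHeaderMap = kafkaDF.select(
>>   'key.cast("string"),
>>   'value.cast("string"),
>>   map_from_entries('headers).as("headerMap"))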
>>
>>
>> On Thu, 3 Dec 2020 at 23:11, <eugen.wintersberger@gmail.com> wrote:
>>
>>> Hi German,
>>>   unfortunately the documentation does not help.
>>>
>>> On Thu, 2020-12-03 at 18:41 +0100, German Schiavon wrote:
>>>
>>> Hello,
>>>
>>> see if this works - it's from the documentation:
>>>
>>>
>>> // Subscribe to 1 topic, with headers
>>> val df = spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>   .option("subscribe", "topic1")
>>>   .option("includeHeaders", "true")
>>>   .load()
>>>
>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>>   .as[(String, String, Map)
>>>
>>> I am actually wondering what this code is doing, since using a Map without
>>> key and value types is rather strange. This is what I actually get when I
>>> try it:
>>>
>>> [error output not preserved in the archive]
>>>
>>> The error messages make total sense. Now I can try adding the type
>>> parameters to Map, assuming that the key is a String and the value a byte
>>> array; in this case I get:
>>>
>>> [error output not preserved in the archive]
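>>>
>>> The attempt presumably looked roughly like this (a reconstruction, since
>>> the original snippet was not preserved):
>>>
>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>>   .as[(String, String, Map[String, Array[Byte]])]
>>>
>>> This fails at analysis time because, as clarified above in the thread, the
>>> headers column is an array of (key, value) structs, not a map, so the Map
>>> encoder does not match the column type.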
>>>
>>> What seems to work is this:
>>>
>>> [snippet not preserved in the archive; presumably the Array-typed variant
>>> shown at the top of the thread]
>>>
>>> I will continue in this direction and let you know about my findings. The
>>> documentation is, in any case, wrong.
>>>
>>>
>>> On Thu, 3 Dec 2020 at 18:22, <eugen.wintersberger@gmail.com> wrote:
>>>
>>> Hi folks,
>>>   I am trying to read the message headers from a Kafka structured stream;
>>> they should be stored in a column named ``headers``.
>>> I am trying something like this:
>>>
>>> val stream = sparkSession.readStream.format("kafka")......load()
>>>
>>> stream.map(row => {
>>>   ...
>>>   val headers = row.getAs[HeaderT]("headers")
>>>   ...
>>> })
>>>
>>>
>>> My question is: what would be *HeaderT*?
>>>
>>> Thanks in advance
>>>
>>>  Eugen
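>>>
>>> As the replies above clarify, the headers column is
>>> array<struct<key: string, value: binary>>, so with untyped Row access
>>> HeaderT is effectively Seq[Row]. A minimal sketch (assuming
>>> sparkSession.implicits._ is imported for the result encoder):
>>>
>>> import org.apache.spark.sql.Row
>>>
>>> stream.map { row =>
>>>   // each header element is a struct with fields "key" and "value"
>>>   val headers = row.getAs[Seq[Row]]("headers")
>>>   headers.map(h => (h.getAs[String]("key"), h.getAs[Array[Byte]]("value")))
>>> }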
>>>
>>>
>>>
