spark-user mailing list archives

From German Schiavon <gschiavonsp...@gmail.com>
Subject Re: Kafka structured streaming - how to read headers
Date Fri, 04 Dec 2020 14:42:20 GMT
Yep I've tried this morning:

[image: Captura de pantalla 2020-12-04 a las 15.42.09.png]

On Fri, 4 Dec 2020 at 13:25, Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:

> Yeah that would be great, once you manually verified it. Thanks!
>
> On Fri, Dec 4, 2020 at 5:54 PM German Schiavon <gschiavonspark@gmail.com>
> wrote:
>
>> I could change it to something more generic, like this?
>>
>> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>   .as[(String, String, Array[(String, Array[Byte])])]
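Spelled out as a complete pipeline, the generic version might look like the sketch below. The bootstrap servers and topic name are placeholders, and this is not verified against every Spark version:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("headers-demo").getOrCreate()
import spark.implicits._

// With includeHeaders=true, the "headers" column arrives as
// array<struct<key: string, value: binary>>, which maps naturally to
// Array[(String, Array[Byte])] in a typed Dataset.
val ds = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "test")                          // placeholder topic
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]
```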
>>
>>
>> On Fri, 4 Dec 2020 at 09:18, Jungtaek Lim <kabhwan.opensource@gmail.com>
>> wrote:
>>
>>> Probably it's missing - that was a Map in the initial proposal and was
>>> changed to Array during review. It looks like we missed updating the doc
>>> to match during review.
>>>
>>> I'd love to see a volunteer to fix the doc. If there's no volunteer I
>>> might be able to take a look.
>>>
>>> On Fri, Dec 4, 2020 at 4:09 PM German Schiavon <gschiavonspark@gmail.com>
>>> wrote:
>>>
>>>> oh, I didn't see that Map, it is weird :/
>>>>
>>>> I did this and it works:
>>>>
>>>> case class Header(key: String, value: Array[Byte])
>>>>
>>>> import spark.implicits._
>>>>   val kafkaDF = spark
>>>>     .readStream
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", "localhost:9092")
>>>>     .option("subscribe", "test")
>>>>     .option("includeHeaders", "true")
>>>>     .option("startingOffsets", "earliest")
>>>>     .load()
>>>>
>>>>
>>>> kafkaDF
>>>>   .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
>>>>   .writeStream
>>>>   .format("console")
>>>>   .option("truncate", false)
>>>>   .start()
>>>>   .awaitTermination()
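Building on the example above, if one then needs the value of a single specific header, the typed array can be searched in a map step. A sketch, assuming the `Header` case class and `kafkaDF` from the snippet above; `"trace-id"` is a hypothetical header key used only for illustration:

```scala
import java.nio.charset.StandardCharsets

// Select key, value, and headers as typed columns, then look up one
// header by key and decode its bytes as UTF-8.
val traced = kafkaDF
  .select('key.cast("string").as[String],
          'value.cast("string").as[String],
          'headers.as[Array[Header]])
  .map { case (k, v, headers) =>
    val traceId = headers
      .find(_.key == "trace-id") // hypothetical header key
      .map(h => new String(h.value, StandardCharsets.UTF_8))
    (k, v, traceId)
  }
```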
>>>>
>>>>
>>>> On Thu, 3 Dec 2020 at 23:11, <eugen.wintersberger@gmail.com> wrote:
>>>>
>>>>> Hi German,
>>>>>   unfortunately the documentation does not help.
>>>>>
>>>>> On Thu, 2020-12-03 at 18:41 +0100, German Schiavon wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> see if this works, from the documentation:
>>>>>
>>>>>
>>>>> // Subscribe to 1 topic, with headers
>>>>> val df = spark
>>>>>   .readStream
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>>>   .option("subscribe", "topic1")
>>>>>   .option("includeHeaders", "true")
>>>>>   .load()
>>>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>>>>   .as[(String, String, Map)]
>>>>>
>>>>> I am actually wondering what this code is doing, since using Map
>>>>> without key and value types is rather strange. And this is actually
>>>>> what I get when I am trying to do this
>>>>>
>>>>>
>>>>> The error messages make total sense. Now I can try to add the type
>>>>> parameters to Map, assuming that the key is a String and the value is
>>>>> a byte array; in this case I get
>>>>>
>>>>> What seems to work is this.
>>>>>
>>>>> I will try to continue in this direction. I'll let you know about my
>>>>> findings. The documentation is in any case wrong.
>>>>>
>>>>>
>>>>> On Thu, 3 Dec 2020 at 18:22, <eugen.wintersberger@gmail.com> wrote:
>>>>>
>>>>> Hi folks,
>>>>>   I am trying to read the message headers from a Kafka structured
>>>>> stream which should be stored in a column named ``headers``.
>>>>> I try something like this:
>>>>>
>>>>> val stream = sparkSession.readStream.format("kafka")......load()
>>>>>
>>>>> stream.map(row => {
>>>>>
>>>>>  ...
>>>>>
>>>>>  val headers = row.getAs[HeaderT]("headers")
>>>>>
>>>>> ....
>>>>>
>>>>> })
>>>>>
>>>>>
>>>>> My question is: what would be *HeaderT*?
>>>>>
>>>>> Thanks in advance
>>>>>
>>>>>  Eugen
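For the untyped Row API asked about above, one possible answer, hedged as a sketch: with `includeHeaders` enabled, the `headers` column has the schema `array<struct<key: string, value: binary>>`, so each element should come back as a nested `Row`. Assuming `spark.implicits._` is in scope for the result encoder:

```scala
import org.apache.spark.sql.Row

// A sketch, not verified against every Spark version: each header in the
// "headers" array is a struct, so it is read back as a nested Row whose
// fields can be pulled out by name.
val parsed = stream.map { row =>
  val headers: Seq[Row] = row.getAs[Seq[Row]]("headers")
  headers.map { h =>
    val key   = h.getAs[String]("key")
    val value = h.getAs[Array[Byte]]("value")
    key -> value
  }
}
```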
>>>>>
>>>>>
>>>>>
