spark-user mailing list archives

From Jungtaek Lim <kabhwan.opensou...@gmail.com>
Subject Re: Kafka structured streaming - how to read headers
Date Fri, 04 Dec 2020 12:25:12 GMT
Yeah, that would be great once you've manually verified it. Thanks!

On Fri, Dec 4, 2020 at 5:54 PM German Schiavon <gschiavonspark@gmail.com>
wrote:

> I could change it to something more generic, like this?
>
> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>   .as[(String, String, Array[(String, Array[Byte])])]
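>
> In context, the docs example would then read like this (just a sketch; the
> broker and topic placeholders are the ones from the docs, and
> spark.implicits._ needs to be in scope for the tuple encoder):
>
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .option("includeHeaders", "true")
>   .load()
>
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>   .as[(String, String, Array[(String, Array[Byte])])]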
>
>
> On Fri, 4 Dec 2020 at 09:18, Jungtaek Lim <kabhwan.opensource@gmail.com>
> wrote:
>
>> Probably it's a leftover - the type was Map in the initial proposal and
>> was changed to Array during review. It looks like we missed updating the
>> doc in sync during review.
>>
>> I'd love to see a volunteer to fix the doc. If there's no volunteer I
>> might be able to take a look.
>>
>> On Fri, Dec 4, 2020 at 4:09 PM German Schiavon <gschiavonspark@gmail.com>
>> wrote:
>>
>>> oh, I didn't see that Map, it is weird :/
>>>
>>> I did this and it works:
>>>
>>> case class Header(key: String, value: Array[Byte])
>>>
>>> import spark.implicits._
>>>
>>> val kafkaDF = spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "test")
>>>   .option("includeHeaders", "true")
>>>   .option("startingOffsets", "earliest")
>>>   .load()
>>>
>>>
>>> kafkaDF
>>>   .select('key.cast("string"), 'value.cast("string"), 'headers.as[Header])
>>>   .writeStream
>>>   .format("console")
>>>   .option("truncate", false)
>>>   .start()
>>>   .awaitTermination()
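>>>
>>> If you need individual header values, exploding the array should also
>>> work; untested sketch (the headerKey/headerValue names are just mine):
>>>
>>> import org.apache.spark.sql.functions._
>>>
>>> // one output row per (message, header) pair; header values are raw
>>> // bytes, so the cast to string only makes sense for text headers
>>> kafkaDF
>>>   .select('key.cast("string"), explode('headers).as("header"))
>>>   .select('key, $"header.key".as("headerKey"), $"header.value".cast("string").as("headerValue"))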
>>>
>>>
>>> On Thu, 3 Dec 2020 at 23:11, <eugen.wintersberger@gmail.com> wrote:
>>>
>>>> Hi German,
>>>>   unfortunately the documentation does not help.
>>>>
>>>> On Thu, 2020-12-03 at 18:41 +0100, German Schiavon wrote:
>>>>
>>>> Hello,
>>>>
>>>> see if this works, from the documentation:
>>>>
>>>>
>>>> // Subscribe to 1 topic, with headers
>>>> val df = spark
>>>>   .readStream
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>>   .option("subscribe", "topic1")
>>>>   .option("includeHeaders", "true")
>>>>   .load()
>>>>
>>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>>>   .as[(String, String, Map)]
>>>>
>>>> I am actually wondering what this code is supposed to do, since using
>>>> Map without key and value types is rather strange. And this is what I
>>>> actually get when I try it:
>>>>
>>>>
>>>> The error messages make total sense. Now I can try to add the type
>>>> parameters to Map, assuming that the key is a String and the value a
>>>> byte array; in this case I get
>>>>
>>>> What seems to work is spelling out the full tuple type explicitly; roughly:
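>>>>
>>>> // headers spelled out as an array of (key, value) pairs instead of a bare Map
>>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
>>>>   .as[(String, String, Array[(String, Array[Byte])])]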
>>>>
>>>> I will continue in this direction and let you know about my findings.
>>>> The documentation is, in any case, wrong.
>>>>
>>>>
>>>> On Thu, 3 Dec 2020 at 18:22, <eugen.wintersberger@gmail.com> wrote:
>>>>
>>>> Hi folks,
>>>>   I am trying to read the message headers from a Kafka structured
>>>> stream; they should end up in a column named ``headers``.
>>>> I am trying something like this:
>>>>
>>>> val stream = sparkSession.readStream.format("kafka")......load()
>>>>
>>>> stream.map(row => {
>>>>   ...
>>>>   val headers = row.getAs[HeaderT]("headers")
>>>>   ...
>>>> })
>>>>
>>>>
>>>> My question is: what would be *HeaderT*?
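>>>>
>>>> My best guess would be Seq[Row], since headers appears to be an array of
>>>> structs; i.e. something like this (untested, and the guard against
>>>> records without headers is just a guess too):
>>>>
>>>> import org.apache.spark.sql.Row
>>>> import sparkSession.implicits._
>>>>
>>>> val headerKeys = stream.map { row =>
>>>>   // guess: each header entry is a struct<key: string, value: binary>
>>>>   val headers = Option(row.getAs[Seq[Row]]("headers")).getOrElse(Seq.empty[Row])
>>>>   headers.map(_.getAs[String]("key")).mkString(",")
>>>> }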
>>>>
>>>> Thanks in advance
>>>>
>>>>  Eugen
>>>>
>>>>
>>>>
