spark-user mailing list archives

From German Schiavon <gschiavonsp...@gmail.com>
Subject Re: Apply window function on data consumed from Kafka topic
Date Tue, 15 Jun 2021 06:50:27 GMT
Hi,

If you want help, I'd suggest sharing the full code; you only shared the
config part.

On the other hand, if you are starting a project *now*, I'd also suggest
using Structured Streaming
<https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>;
I'm sure you would get better support than with DStreams.
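
To give an idea of what that could look like, here is a minimal sketch of a
1-minute windowed count with Structured Streaming. The broker and topic
placeholders are taken from your mail; the object and method names, the
2-minute watermark, the console sink, and grouping by message value are
only assumptions for illustration, and the job needs the
spark-sql-kafka-0-10 package on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object WindowedKafkaCounts {
  // Counts records per 1-minute tumbling window and message value.
  // Expects columns "value" (string) and "timestamp" (timestamp).
  def windowedCounts(messages: DataFrame): DataFrame =
    messages
      .withWatermark("timestamp", "2 minutes") // assumed lateness bound
      .groupBy(window(col("timestamp"), "1 minute"), col("value"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WindowedKafkaCounts")
      .getOrCreate()

    // The Kafka source exposes key/value as binary plus a record timestamp.
    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<my-srvername>")
      .option("subscribe", "7b37787c-20e7-4614-98ba-6f4212e07bf0")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value", "timestamp")

    // Print updated window counts to the console as batches arrive.
    val query = windowedCounts(messages)
      .writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

The windowing here is based on the Kafka record timestamp (event time)
rather than on arrival time, which is the main behavioural difference from
a DStream window.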

Regards.

On Tue, 15 Jun 2021 at 08:39, Muhammed Favas <
favas.muhammed@expeedsoftware.com> wrote:

> Hi All,
>
>
>
> Can someone help me resolve this? It is very important for achieving
> my project objective.
>
>
>
> *Regards,*
>
> *Favas  *
>
>
>
> *From:* Muhammed Favas
> *Sent:* Thursday, June 10, 2021 15:26 PM
> *To:* user@spark.apache.org
> *Subject:* Apply window function on data consumed from Kafka topic
>
>
>
> Hi,
>
>
>
> I have a requirement to create a Spark Streaming job that gets data
> from a Kafka broker and applies a window function to the data coming
> into the Spark context.
>
> This is how I connected to Kafka from Spark:
>
>
>
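> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> // Consumer configuration passed to the Kafka direct stream
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "<my-srvername>",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "7b37787c-20e7-4614-98ba-6f4212e07bf0",
>   "auto.offset.reset" -> "earliest",
>   "enable.auto.commit" -> (true: java.lang.Boolean)
> )
>
> val topics = Array("7b37787c-20e7-4614-98ba-6f4212e07bf0")
> // ssc is the StreamingContext (its creation was not included in this mail)
> val inputMsg = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )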
>
>
>
> The variable “inputMsg” is of type
> “InputDStream[ConsumerRecord[String,String]]”.
>
> When I used the window function “inputMsg.window(Minutes(1))”, it threw
> the error below:
>
>
>
> ERROR DirectKafkaInputDStream: Kafka ConsumerRecord is not serializable.
> Use .map to extract fields before calling .persist or .window
>
>
>
> Can someone help me with how to use a window function in Spark Streaming
> with Kafka?
>
>
>
> *Regards,*
>
> *Favas  *
>
>
>
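
For the DStream code quoted above, the error message itself points at the
likely fix: map each ConsumerRecord to plain serializable fields before
calling window. A minimal sketch of that, assuming a 10-second batch
interval and that only the key and value are needed; the object name, the
toPair helper, and the foreachRDD body are made up for illustration:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object WindowedDStream {
  // Hypothetical helper: keep only serializable fields of the record.
  def toPair(record: ConsumerRecord[String, String]): (String, String) =
    (record.key, record.value)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowedDStream")
    // Assumed batch interval; the window length must be a multiple of it.
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<my-srvername>",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "7b37787c-20e7-4614-98ba-6f4212e07bf0",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("7b37787c-20e7-4614-98ba-6f4212e07bf0")

    val inputMsg = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Extract fields first, then window over the resulting tuples.
    val windowed = inputMsg.map(toPair).window(Minutes(1))

    windowed.foreachRDD { rdd =>
      println(s"Records in the last minute: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The only change relative to the quoted code is the map step before
window, so the windowed stream holds (String, String) tuples instead of
ConsumerRecord objects.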
