spark-user mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: Structured Streaming Spark 3.0.1
Date Thu, 21 Jan 2021 08:18:51 GMT
I've double-checked this and came to the same conclusion as Jungtaek.
I've added a comment to the Stack Overflow post so the answer reaches more
people.

G


On Thu, Jan 21, 2021 at 6:53 AM Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:

> I quickly looked into the log attached to the SO post, and the problem doesn't
> seem to be related to Kafka. The error stack trace comes from checkpointing to
> GCS, and the OutputStream implementation for GCS appears to be provided
> by Google.
>
> Could you please share the full stack trace, or upload the log with
> sensitive information redacted?
>
> On Thu, Jan 21, 2021 at 2:38 PM German Schiavon <gschiavonspark@gmail.com>
> wrote:
>
>> Hi,
>>
>> I couldn't reproduce this error :/ I wonder if there is something else
>> underlying it...
>>
>> *Input*
>> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
>> {"name": "pedro", "age": 50}
>> >{"name": "pedro", "age": 50}
>> >{"name": "pedro", "age": 50}
>> >{"name": "pedro", "age": 50}
>>
>> *Output*
>> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sink
>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
>>
>>
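>> // Run in spark-shell, where `spark` (the SparkSession) is predefined.
>> import org.apache.spark.sql.functions.{count, lit}
>>
>> // Read raw records from Kafka topic "test1" and keep the payload as a string.
>> val rawDF = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "test1")
>>   .load
>>   .selectExpr("CAST(value AS STRING)")
>>
>> // Running count per distinct message, serialized to JSON for the Kafka sink.
>> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
>> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
>>
>> // "update" mode writes only the rows whose count changed in each micro-batch;
>> // the checkpoint here is on the local filesystem, not GCS.
>> kafka_stream_output
>>   .writeStream
>>   .format("kafka")
>>   .outputMode("update")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("topic", "sink")
>>   .option("checkpointLocation", "/tmp/check")
>>   .start()
>>
>> spark.streams.awaitAnyTermination()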
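Why the sink above shows count 1, 2, 3, 4 can be sketched without Spark: in "update" output mode each micro-batch folds its records into per-key state and emits only the keys it touched, with their latest aggregate. This is a minimal plain-Scala model, not Spark's implementation; `UpdateModeSketch` and `simulate` are hypothetical names, and the assumption that each hand-typed record landed in its own micro-batch is an inference, not something stated in the thread.

```scala
// Plain-Scala model (no Spark) of "update" output mode for a running count.
// Hypothetical names; this only mimics the observable output, not Spark internals.
object UpdateModeSketch {
  // For each micro-batch, return the (value, count) rows that update mode would emit.
  def simulate(batches: Seq[Seq[String]]): Seq[Seq[(String, Long)]] = {
    val state = scala.collection.mutable.Map.empty[String, Long]
    batches.map { batch =>
      // Fold every record of this batch into the running per-key count.
      batch.foreach(v => state(v) = state.getOrElse(v, 0L) + 1L)
      // Emit only the keys updated in this batch, with their latest count.
      batch.distinct.map(v => v -> state(v))
    }
  }

  def main(args: Array[String]): Unit = {
    val msg = """{"name": "pedro", "age": 50}"""
    // Assumption: four records, each arriving in its own micro-batch.
    simulate(Seq.fill(4)(Seq(msg))).flatten.foreach { case (_, c) => println(s"count=$c") }
    // The same four records arriving together in a single micro-batch.
    simulate(Seq(Seq.fill(4)(msg))).flatten.foreach { case (_, c) => println(s"count=$c") }
  }
}
```

Under this model, had all four records arrived in one micro-batch, update mode would emit a single row with count 4 instead of four rows.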
>>
>>
>> On Wed, 20 Jan 2021 at 23:22, gshen <gshen92@gmail.com> wrote:
>>
>>> This SO post is pretty much the exact same issue:
>>>
>>>
>>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>>>
>>> The user mentions it's an issue with
>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
>>>
>>>
>>>
