spark-user mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: Structured Streaming Spark 3.0.1
Date Thu, 21 Jan 2021 13:22:32 GMT
If you know the exact version of the Google connector that is used,
then the source can be checked to see what really happened:
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/83a6c9809ad49a44895d59558e666e5fc183e0bf/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopOutputStream.java#L114

The linked code is from master, though, so it just doesn't quite fit...
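
For what it's worth, one way to check which connector version is actually on
the classpath is a quick lookup from spark-shell. This is only a rough sketch:
the class name is the connector's public FileSystem implementation, and the
manifest version may be null depending on how the jar was built.

// Sketch: locate the GCS connector jar on the classpath and print its
// declared version, if the manifest carries one.
val cls = Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
println(cls.getProtectionDomain.getCodeSource.getLocation)
println(Option(cls.getPackage.getImplementationVersion).getOrElse("unknown"))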

G


On Thu, Jan 21, 2021 at 9:18 AM Gabor Somogyi <gabor.g.somogyi@gmail.com>
wrote:

> I've double-checked this and came to the same conclusion as Jungtaek.
> I've added a comment to the Stack Overflow post to reach more people with
> the answer.
>
> G
>
>
> On Thu, Jan 21, 2021 at 6:53 AM Jungtaek Lim <kabhwan.opensource@gmail.com>
> wrote:
>
>> I quickly looked into the attached log in the SO post, and the problem
>> doesn't seem to be related to Kafka. The error stack trace is from
>> checkpointing to GCS, and the OutputStream implementation for GCS seems
>> to be provided by Google.
>>
>> Could you please elaborate on the stack trace, or upload the log with
>> sensitive text redacted?
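>>
>> Just for context, a minimal sketch of how the checkpoint ends up on GCS
>> (the gs:// bucket below is made up, and df stands for whatever streaming
>> DataFrame is being written):
>>
>> // Hypothetical example: a gs:// checkpoint location routes the offset and
>> // state writes through the Hadoop FileSystem API, i.e. GoogleHadoopOutputStream.
>> df.writeStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("topic", "sink")
>>   .option("checkpointLocation", "gs://some-bucket/checkpoints/query1")
>>   .start()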
>>
>> On Thu, Jan 21, 2021 at 2:38 PM German Schiavon <gschiavonspark@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I couldn't reproduce this error :/ I wonder if there is something else
>>> underneath causing it...
>>>
>>> *Input*
>>> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
>>> localhost:9092 --topic test1
>>> {"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>>
>>> *Output*
>>> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
>>> localhost:9092 --topic sink
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
>>>
>>>
>>> import org.apache.spark.sql.functions.{count, lit}
>>>
>>> val rawDF = spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "test1")
>>>   .load
>>>   .selectExpr("CAST(value AS STRING)")
>>>
>>>
>>> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
>>> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
>>>
>>> kafka_stream_output
>>>   .writeStream
>>>   .format("kafka")
>>>   .outputMode("update")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("topic", "sink")
>>>   .option("checkpointLocation", "/tmp/check")
>>>   .start()
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>>
>>> On Wed, 20 Jan 2021 at 23:22, gshen <gshen92@gmail.com> wrote:
>>>
>>>> This SO post is pretty much the exact same issue:
>>>>
>>>>
>>>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>>>>
>>>> The user mentions it's an issue with
>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
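>>>>
>>>> That artifact is the Scala 2.11 build for Spark 2.4.4; with Spark 3.0.1 the
>>>> matching connector would be the Scala 2.12 build. A hedged sbt sketch
>>>> (assuming an sbt build, which is just a guess here):
>>>>
>>>> // Hypothetical sbt dependency matching Spark 3.0.1 (resolves to the _2.12 artifact)
>>>> libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"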
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>
>>>>
