spark-user mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: Cannot perform operation after producer has been closed
Date Wed, 09 Dec 2020 20:02:49 GMT
Good to hear :)

On Wed, Dec 9, 2020 at 7:44 PM Eric Beabes <mailinglists19@gmail.com> wrote:

> Gabor,
>
> I waited a long time before replying to make sure this was working as
> expected. I am VERY HAPPY to tell you that this configuration change has
> fixed the issue! Not a single task has failed in over 2 weeks!
>
> THANKS once again. Hopefully, at some point we can switch to Spark 3.0.
>
>
> On Fri, Nov 20, 2020 at 7:30 AM Gabor Somogyi <gabor.g.somogyi@gmail.com>
> wrote:
>
>> Happy that it saved you some time :)
>> We've invested quite a lot of effort into streaming in the latest releases
>> and hope there will be fewer and fewer headaches like this.
>>
>> On Thu, Nov 19, 2020 at 5:55 PM Eric Beabes <mailinglists19@gmail.com>
>> wrote:
>>
>>> THANK YOU SO MUCH! Will try it out & get back to you.
>>>
>>> On Thu, Nov 19, 2020 at 8:18 AM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>> wrote:
>>>
>>>> "spark.kafka.producer.cache.timeout" has been available since 2.2.1 and
>>>> can be increased as a temporary workaround.
>>>> This is not super elegant, but it works and gives you enough time to
>>>> migrate to Spark 3.
>>>>
>>>>
>>>> On Wed, Nov 18, 2020 at 11:12 PM Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> I must say... *Spark has let me down in this case*. I am surprised that
>>>>> an issue as important as this hasn't been fixed in Spark 2.4.
>>>>>
>>>>> I am fighting a 'Spark Structured Streaming' vs. 'Flink' battle at
>>>>> work, & now, because Spark 2.4 can't handle this, *I've been asked to
>>>>> rewrite the code in Flink*.
>>>>>
>>>>> Moving to Spark 3.0 is not an easy option 'cause Cloudera 6.2 doesn't
>>>>> have a Spark 3.0 parcel!!!! So we can't upgrade to 3.0.
>>>>>
>>>>> So sad. Let me ask one more time. *Is there no way to fix this in
>>>>> Spark 2.4?*
>>>>>
>>>>>
>>>>> On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes <mailinglists19@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> BTW, we are seeing this message as well: *"org.apache.kafka.common.KafkaException:
>>>>>> Producer closed while send in progress"*. I am assuming this
>>>>>> happens because of the previous issue ("producer has been closed"),
>>>>>> right? Or are they unrelated? Please advise. Thanks.
>>>>>>
>>>>>> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes <
>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply. We are on Spark 2.4. Is there no way to get
>>>>>>> this fixed in Spark 2.4?
>>>>>>>
>>>>>>> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim <
>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>
>>>>>>>> Which Spark version do you use? There's a known issue with the Kafka
>>>>>>>> producer pool in Spark 2.x which was fixed in Spark 3.0, so you may
>>>>>>>> want to check whether your case is caused by that known issue.
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-21869
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I know this is related to Kafka, but it happens during the Spark
>>>>>>>>> Structured Streaming job; that's why I am asking on this mailing list.
>>>>>>>>>
>>>>>>>>> How would you debug or work around this in Spark Structured
>>>>>>>>> Streaming? Any tips would be appreciated. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> java.lang.IllegalStateException: Cannot perform operation after producer has been closed
>>>>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>>>>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>>>>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>>>>>>>>>     at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>>>>>>>>>     at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>>>>>>>>>
>>>>>>>>
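
A minimal sketch of applying the "spark.kafka.producer.cache.timeout" workaround discussed in this thread, assuming the job is launched with spark-submit and picks up spark-defaults.conf. The 60m value is a hypothetical example; the thread does not say which value was actually used. The Spark 2.x default is 10m. A longer timeout means an idle cached Kafka producer is evicted (and closed) later, which should narrow, though not necessarily eliminate, the window in which a task is still using a producer that has already been closed (the SPARK-21869 problem that Spark 3.0 addresses).

```properties
# spark-defaults.conf
# Hypothetical value: keep cached Kafka producers around longer than the
# 10m default so they are less likely to be closed while a task still uses them.
spark.kafka.producer.cache.timeout  60m
```

The same setting can be passed per job on the command line as `--conf spark.kafka.producer.cache.timeout=60m`, which is convenient when only the streaming job needs it.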
