spark-dev mailing list archives

From Sean Owen <sro...@gmail.com>
Subject Re: Unpersist return type
Date Thu, 22 Oct 2020 14:33:02 GMT
That's a compile error. If it said the call was ambiguous, I'd say this is
probably another instance where the legacy overloads for Java become
ambiguous in Scala 2.12 / Spark 3.0, so you have to cast your function to
the specific Scala overload. That's not quite the error shown, though, but
it might be worth a try.
As you say, you can also put "Unit" at the end to 'fix' it as a
workaround, but that shouldn't be necessary.
I'm not sure why the second overload doesn't apply, as DataFrame is a
Dataset[Row].
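A minimal sketch of the two workarounds above, with no Spark dependency. It assumes overloads shaped like those of DataStreamWriter.foreachBatch in Spark 3.0: one taking a Scala (Dataset[T], Long) => Unit and one taking the Java VoidFunction2 with a java.lang.Long batch id. JVoidFunction2, FakeBatch and FakeWriter are hypothetical stand-ins, not Spark classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical Java-style functional interface, modeled loosely on
// Spark's VoidFunction2 (a single abstract method returning Unit).
trait JVoidFunction2[A, B] {
  def call(a: A, b: B): Unit
}

// Hypothetical stand-in for a DataFrame: persist/unpersist return this.type.
final class FakeBatch(val id: Long) {
  var cached: Boolean = false
  def persist(): this.type = { cached = true; this }
  def unpersist(): this.type = { cached = false; this }
}

// Hypothetical stand-in for a writer with two foreachBatch overloads.
object FakeWriter {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  val batches: ArrayBuffer[FakeBatch] = ArrayBuffer.empty[FakeBatch]

  // Scala overload: the function must evaluate to Unit.
  def foreachBatch(f: (FakeBatch, Long) => Unit): Unit = {
    calls += "scala"
    val batch = new FakeBatch(batches.size.toLong)
    batches += batch
    f(batch, batch.id)
  }

  // Java-style overload: note java.lang.Long rather than scala.Long.
  def foreachBatch(f: JVoidFunction2[FakeBatch, java.lang.Long]): Unit = {
    calls += "java"
    val batch = new FakeBatch(batches.size.toLong)
    batches += batch
    f.call(batch, java.lang.Long.valueOf(batch.id))
  }
}

// Workaround 1 ("cast to the Scala overload"): give the function an explicit
// (FakeBatch, Long) => Unit type. A Function2 value can only match the Scala
// overload, and the FakeBatch returned by unpersist() is discarded because
// the expected result type is Unit.
val processBatch: (FakeBatch, Long) => Unit = { (batch, batchId) =>
  batch.persist()
  // ... write the batch to one or more sinks here ...
  batch.unpersist()
}
FakeWriter.foreachBatch(processBatch)

// Workaround 2: end the block with () so the lambda itself returns Unit.
FakeWriter.foreachBatch { (batch: FakeBatch, batchId: Long) =>
  batch.persist()
  // ... write the batch to one or more sinks here ...
  batch.unpersist()
  ()
}
```

Both calls resolve to the Scala overload. The inline form without the trailing () is the one reported to fail, so it is left out to keep the sketch compiling. One plausible reason neither overload applies in that case: while choosing between overloads, Scala types the lambda without the expected Unit result, so a block ending in batchDF.unpersist() is seen as (DataFrame, Long) => DataFrame, and the Java overload does not match either because it expects java.lang.Long rather than scala.Long.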

On Thu, Oct 22, 2020 at 9:02 AM German Schiavon <gschiavonspark@gmail.com>
wrote:

> Fair enough.
>
> Sorry, it does compile, but when you run it, it fails.
>
>
> https://stackoverflow.com/questions/63642364/how-to-use-foreachbatch-batchdf-unpersist-appropriately-in-structured-streamin
>
> [image: Captura de pantalla 2020-10-22 a las 16.01.33.png]
>
> On Thu, 22 Oct 2020 at 15:53, Sean Owen <srowen@gmail.com> wrote:
>
>> Probably for the purposes of chaining, though that won't be very useful
>> here. For example:
>> df.unpersist().cache(... some other settings ...)
>>
>> foreachBatch wants a function that evaluates to Unit, but this qualifies:
>> it doesn't matter what the value of the block is if it's ignored.
>> This does seem to compile; are you sure? What error do you see? It may not
>> be quite related to that.
>>
>>
>> On Thu, Oct 22, 2020 at 5:40 AM German Schiavon <gschiavonspark@gmail.com>
>> wrote:
>>
>>> Hello!
>>>
>>> I'd like to ask if there is any reason for *dataframe.unpersist* to
>>> return *this.type*:
>>>
>>> def unpersist(blocking: Boolean): this.type = {
>>>   sparkSession.sharedState.cacheManager.uncacheQuery(
>>>     sparkSession, logicalPlan, cascade = false, blocking)
>>>   this
>>> }
>>>
>>>
>>> I'm pointing it out because this example from the docs doesn't compile,
>>> since unpersist() doesn't return Unit:
>>>
>>> streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>>>   batchDF.persist()
>>>   batchDF.write.format(...).save(...)  // location 1
>>>   batchDF.write.format(...).save(...)  // location 2
>>>   batchDF.unpersist()
>>> }
>>>
>>>
>>> Thanks!
>>>
>>
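To illustrate the chaining rationale given in the thread, here is a small, Spark-free sketch of why a method like unpersist might return this.type rather than Unit: the caller keeps the receiver's most specific static type, so calls can be chained even on a subclass. CacheableThing and NamedThing are hypothetical classes, not part of Spark.

```scala
// Hypothetical base class whose mutators return this.type, mirroring the
// shape of the unpersist signature quoted in the thread.
class CacheableThing {
  var persisted: Boolean = false
  def persist(): this.type = { persisted = true; this }
  def unpersist(): this.type = { persisted = false; this }
}

// Hypothetical subclass adding its own method.
class NamedThing(val name: String) extends CacheableThing {
  def describe: String = s"$name (persisted = $persisted)"
}

val thing = new NamedThing("batch-0")

// Because unpersist() returns this.type (NamedThing here, not just
// CacheableThing), describe is still callable at the end of the chain.
val summary: String = thing.persist().unpersist().describe
println(summary)
```

If persist() and unpersist() were declared to return CacheableThing instead of this.type, the chain above would not compile, because describe is defined only on NamedThing.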
