spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Fri, 09 Mar 2018 02:14:47 GMT
This doc is unrelated to the stream-stream join we added in Structured
Streaming. :)

That said, we added append mode first because it is easier to reason about
the semantics of append mode, especially in the context of outer joins. You
output a row only when you know it won't ever be changed. The semantics of
update mode in outer joins are trickier to reason about and to expose through
the APIs. Consider a left outer join. As soon as we get a left-side record
with a key K that does not have a match, do we output *(K, leftValue, null)*?
And if we do so, and later get 2 matches from the right side, we have to
output *(K, leftValue, rightValue1)* and *(K, leftValue, rightValue2)*. But
how do we convey that *rightValue1* and *rightValue2* together replace the
earlier *null*, rather than *rightValue2* replacing *rightValue1* replacing
*null*?
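
To make the ambiguity concrete, here is a minimal sketch in plain Scala. It is a toy model only: the `Row` type and both helper functions are hypothetical and are not part of Spark's API. It takes the three rows from the example above and shows that two equally plausible readings of an "update" stream leave a downstream consumer with different results.

```scala
// Toy model only: these types and functions are hypothetical, not Spark's API.
object UpdateModeAmbiguity {
  // One output row of a left outer join; `right` is None when there is no match.
  final case class Row(key: String, left: String, right: Option[String])

  // Rows emitted over time for key K in a hypothetical update mode.
  val emitted: List[Row] = List(
    Row("K", "leftValue", None),
    Row("K", "leftValue", Some("rightValue1")),
    Row("K", "leftValue", Some("rightValue2"))
  )

  // Reading 1: each new row for a key replaces the previous one (last write wins).
  def lastWriteWins(rows: List[Row]): Map[String, List[Row]] =
    rows.groupBy(_.key).map { case (k, rs) => k -> List(rs.last) }

  // Reading 2: matched rows accumulate and together replace the earlier null row.
  def matchesReplaceNull(rows: List[Row]): Map[String, List[Row]] =
    rows.groupBy(_.key).map { case (k, rs) =>
      val matched = rs.filter(_.right.isDefined)
      k -> (if (matched.nonEmpty) matched else rs)
    }

  def main(args: Array[String]): Unit = {
    println(lastWriteWins(emitted))
    println(matchesReplaceNull(emitted))
  }
}
```

Under the first reading the consumer keeps only the *rightValue2* row; under the second it keeps both matched rows. Nothing in the row stream itself says which reading is intended.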

We will figure these out in future releases. For now, we have released
append mode, which allows quite a large range of use cases, including
multiple cascading joins.
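
For comparison, the append-mode rule ("output a row only when you know it won't be changed") can be sketched the same way. This is again a toy model in plain Scala with hypothetical names, not Spark's state store. It assumes that something external signals when no more matches can arrive for a key; how that signal is produced is outside the sketch.

```scala
// Toy model only: a hypothetical buffer, not Spark's implementation.
object AppendModeSketch {
  final case class Row(key: String, left: String, right: Option[String])

  // Buffered left-side rows, plus the keys that have already produced a match.
  final case class State(pending: Map[String, String], matched: Set[String])

  val empty: State = State(Map.empty, Set.empty)

  // A left row produces no output yet: a match may still arrive later.
  def onLeft(s: State, key: String, value: String): (State, List[Row]) =
    (s.copy(pending = s.pending + (key -> value)), Nil)

  // A right row matching a buffered left row yields a joined row that never changes.
  def onRight(s: State, key: String, value: String): (State, List[Row]) =
    s.pending.get(key) match {
      case Some(l) => (s.copy(matched = s.matched + key), List(Row(key, l, Some(value))))
      case None    => (s, Nil)
    }

  // Assumption: called once no further matches can arrive for `key`.
  // Emits the null-padded row only if the key never matched, then drops its state.
  def onFinalized(s: State, key: String): (State, List[Row]) = {
    val out = s.pending.get(key) match {
      case Some(l) if !s.matched(key) => List(Row(key, l, None))
      case _                          => Nil
    }
    (State(s.pending - key, s.matched - key), out)
  }
}
```

Because every emitted row is final, a downstream join can consume the output without ever having to retract anything, which is what makes cascading joins straightforward in this mode.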

TD



On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <gourav.sengupta@gmail.com>
wrote:

> super interesting.
>
> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth909@gmail.com> wrote:
>
>> It looks to me that the StateStore described in this doc
>> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
>> actually has full outer join, and every other join is a filter of that. Also, the doc
>> talks about update mode, but it looks like Spark 2.3 ended up with append mode?
>> Anyway, the moment it is in master I am ready to test, so JIRA tickets on
>> this would help to keep track. Please let me know.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> Sorry I meant Spark 2.4 in my previous email
>>>
>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth909@gmail.com> wrote:
>>>
>>>> Hi TD,
>>>>
>>>> I agree; I think we are better off with either a full fix or no fix. I
>>>> am OK with the complete fix being available in master or some branch. I
>>>> guess the solution for me is to just build from source.
>>>>
>>>> On a similar note, I am not finding any JIRA tickets related to full
>>>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it
>>>> is to implement both of these? It turns out that update mode and full outer
>>>> joins are very useful and required in my case; therefore, I'm just asking.
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> I thought about it.
>>>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>>>
>>>>> There are two parts to this bug fix to enable self-joins.
>>>>>
>>>>> 1. Enabling deduping of leaf logical nodes by extending
>>>>> MultiInstanceRelation
>>>>>   - This is safe to be backported into the 2.3 branch as it does not
>>>>> touch production code paths.
>>>>>
>>>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>>>> micro-batch plan is spliced into the streaming plan.
>>>>>   - This touches core production code paths and therefore may not be
>>>>> safe to backport.
>>>>>
>>>>> Part 1 enables self-joins in all but a small fraction of self-join
>>>>> queries. That small fraction can produce incorrect results, and part 2
>>>>> avoids that.
>>>>>
>>>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>>>> can give wrong results in some cases. I think that is strictly worse than
>>>>> no fix.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi TD,
>>>>>>
>>>>>> I pulled your commit that is listed on this ticket,
>>>>>> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did
>>>>>> the following steps, and self-joins work after I cherry-pick your commit!
>>>>>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>>>>>> targeted for 2.3.1 :(
>>>>>>
>>>>>> git clone https://github.com/apache/spark.git
>>>>>> cd spark
>>>>>> git fetch
>>>>>> git checkout branch-2.3
>>>>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>>>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>>>>> ./build/mvn -DskipTests compile
>>>>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> Thanks for testing out stream-stream joins and reporting this issue.
>>>>>>> I am going to take a look at this.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If I change it to the code below, it works. However, I don't believe
>>>>>>>> it is the solution I am looking for. I want to be able to do it in raw
>>>>>>>> SQL; moreover, if a user gives a big chained raw Spark SQL join query, I
>>>>>>>> am not even sure how to make copies of the dataframe to achieve the
>>>>>>>> self-join. Is there any other way here?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>
>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>>
>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>> // Register the second copy under its own name; the query below refers to table1
>>>>>>>> jdf1.createOrReplaceTempView("table1")
>>>>>>>>
>>>>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>>>>>>>
>>>>>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If I change it to this
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have the following code
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>>>
>>>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>>>>
>>>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>>>
>>>>>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>>>>>>>>
>>>>>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>>>
>>>>>>>>>> and I get the following exception.
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>>>>>> 'Project [*]
>>>>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>>>>    :- SubqueryAlias x
>>>>>>>>>>    :  +- SubqueryAlias table
>>>>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>    +- SubqueryAlias y
>>>>>>>>>>       +- SubqueryAlias table
>>>>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>
>>>>>>>>>> Any idea what's wrong here?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
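
In the plan quoted in the original question, both sides of the join carry the same attribute IDs (for example offset#32L appears under both x and y). That is consistent with part 1 of the fix described earlier in the thread: a leaf node that appears twice needs fresh IDs on one side. The following is a minimal sketch of that idea in plain Scala. It is a toy model under stated assumptions: `Attribute`, `Relation`, `conflicting` and `dedupRight` are hypothetical names for illustration and are not Catalyst's classes or the actual patch.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model only: hypothetical classes, not Catalyst's.
object SelfJoinDedupSketch {
  private val nextId = new AtomicLong(0)

  // A column identified by name plus a unique ID, like offset#32L in the plan.
  final case class Attribute(name: String, id: Long)

  final case class Relation(output: List[Attribute]) {
    // Same column names, freshly allocated IDs.
    def newInstance(): Relation =
      Relation(output.map(a => a.copy(id = nextId.getAndIncrement())))
  }

  // Build a relation whose columns get new unique IDs.
  def relation(names: String*): Relation =
    Relation(names.toList.map(n => Attribute(n, nextId.getAndIncrement())))

  // IDs that appear on both sides of a join and are therefore ambiguous.
  def conflicting(left: Relation, right: Relation): Set[Long] =
    left.output.map(_.id).toSet intersect right.output.map(_.id).toSet

  // Re-instantiate the right side only when it shares IDs with the left.
  def dedupRight(left: Relation, right: Relation): Relation =
    if (conflicting(left, right).nonEmpty) right.newInstance() else right
}
```

Joining a relation with itself makes every ID conflict; after `dedupRight`, the right side keeps its column names but no longer shares any ID with the left, so a reference such as x.offset can be told apart from y.offset.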
