spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Tue, 13 Mar 2018 01:24:08 GMT
You have understood the problem correctly. However, note that your
interpretation of the output *(K, leftValue, null)*, *(K, leftValue,
rightValue1)*, *(K, leftValue, rightValue2)* depends on knowing the
semantics of the join. That is, if you are processing the output rows
*manually*, you know that the operator is a join, so you can interpret
the rows as *"null replaced by the first match, then all further matches
are just additional rows"*. This is, however, not a general solution for
any sink and any operator. We need to find a way to expose these semantics
through the APIs such that a sink can use them without knowing exactly
which operator in the query is writing to the sink. Therefore we still need
some work before we can support joins in update mode with clear semantics.

Hope that makes it clear. :)
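
To make the ambiguity above concrete, here is a minimal sketch in plain
Scala. It is not Spark API: JoinedRow, upsertByKey and joinAware are
hypothetical names introduced only for illustration, and the emitted rows
are the three rows from the example above. It assumes a generic sink that
keeps the latest row per key, and compares it with a consumer that knows
the rows come from a left outer join.

```scala
// Hypothetical model (not Spark API): one output row of a left outer join.
final case class JoinedRow(key: String, left: String, right: Option[String])

object UpdateModeAmbiguity {
  // Rows for key K in emission order: the unmatched row first, then two matches.
  val emitted: Seq[JoinedRow] = Seq(
    JoinedRow("K", "leftValue", None),
    JoinedRow("K", "leftValue", Some("rightValue1")),
    JoinedRow("K", "leftValue", Some("rightValue2"))
  )

  // A generic sink that knows nothing about joins: the latest row per key wins.
  def upsertByKey(rows: Seq[JoinedRow]): Map[String, JoinedRow] =
    rows.foldLeft(Map.empty[String, JoinedRow]) { (table, row) =>
      table.updated(row.key, row)
    }

  // A join-aware consumer: a match drops the earlier null placeholder,
  // and every further match is kept as an additional row.
  def joinAware(rows: Seq[JoinedRow]): Map[String, Vector[JoinedRow]] =
    rows.foldLeft(Map.empty[String, Vector[JoinedRow]]) { (table, row) =>
      val current = table.getOrElse(row.key, Vector.empty)
      val kept = if (row.right.isDefined) current.filter(_.right.isDefined) else current
      table.updated(row.key, kept :+ row)
    }

  def main(args: Array[String]): Unit = {
    println(upsertByKey(emitted)) // one row per key survives
    println(joinAware(emitted))   // both matched rows survive
  }
}
```

With the same three rows, the key-based sink ends up holding only the
rightValue2 row, while the join-aware consumer keeps both matches. The
rows themselves carry nothing that tells a sink which interpretation is
intended.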

On Sat, Mar 10, 2018 at 12:14 AM, kant kodali <kanth909@gmail.com> wrote:

> I will attempt to answer this.
>
> Since rightValue1 and rightValue2 have the same key "K" (two matches), why
> would it ever be the case of *rightValue2* replacing *rightValue1* replacing
> *null*? Moreover, why does the user need to care?
>
> The result in this case (after getting 2 matches) should be
>
> *(K, leftValue, rightValue1)*
> *(K, leftValue, rightValue2)*
>
> This basically means only one of them replaced the earlier null. Which one
> of the two? It depends on whichever arrived first. In other words, nulls will
> be replaced by the first matching row, and subsequently, if there is a new
> matching row, it will just be another row with the same key in the result
> table; if there is a new unmatched row, the result table should have nulls
> for the unmatched fields.
>
> From a user's perspective, I believe just emitting nulls for every
> trigger until there is a match, and emitting the joined rows once there is
> a match, should suffice, shouldn't it?
>
> Sorry if my thoughts are too naive!
>
> On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das <tathagata.das1565@gmail.com>
> wrote:
>
>> This doc is unrelated to the stream-stream join we added in Structured
>> Streaming. :)
>>
>> That said, we added append mode first because it is easier to reason about
>> the semantics of append mode, especially in the context of outer joins. You
>> output a row only when you know it won't ever be changed. The semantics of
>> update mode in outer joins are trickier to reason about and expose through
>> the APIs. Consider a left outer join. As soon as we get a left-side record
>> with a key K that does not have a match, do we output *(K, leftValue,
>> null)*? And if we do so, and later get 2 matches from the right side,
>> we have to output *(K, leftValue, rightValue1)* and *(K, leftValue,
>> rightValue2)*. But how do we convey that *rightValue1* and *rightValue2*
>> together replace the earlier *null*, rather than *rightValue2* replacing
>> *rightValue1* replacing *null*?
>>
>> We will figure these out in future releases. For now, we have released
>> append mode, which allows quite a large range of use cases, including
>> multiple cascading joins.
>>
>> TD
>>
>>
>>
>> On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> super interesting.
>>>
>>> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth909@gmail.com> wrote:
>>>
>>>> It looks to me that the StateStore described in this doc
>>>> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
>>>> actually has full outer join, and every other join is a filter of that.
>>>> Also, the doc talks about update mode, but it looks like Spark 2.3 ended
>>>> up with append mode? Anyway, the moment it is in master I am ready to
>>>> test, so JIRA tickets on this would help to keep track. Please let me know.
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth909@gmail.com> wrote:
>>>>
>>>>> Sorry I meant Spark 2.4 in my previous email
>>>>>
>>>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi TD,
>>>>>>
>>>>>> I agree; I think we are better off with either a full fix or no fix. I
>>>>>> am OK with the complete fix being available in master or some branch. I
>>>>>> guess the solution for me is to just build from source.
>>>>>>
>>>>>> On a similar note, I am not finding any JIRA tickets related to full
>>>>>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it
>>>>>> is to implement both of these? It turns out that update mode and full
>>>>>> outer joins are very useful and required in my case; therefore, I'm
>>>>>> just asking.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> I thought about it.
>>>>>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>>>>>
>>>>>>> There are two parts to this bug fix to enable self-joins.
>>>>>>>
>>>>>>> 1. Enabling deduping of leaf logical nodes by extending
>>>>>>> MultiInstanceRelation
>>>>>>>   - This is safe to backport into the 2.3 branch as it does not
>>>>>>> touch production code paths.
>>>>>>>
>>>>>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>>>>>> micro-batch plan is spliced into the streaming plan.
>>>>>>>   - This touches core production code paths and therefore may not
>>>>>>> be safe to backport.
>>>>>>>
>>>>>>> Part 1 enables self-joins in all but a small fraction of self-join
>>>>>>> queries. That small fraction can produce incorrect results, and part 2
>>>>>>> avoids that.
>>>>>>>
>>>>>>> So for 2.3.1, we can enable self-joins by merging only part 1, but
>>>>>>> it can give wrong results in some cases. I think that is strictly worse
>>>>>>> than no fix.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi TD,
>>>>>>>>
>>>>>>>> I pulled your commit that is listed on this ticket:
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-23406
>>>>>>>> Specifically, I did the following steps, and self-joins work after I
>>>>>>>> cherry-picked your commit! Good job! I was hoping it would be part of
>>>>>>>> 2.3.0, but it looks like it is targeted for 2.3.1 :(
>>>>>>>>
>>>>>>>> git clone https://github.com/apache/spark.git
>>>>>>>> cd spark
>>>>>>>> git fetch
>>>>>>>> git checkout branch-2.3
>>>>>>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>>>>>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>>>>>>> ./build/mvn -DskipTests compile
>>>>>>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Thanks for testing out stream-stream joins and reporting this
>>>>>>>>> issue. I am going to take a look at this.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If I change it to the code below, it works. However, I don't
>>>>>>>>>> believe it is the solution I am looking for. I want to be able to do
>>>>>>>>>> it in raw SQL; moreover, if a user gives a big chained raw Spark SQL
>>>>>>>>>> join query, I am not even sure how to make copies of the dataframe
>>>>>>>>>> to achieve the self-join. Is there any other way here?
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>>>
>>>>>>>>>> // Read the same Kafka topic twice so that each side of the join has its own source
>>>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>>>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>>>>
>>>>>>>>>> // Register the two copies under different view names
>>>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>>> jdf1.createOrReplaceTempView("table1")
>>>>>>>>>>
>>>>>>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>>>>>>>>>
>>>>>>>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> If I change it to this
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following code
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>>>>>
>>>>>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
>>>>>>>>>>>>
>>>>>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>>>>>
>>>>>>>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>>>>>>>>>>
>>>>>>>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>>>>>
>>>>>>>>>>>> and I get the following exception.
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>>>>>>>> 'Project [*]
>>>>>>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>>>>>>    :- SubqueryAlias x
>>>>>>>>>>>>    :  +- SubqueryAlias table
>>>>>>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>>>    +- SubqueryAlias y
>>>>>>>>>>>>       +- SubqueryAlias table
>>>>>>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>>>
>>>>>>>>>>>> Any idea what's wrong here?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!