spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gourav Sengupta <gourav.sengu...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Thu, 08 Mar 2018 17:18:43 GMT
super interesting.

On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth909@gmail.com> wrote:

> It looks to me that the StateStore described in this doc
> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
Actually
> has full outer join and every other join is a filter of that. Also the doc
> talks about update mode but looks like Spark 2.3 ended up with append mode?
> Anyways the moment it is in master I am ready to test so JIRA tickets on
> this would help to keep track. please let me know.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> Sorry I meant Spark 2.4 in my previous email
>>
>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> I agree I think we are better off either with a full fix or no fix. I am
>>> ok with the complete fix being available in master or some branch. I guess
>>> the solution for me is to just build from the source.
>>>
>>> On a similar note, I am not finding any JIRA tickets related to full
>>> outer joins and update mode for maybe say Spark 2.3. I wonder how hard is
>>> it two implement both of these? It turns out the update mode and full outer
>>> join is very useful and required in my case, therefore, I'm just asking.
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> I thought about it.
>>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>>
>>>> There are two parts to this bug fix to enable self-joins.
>>>>
>>>> 1. Enabling deduping of leaf logical nodes by extending
>>>> MultInstanceRelation
>>>>   - This is safe to be backported into the 2.3 branch as it does not
>>>> touch production code paths.
>>>>
>>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>>> micro-batch plan is spliced into the streaming plan.
>>>>   - This touches core production code paths and therefore, may not safe
>>>> to backport.
>>>>
>>>> Part 1 enables self-joins in all but a small fraction of self-join
>>>> queries. That small fraction can produce incorrect results, and part 2
>>>> avoids that.
>>>>
>>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>>> can give wrong results in some cases. I think that is strictly worse than
>>>> no fix.
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi TD,
>>>>>
>>>>> I pulled your commit that is listed on this ticket
>>>>> https://issues.apache.org/jira/browse/SPARK-23406 specifically I did
>>>>> the following steps and self joins work after I cherry-pick your commit!
>>>>> Good Job! I was hoping it will be part of 2.3.0 but looks like it is
>>>>> targeted for 2.3.1 :(
>>>>>
>>>>> git clone https://github.com/apache/spark.gitcd spark
>>>>> git fetch
>>>>> git checkout branch-2.3
>>>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>>>> ./build/mvn -DskipTests compile
>>>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr
-Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>>>
>>>>>
>>>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Thanks for testing out stream-stream joins and reporting this issue.
>>>>>> I am going to take a look at this.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> if I change it to the below code it works. However, I don't believe
>>>>>>> it is the solution I am looking for. I want to be able to do
it in raw
>>>>>>> SQL and moreover, If a user gives a big chained raw spark SQL
join query I
>>>>>>> am not even sure how to make copies of the dataframe to achieve
the
>>>>>>> self-join. Is there any other way here?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>
>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>>
>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>> jdf1.createOrReplaceTempView("table")
>>>>>>>
>>>>>>> val resultdf = spark.sql("select * from table inner join table1
on table.offset=table1.offset")
>>>>>>>
>>>>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If I change it to this
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have the following code
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>>
>>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>>>>
>>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>>
>>>>>>>>> val resultdf = spark.sql("select * from table as x inner
join table as y on x.offset=y.offset")
>>>>>>>>>
>>>>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>>
>>>>>>>>> and I get the following exception.
>>>>>>>>>
>>>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve
'`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp,
x.partition]; line 1 pos 50;
>>>>>>>>> 'Project [*]
>>>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>>>    :- SubqueryAlias x
>>>>>>>>>    :  +- SubqueryAlias table
>>>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None),
kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>    +- SubqueryAlias y
>>>>>>>>>       +- SubqueryAlias table
>>>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None),
kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>
>>>>>>>>> any idea whats wrong here?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message