spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Wed, 07 Mar 2018 11:44:30 GMT
It looks to me that the StateStore described in this doc
<https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
actually has full outer join, and every other join is a filter of that. Also,
the doc talks about update mode, but it looks like Spark 2.3 ended up with
append mode? Anyway, the moment it is in master I am ready to test, so JIRA
tickets on this would help to keep track. Please let me know.
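
To make the "every other join is a filter of a full outer join" point
concrete, here is a minimal sketch on plain Scala collections. It is only an
illustration of the idea, not the StateStore design from the doc and not
Spark API; all names (JoinAsFilter, fullOuterJoin, inner, leftOuter,
rightOuter) are made up for this example.

```scala
// Illustration only: plain Scala collections, not Spark's StateStore.
// Every name in this object is hypothetical.
object JoinAsFilter {
  // (join key, value from the left side, value from the right side)
  type Joined = (Int, Option[String], Option[String])

  // Equi-join on the key that keeps unmatched rows from both sides.
  def fullOuterJoin(left: Seq[(Int, String)], right: Seq[(Int, String)]): Seq[Joined] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct.sorted
    keys.flatMap { k =>
      val ls = left.filter(_._1 == k).map(_._2)
      val rs = right.filter(_._1 == k).map(_._2)
      if (ls.isEmpty) rs.map(r => (k, Option.empty[String], Option(r)))
      else if (rs.isEmpty) ls.map(l => (k, Option(l), Option.empty[String]))
      else for (l <- ls; r <- rs) yield (k, Option(l), Option(r))
    }
  }

  // The other join types only drop rows from the full outer result.
  def inner(full: Seq[Joined]): Seq[Joined] =
    full.filter { case (_, l, r) => l.isDefined && r.isDefined }

  def leftOuter(full: Seq[Joined]): Seq[Joined] =
    full.filter { case (_, l, _) => l.isDefined }

  def rightOuter(full: Seq[Joined]): Seq[Joined] =
    full.filter { case (_, _, r) => r.isDefined }

  def main(args: Array[String]): Unit = {
    val full = fullOuterJoin(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y"))
    println(s"full outer:  $full")
    println(s"inner:       ${inner(full)}")
    println(s"left outer:  ${leftOuter(full)}")
    println(s"right outer: ${rightOuter(full)}")
  }
}
```

Key 1 exists only on the left and key 3 only on the right, so the inner
filter keeps just key 2, while the left and right outer filters each keep
two of the three rows.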

Thanks!

On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth909@gmail.com> wrote:

> Sorry I meant Spark 2.4 in my previous email
>
> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> Hi TD,
>>
>> I agree; I think we are better off with either a full fix or no fix. I am
>> OK with the complete fix being available in master or some branch. I guess
>> the solution for me is to just build from source.
>>
>> On a similar note, I am not finding any JIRA tickets related to full
>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>> to implement both of these? It turns out that update mode and full outer
>> joins are very useful and required in my case, which is why I'm asking.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> I thought about it.
>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>
>>> There are two parts to this bug fix to enable self-joins.
>>>
>>> 1. Enabling deduping of leaf logical nodes by extending
>>> MultiInstanceRelation
>>>   - This is safe to be backported into the 2.3 branch as it does not
>>> touch production code paths.
>>>
>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>> micro-batch plan is spliced into the streaming plan.
>>>   - This touches core production code paths and therefore may not be safe
>>> to backport.
>>>
>>> Part 1 enables self-joins in all but a small fraction of self-join
>>> queries. That small fraction can produce incorrect results, and part 2
>>> avoids that.
>>>
>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>> can give wrong results in some cases. I think that is strictly worse than
>>> no fix.
>>>
>>> TD
>>>
>>>
>>>
>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com> wrote:
>>>
>>>> Hi TD,
>>>>
>>>> I pulled your commit that is listed on this ticket:
>>>> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did
>>>> the following steps, and self joins work after I cherry-pick your commit!
>>>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>>>> targeted for 2.3.1 :(
>>>>
>>>> git clone https://github.com/apache/spark.git
>>>> cd spark
>>>> git fetch
>>>> git checkout branch-2.3
>>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>>> ./build/mvn -DskipTests compile
>>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>>
>>>>
>>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>>> tathagata.das1565@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>>>> am going to take a look at this.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I change it to the code below, it works. However, I don't believe
>>>>>> it is the solution I am looking for. I want to be able to do it in raw
>>>>>> SQL, and moreover, if a user gives a big chained raw Spark SQL join query,
>>>>>> I am not even sure how to make copies of the dataframe to achieve the
>>>>>> self-join. Is there any other way here?
>>>>>>
>>>>>>
>>>>>>
>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>
>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>
>>>>>> // Each copy is registered under its own view name so the query can refer to both.
>>>>>> jdf.createOrReplaceTempView("table")
>>>>>> jdf1.createOrReplaceTempView("table1")
>>>>>>
>>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>>>>>
>>>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> If I change it to this
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have the following code
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>
>>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>>>
>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>
>>>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>>>>>>
>>>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>
>>>>>>>> and I get the following exception.
>>>>>>>>
>>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>>>> 'Project [*]
>>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>>    :- SubqueryAlias x
>>>>>>>>    :  +- SubqueryAlias table
>>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>    +- SubqueryAlias y
>>>>>>>>       +- SubqueryAlias table
>>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>
>>>>>>>> Any idea what's wrong here?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
