spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kant kodali <kanth...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Wed, 07 Mar 2018 05:16:25 GMT
Sorry I meant Spark 2.4 in my previous email

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth909@gmail.com> wrote:

> Hi TD,
>
> I agree I think we are better off either with a full fix or no fix. I am
> ok with the complete fix being available in master or some branch. I guess
> the solution for me is to just build from the source.
>
> On a similar note, I am not finding any JIRA tickets related to full outer
> joins and update mode for maybe say Spark 2.3. I wonder how hard is it two
> implement both of these? It turns out the update mode and full outer join
> is very useful and required in my case, therefore, I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <tathagata.das1565@gmail.com
> > wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending
>> MultInstanceRelation
>>   - This is safe to be backported into the 2.3 branch as it does not
>> touch production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>> micro-batch plan is spliced into the streaming plan.
>>   - This touches core production code paths and therefore, may not safe
>> to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
>> give wrong results in some cases. I think that is strictly worse than no
>> fix.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket
>>> https://issues.apache.org/jira/browse/SPARK-23406 specifically I did
>>> the following steps and self joins work after I cherry-pick your commit!
>>> Good Job! I was hoping it will be part of 2.3.0 but looks like it is
>>> targeted for 2.3.1 :(
>>>
>>> git clone https://github.com/apache/spark.gitcd spark
>>> git fetch
>>> git checkout branch-2.3
>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>> ./build/mvn -DskipTests compile
>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7
-Phive -Phive-thriftserver -Pmesos -Pyarn
>>>
>>>
>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>>> am going to take a look at this.
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com>
>>>> wrote:
>>>>
>>>>> if I change it to the below code it works. However, I don't believe it
>>>>> is the solution I am looking for. I want to be able to do it in raw SQL
and
>>>>> moreover, If a user gives a big chained raw spark SQL join query I am
not
>>>>> even sure how to make copies of the dataframe to achieve the self-join.
Is
>>>>> there any other way here?
>>>>>
>>>>>
>>>>>
>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>
>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>
>>>>> jdf.createOrReplaceTempView("table")
>>>>> jdf1.createOrReplaceTempView("table")
>>>>>
>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>>>>>
>>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>
>>>>>
>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I change it to this
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have the following code
>>>>>>>
>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>
>>>>>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>>>>>
>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>
>>>>>>> val resultdf = spark.sql("select * from table as x inner join
table as y on x.offset=y.offset")
>>>>>>>
>>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>
>>>>>>> and I get the following exception.
>>>>>>>
>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`'
given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition];
line 1 pos 50;
>>>>>>> 'Project [*]
>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>    :- SubqueryAlias x
>>>>>>>    :  +- SubqueryAlias table
>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None),
kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>    +- SubqueryAlias y
>>>>>>>       +- SubqueryAlias table
>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
-> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None),
kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>
>>>>>>> any idea whats wrong here?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message