Hi TD,

I agree; I think we are better off with either a full fix or no fix. I am OK with the complete fix being available in master or some branch. I guess the solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer joins or update mode for, say, Spark 2.3. I wonder how hard it would be to implement both of these? It turns out that update mode and full outer joins are very useful, and in fact required in my case, hence the question.
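
Just to make the ask concrete, here is roughly the query shape I have in mind (illustrative only; the topics and the join key are made up, and 2.3 rejects both the full outer join type and update mode for stream-stream joins):

import org.apache.spark.sql.functions.expr

val left = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "left_topic").option("startingOffsets", "earliest").load()
val right = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "right_topic").option("startingOffsets", "earliest").load()

// A full outer stream-stream join, emitted in update mode.
val joined = left.as("l").join(right.as("r"), expr("l.offset = r.offset"), "full_outer")

joined.writeStream.outputMode("update").format("console").start()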

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
I thought about it. 
I am not 100% sure whether this fix should go into 2.3.1. 

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending MultiInstanceRelation 
  - This is safe to be backported into the 2.3 branch as it does not touch production code paths. 

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch plan is spliced into the streaming plan. 
  - This touches core production code paths and therefore may not be safe to backport. 

Part 1 enables self-joins in all but a small fraction of self-join queries; that small fraction can produce incorrect results, and part 2 is what prevents that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can give wrong results in some cases. I think that is strictly worse than no fix.
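
For context, part 1 means making the streaming leaf nodes implement Catalyst's MultiInstanceRelation trait, so the analyzer can call newInstance() and give each occurrence of the relation fresh attribute ids. A simplified sketch of the shape of the change (the relation class below is made up for illustration, not the actual streaming relation):

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Illustrative leaf node only; the real fix does this for the actual
// streaming relations. Fresh exprIds per instance mean the two sides
// of a self-join no longer produce conflicting attribute references.
case class ExampleStreamingRelation(output: Seq[Attribute])
  extends LeafNode with MultiInstanceRelation {
  override def newInstance(): LogicalPlan =
    copy(output = output.map(_.newInstance()))
}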

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth909@gmail.com> wrote:
Hi TD,

I pulled the commit listed on this ticket, https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the following steps, and self-joins work after I cherry-picked your commit. Good job! I was hoping it would be part of 2.3.0, but it looks like it is targeted for 2.3.1 :(

# Check out the 2.3 maintenance branch and cherry-pick the self-join fix
git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959

# Build and package a runnable distribution with the patch applied
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
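
For what it's worth, the sanity check I ran in the resulting build was just a minimal version of the self-join from my first email (quoted below):

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
df.createOrReplaceTempView("table")

// With the fix cherry-picked, this analyzes successfully instead of
// failing with: cannot resolve '`x.offset`'
spark.sql("select * from table as x inner join table as y on x.offset=y.offset")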

On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
Hey, 

Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com> wrote:
If I change it to the code below, it works. However, I don't believe it is the solution I am looking for. I want to be able to do it in raw SQL, and moreover, if a user gives a big chained raw Spark SQL join query, I am not even sure how to make copies of the DataFrame to achieve the self-join. Is there any other way here?



import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();

// Register the two identical streams under distinct view names so the
// join below can reference them separately.
jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
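
And to illustrate my worry about chained queries: generalizing this workaround seems to mean registering one independent reader per alias, something like this sketch (the helper and the extra view names are made up):

import org.apache.spark.sql.SparkSession

// Hypothetical helper: register N identical Kafka readers under
// distinct view names so a chained raw-SQL join can reference them.
def registerCopies(spark: SparkSession, viewNames: Seq[String]): Unit =
  viewNames.foreach { name =>
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()
      .createOrReplaceTempView(name)
  }

registerCopies(spark, Seq("table", "table1", "table2"))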

On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com> wrote:
If I change it to this 




On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com> wrote:
Hi All,

I have the following code

import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
and I get the following exception.

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
Any idea what's wrong here?

Thanks!