I thought about it. I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins:

1. Enabling deduping of leaf logical nodes by extending MultiInstanceRelation.
- This is safe to backport into the 2.3 branch, as it does not touch production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch plan is spliced into the streaming plan.
- This touches core production code paths and therefore may not be safe to backport.

Part 1 alone enables self-joins in all but a small fraction of self-join queries. That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we could enable self-joins by merging only part 1, but it would give wrong results in some cases. I think that is strictly worse than no fix.

TD

On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <email@example.com> wrote:

Hi TD,

I pulled your commit that is listed on this ticket: https://issues.apache.org/jira/browse/SPARK-23406

Specifically, I did the following steps, and self-joins work after I cherry-pick your commit! Good job! I was hoping it would be part of 2.3.0, but it looks like it is targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn

On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <firstname.lastname@example.org> wrote:

Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.

TD

On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <email@example.com> wrote:

If I change it to the code below, it works. However, I don't believe this is the solution I am looking for. I want to be able to do it in raw SQL. Moreover, if a user gives a big chained raw Spark SQL join query, I am not even sure how to make copies of the DataFrame to achieve the self-join. Is there any other way?
import org.apache.spark.sql.streaming.Trigger

// Read the same Kafka topic twice, so each side of the join is an independent streaming source
val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()

// Register each stream under its own view name
jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset = table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
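Part 1 of TD's fix (deduping leaf logical nodes) can be illustrated with a toy model. This is a minimal sketch in plain Scala with no Spark dependency; the names Attr, Leaf and dedupRight are hypothetical and only loosely inspired by Catalyst, where attributes carry a unique exprId and a leaf that mixes in MultiInstanceRelation implements newInstance() to return a copy of itself with fresh attribute IDs. When one relation is joined with itself, both sides initially expose identical attribute IDs, so a condition like offset = offset cannot tell the two sides apart; re-instantiating the right side gives it fresh IDs.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model only: Attr, Leaf and dedupRight are hypothetical names,
// loosely inspired by Catalyst's exprId and MultiInstanceRelation.newInstance().
object SelfJoinDedupSketch {
  private val nextId = new AtomicLong(0L)

  // An output column identified by a unique id, not just by its name
  final case class Attr(name: String, id: Long)

  object Attr {
    def fresh(name: String): Attr = Attr(name, nextId.getAndIncrement())
  }

  // A leaf relation (e.g. one streaming source) with its output attributes
  final case class Leaf(source: String, output: Seq[Attr]) {
    // Same source and column names, but brand-new attribute ids
    def newInstance(): Leaf = Leaf(source, output.map(a => Attr.fresh(a.name)))
  }

  // If the right side shares any attribute id with the left, re-instantiate it
  def dedupRight(left: Leaf, right: Leaf): Leaf = {
    val leftIds = left.output.map(_.id).toSet
    if (right.output.exists(a => leftIds.contains(a.id))) right.newInstance()
    else right
  }

  def main(args: Array[String]): Unit = {
    val table = Leaf("join_test", Seq(Attr.fresh("offset"), Attr.fresh("value")))
    // Self-join: the same leaf appears on both sides
    val right = dedupRight(table, table)
    println(table.output.map(_.id)) // List(0, 1)
    println(right.output.map(_.id)) // List(2, 3)
  }
}
```

After deduplication, left.offset and right.offset are distinct attributes, so the join condition is unambiguous. The sketch does not model part 2 (attribute rewriting in MicroBatchExecution).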