spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: what is the right syntax for self joins in Spark 2.3.0 ?
Date Thu, 22 Feb 2018 19:25:14 GMT
Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am
going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth909@gmail.com> wrote:

> If I change it to the code below, it works. However, I don't believe it is
> the solution I am looking for: I want to be able to do it in raw SQL, and
> moreover, if a user gives a big chained raw Spark SQL join query, I am not
> even sure how to make the copies of the DataFrame needed for the self-join.
> Is there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
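Regarding the chained raw-SQL case above: one way to script the copies is a small helper that registers one independent streaming DataFrame per view name, so each alias in the SQL resolves to its own relation. This is only a sketch under the thread's assumptions (broker at localhost:9092, topic join_test); registerKafkaViews is a hypothetical helper, not an existing API:

import org.apache.spark.sql.SparkSession

// Hypothetical helper: register the same Kafka topic under several view
// names, creating an independent streaming DataFrame per name so a raw-SQL
// self-join sees a distinct relation on each side.
def registerKafkaViews(spark: SparkSession, topic: String, names: Seq[String]): Unit =
  names.foreach { name =>
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // broker from the thread
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      .createOrReplaceTempView(name)
  }

registerKafkaViews(spark, "join_test", Seq("table", "table1"))
val resultdf = spark.sql("select * from table inner join table1 on table.offset = table1.offset")

For a bigger chained query, the same helper can be called with however many view names the query references.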
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth909@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>    :- SubqueryAlias x
>>>    :  +- SubqueryAlias table
>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>    +- SubqueryAlias y
>>>       +- SubqueryAlias table
>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>
>>> Any idea what's wrong here?
>>>
>>> Thanks!
>>>
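Note that both branches of the plan above resolve to the same StreamingRelation with identical attribute IDs (key#28 through timestampType#34), which is why the analyzer cannot tell x.offset from y.offset apart. Until self-joins on a single streaming relation work, one sketch that sidesteps the temp view entirely is to load the stream twice and join through the DataFrame API (broker and topic taken from the thread; untested on 2.3.0):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Two independent loads of the same topic, so each side of the join is a
// distinct relation with its own attribute IDs.
def loadJoinTest() = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

val joined = loadJoinTest().as("x")
  .join(loadJoinTest().as("y"), col("x.offset") === col("y.offset"))

joined.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()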
>>
>
