flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Philip Lee <philjj...@gmail.com>
Subject Hi, join with two columns of both tables
Date Sun, 08 Nov 2015 18:13:51 GMT
I want to join two tables with two columns like

//    AND sr_customer_sk      = ws_bill_customer_sk
//    AND sr_item_sk          = ws_item_sk

val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){
    (storeReturn: StoreReturn, webSales: WebSales, out:
Collector[(Long,Long,Long)]) =>
      if(storeReturn._customer_sk.equals(webSales._bill_customer_sk))
        out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number)
      else
        None
}

According to the explaination from join phase, I should do like it if
I want to join like the way. Isn't it right?

But the thing is it does not work in that Type dismatch; expected
TypeInformation[Long], actual(StoreReturn, WebSales,
Collector[(Long,Long,Long)]) => Any

I tried many ways but it still does not work.

Any suggestion?

Best Regards,

Phil

Mime
View raw message