flink-issues mailing list archives

From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1899) Table API Bug
Date Thu, 16 Apr 2015 11:58:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497972#comment-14497972 ]

Timo Walther commented on FLINK-1899:
-------------------------------------

You need to give your aggregated field a name using "as" so that it matches the "sumTotal" field of the output type.

{code}
.select("communityID, weight.sum as sumTotal")
{code}

should work.
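
To illustrate why the alias matters, here is a small Flink-free sketch of the kind of field comparison the exception message suggests. The object FieldMatchSketch, the helper sameFields, and the assumption that fields are compared by name and type in order are all hypothetical; this is not Flink's actual implementation. The field names and types are taken from the exception text in the report.

```scala
// Hypothetical model of the field check; not Flink's actual implementation.
object FieldMatchSketch {
  // A field is modelled as a (name, type name) pair.
  type Field = (String, String)

  // Assumption: both sides must list the same fields in the same order.
  def sameFields(result: Seq[Field], output: Seq[Field]): Boolean =
    result == output

  // Fields of the target case class CommunitySumTotal, as printed in the exception.
  val outputType: Seq[Field] =
    Seq("communityID" -> "Integer", "sumTotal" -> "Double")

  // Without "as", the aggregate carries a generated name
  // ("intermediate.1" in the reported exception).
  val unnamed: Seq[Field] =
    Seq("communityID" -> "Integer", "intermediate.1" -> "Double")

  // With "weight.sum as sumTotal", the aggregate carries the expected name.
  val aliased: Seq[Field] =
    Seq("communityID" -> "Integer", "sumTotal" -> "Double")

  def main(args: Array[String]): Unit = {
    println(sameFields(unnamed, outputType)) // false
    println(sameFields(aliased, outputType)) // true
  }
}
```

The field types already agree in both cases; only the name of the second field differs, which is what the alias corrects. If you prefer the Scala expression syntax used in the report over the string form, the equivalent is presumably 'weight.sum as 'sumTotal, though I have not run that against this program.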

> Table API Bug
> -------------
>
>                 Key: FLINK-1899
>                 URL: https://issues.apache.org/jira/browse/FLINK-1899
>             Project: Flink
>          Issue Type: Bug
>          Components: Expression API
>    Affects Versions: 0.9
>            Reporter: Felix Neutatz
>            Priority: Minor
>
> I want to run the following program
> {code:scala}
> case class WeightedEdge(src: Int, target: Int, weight: Double)
> case class Community(communityID: Int, nodeID: Int)
> case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> val communities: DataSet[Community]
> val weightedEdges: DataSet[WeightedEdge]
> val communitiesTable = communities.toTable 
> val weightedEdgesTable = weightedEdges.toTable
> val sumTotal = communitiesTable.join(weightedEdgesTable)
> 		  .where("nodeID = src")
> 		  .groupBy('communityID)
> 		  .select('communityID, 'weight.sum).toSet[CommunitySumTotal]
> {code}
> but I get this exception. In my opinion the outputs do have the same field types.
> {code:xml}
> Exception in thread "main" org.apache.flink.api.table.ExpressionException: Expression result type org.apache.flink.api.table.Row(communityID: Integer, intermediate.1: Double) does not have the samefields as output type io.ssc.trackthetrackers.analysis.algorithms.CommunitySumTotal(communityID: Integer, sumTotal: Double)
> 	at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
> 	at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> 	at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> 	at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:105)
> 	at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:71)
> 	at scala.App$$anonfun$main$1.apply(App.scala:71)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
> 	at scala.App$class.main(App.scala:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
