spark-user mailing list archives

From ayan guha <guha.a...@gmail.com>
Subject Re: Spark 2.0 issue with left_outer join
Date Sun, 05 Mar 2017 05:55:03 GMT
How about running this -

select * from
(select *, count(*) over (partition by id order by id) cnt from filteredDS) f
where f.cnt <= 75000
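
If the goal is to reproduce the original left_outer join through the SQL API rather than replace it, a direct translation could look like the untested sketch below. It assumes filteredDS has been registered as a temporary view of the same name (for example with createOrReplaceTempView) and reuses the 75000 threshold and the bid alias from Ankur's code.

```sql
-- Assumption: filteredDS is registered as a temp view named filteredDS.
-- The subquery collects ids with more than 75000 rows (the "bad" ids);
-- the outer query keeps only rows whose id found no match among them.
SELECT f.*
FROM filteredDS f
LEFT OUTER JOIN (
    SELECT id AS bid
    FROM filteredDS
    GROUP BY id
    HAVING count(*) > 75000
) b ON f.id = b.bid
WHERE b.bid IS NULL
```

Here the null check is a plain SQL predicate instead of a Java lambda, so running it may help show whether the typed filter is what EliminateOuterJoin fails to evaluate in the stack trace.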


On Sun, Mar 5, 2017 at 12:05 PM, Ankur Srivastava <
ankur.srivastava@gmail.com> wrote:

> Yes every time I run this code with production scale data it fails. Test
> case with small dataset of 50 records on local box runs fine.
>
> Thanks
> Ankur
>
> Sent from my iPhone
>
> On Mar 4, 2017, at 12:09 PM, ayan guha <guha.ayan@gmail.com> wrote:
>
> Just to be sure, can you reproduce the error using sql api?
>
> On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava <
> ankur.srivastava@gmail.com> wrote:
>
>> Adding DEV.
>>
>> Or is there any other way to do subtractByKey using Dataset APIs?
>>
>> Thanks
>> Ankur
>>
>> On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <
>> ankur.srivastava@gmail.com> wrote:
>>
>> Hi Users,
>>
>> We are facing an issue with left_outer join using Spark Dataset api in
>> 2.0 Java API. Below is the code we have
>>
>> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>>         .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
>> _logger.info("Id count with over 75K records that will be filtered: " + badIds.count());
>>
>> Dataset<SomeData> filteredRows = filteredDS
>>         .join(broadcast(badIds), filteredDS.col("id").equalTo(badIds.col("bid")), "left_outer")
>>         .filter((FilterFunction<Row>) row -> row.getAs("bid") == null)
>>         .map((MapFunction<Row, SomeData>) row -> SomeDataFactory.createObjectFromDDRow(row),
>>                 Encoders.bean(SomeData.class));
>>
>>
>> We get the counts in the log file, and then the application fails with
>> the exception below:
>> Exception in thread "main" java.lang.UnsupportedOperationException: Only code-generated evaluation is supported.
>>         at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>>         at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>>         at scala.collection.immutable.List.exists(List.scala:84)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>>         at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>>         at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:131)
>>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:98)
>>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>>         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>>         at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
>>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>>         at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
>>         at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
>>         at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
>>         at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
>>         at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
>>         at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
>>         at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>>         at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>>         at test.Driver.main(Driver.java:106)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks
>> Ankur
>>
>>
>> --
> Best Regards,
> Ayan Guha
>
>


-- 
Best Regards,
Ayan Guha
