spark-issues mailing list archives

From "Yin Huai (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-14803) A bug in EliminateSerialization rule in Catalyst Optimizer
Date Thu, 12 May 2016 17:12:13 GMT

     [ https://issues.apache.org/jira/browse/SPARK-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yin Huai resolved SPARK-14803.
------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

Issue resolved by pull request 12926
[https://github.com/apache/spark/pull/12926]

> A bug in EliminateSerialization rule in Catalyst Optimizer 
> -----------------------------------------------------------
>
>                 Key: SPARK-14803
>                 URL: https://issues.apache.org/jira/browse/SPARK-14803
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>            Reporter: Sun Rui
>             Fix For: 2.0.0
>
>
> When I rebased my PR https://github.com/apache/spark/pull/12493 to master, I found a bug in the EliminateSerialization rule in the Catalyst Optimizer, introduced by https://github.com/apache/spark/pull/12260.
> The related code is:
> {code}
> object EliminateSerialization extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     case d @ DeserializeToObject(_, _, s: SerializeFromObject)
>         if d.outputObjectType == s.inputObjectType =>
>       // Add an extra Project to preserve the output expr id of `DeserializeToObject`.
>       val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId)
>       Project(objAttr :: Nil, s.child)
>   }
> }
> {code}
> In my PR, when there are multiple successive calls to dapply(), the SerializeFromObject and DeserializeToObject logical operators are eliminated and replaced with a Project operator. However, the object involved is a Row, and UnsafeRowWriter has no write overload that accepts a Row.
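To make the failure mode concrete, here is a minimal, self-contained model of the rewrite. Every name in it (Plan, Child, Serialize, Deserialize, Project, eliminate) is an illustrative stand-in, not Spark's actual API: when the pair is dropped purely because the object types match, the raw object flows straight to the next operator.

```java
// A minimal, self-contained model of the rewrite described above.
// Every name here is an illustrative stand-in, NOT Spark's actual API.
public class EliminationSketch {
    interface Plan {}
    record Child(String objectType) implements Plan {}                         // produces raw objects
    record Serialize(Plan child, String inputObjectType) implements Plan {}    // object -> row
    record Deserialize(Plan child, String outputObjectType) implements Plan {} // row -> object
    record Project(Plan child) implements Plan {}                              // passes data through

    // The rewrite fires whenever the object types match, dropping the
    // serialize/deserialize pair and exposing the raw object to the next operator.
    static Plan eliminate(Plan p) {
        if (p instanceof Deserialize d
                && d.child() instanceof Serialize s
                && d.outputObjectType().equals(s.inputObjectType())) {
            return new Project(s.child());
        }
        return p;
    }

    public static void main(String[] args) {
        // The pair is eliminated even though "Row" is not a type the
        // downstream row writer supports -- mirroring the failure in the log.
        Plan plan = new Deserialize(new Serialize(new Child("Row"), "Row"), "Row");
        System.out.println(eliminate(plan));
    }
}
```

In this toy model, as in the report, nothing in the guard asks whether the object type is one the downstream operators can actually consume; type equality between the serializer and deserializer is the only condition checked.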
> Detailed error message:
> {noformat}
> 1. Error: dapply() on a DataFrame ----------------------------------------------
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 31, Column 29: No applicable constructor/method found for actual parameters "int, org.apache.spark.sql.Row"; candidates are:
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[])",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.UTF8String)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.sql.types.Decimal, int, int)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, double)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[], int, int)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.CalendarInterval)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, boolean)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, short)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, int)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, long)",
>   "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, float)"
> /* 001 */ 
> /* 002 */ public java.lang.Object generate(Object[] references) {
> /* 003 */   return new SpecificUnsafeProjection(references);
> /* 004 */ }
> /* 005 */ 
> /* 006 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
> /* 007 */   
> /* 008 */   private Object[] references;
> /* 009 */   private UnsafeRow result;
> /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
> /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
> /* 012 */   
> /* 013 */   
> /* 014 */   public SpecificUnsafeProjection(Object[] references) {
> /* 015 */     this.references = references;
> /* 016 */     result = new UnsafeRow(1);
> /* 017 */     this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
> /* 018 */     this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
> /* 019 */   }
> /* 020 */   
> /* 021 */   // Scala.Function1 need this
> /* 022 */   public java.lang.Object apply(java.lang.Object row) {
> /* 023 */     return apply((InternalRow) row);
> /* 024 */   }
> /* 025 */   
> /* 026 */   public UnsafeRow apply(InternalRow i) {
> /* 027 */     holder.reset();
> /* 028 */     
> /* 029 */     /* input[0, org.apache.spark.sql.Row] */
> /* 030 */     org.apache.spark.sql.Row value = (org.apache.spark.sql.Row)i.get(0, null);
> /* 031 */     rowWriter.write(0, value);
> /* 032 */     result.setTotalSize(holder.totalSize());
> /* 033 */     return result;
> /* 034 */   }
> /* 035 */ }
> /* 036 */ 
> 	at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
> 	at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
> 	at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> 	at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
> 	at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
> 	at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
> 	at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> 	at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> 	at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
> 	at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> 	at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352)
> 	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
> 	at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67)
> 	at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
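The CompileException above is an overload-resolution failure: UnsafeRowWriter only exposes write overloads for primitives and a handful of known types, so generated code that tries to call write(0, value) with a Row finds no applicable method. The following sketch, using stand-in Writer and Row classes rather than Spark's real ones, shows the same lookup coming up empty:

```java
// Sketch of the overload-resolution failure reported by the compiler above.
// Writer and Row are stand-ins, NOT Spark's UnsafeRowWriter / Row classes.
import java.lang.reflect.Method;

public class OverloadLookup {
    static class Row {} // stand-in for org.apache.spark.sql.Row
    static class Writer {
        // Only primitives and a few known types are writable, as in the real writer.
        void write(int ordinal, int v) {}
        void write(int ordinal, double v) {}
        void write(int ordinal, byte[] v) {}
    }

    // Returns true if some two-argument write(...) overload could accept a Row.
    static boolean acceptsRow() {
        for (Method m : Writer.class.getDeclaredMethods()) {
            if (m.getParameterCount() == 2
                    && m.getParameterTypes()[1].isAssignableFrom(Row.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // No overload takes a Row, so code generated to call write(0, row)
        // cannot compile -- exactly the CompileException in the log.
        System.out.println("overload accepting Row: " + acceptsRow()); // prints "overload accepting Row: false"
    }
}
```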



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

