spark-issues mailing list archives

From "indraneel r (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-26206) Spark structured streaming with kafka integration fails in update mode
Date Tue, 04 Dec 2018 06:21:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-26206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708245#comment-16708245 ]

indraneel r commented on SPARK-26206:
-------------------------------------

[~kabhwan] 

Here are some of my observations:

 - The error only occurs once data for the second batch arrives; the first batch goes through fine. Sometimes you may not see any output at first (not sure why), but the output shows up once you start pumping new data into the Kafka topic, and that is when it throws the error (a minimal producer sketch follows the sample data below).

 - The same query works fine with Spark 2.3.0.

Here's what some sample data looks like:
{code:java}
[
 {
 "timestamp": 1541043341540,
 "cid": "333-333-333",
 "uid": "111111-111-111",
 "sessionId": "111111-111-111",
 "merchantId": "1111",
 "event": "2222-222-222",
 "ip": "1.1.1.1",
 "refUrl": "",
 "referrer": "",
 "section": "lorem",
 "tag": "lorem,ipsum",
 "eventType": "Random_event_1",
 "sid": "qwwewew"
 },
 {
 "timestamp": 1541043341540,
 "cid": "333-444-444",
 "uid": "111111-555-111",
 "sessionId": "444411-111-111",
 "merchantId": "3331",
 "event": "2222-222-333",
 "ip": "1.1.2.1",
 "refUrl": "",
 "referrer": "",
 "section": "ipsum",
 "tag": "lorem,ipsum2",
 "eventType": "Random_event_2",
 "sid": "xxxdfffwewe"
 }]
{code}
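
Pumping new data in can be as simple as the following minimal producer sketch (assuming a local broker at localhost:9092 and the test_events topic from the query below; the JSON payload matches the sample events above):
{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SampleEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One JSON event per record, matching the UserEvent fields below.
    val event =
      """{"timestamp": 1541043341540, "cid": "333-333-333", "uid": "111111-111-111", "sessionId": "111111-111-111", "merchantId": "1111", "event": "2222-222-222", "ip": "1.1.1.1", "refUrl": "", "referrer": "", "section": "lorem", "tag": "lorem,ipsum", "eventType": "Random_event_1", "sid": "qwwewew"}"""
    producer.send(new ProducerRecord[String, String]("test_events", event))
    producer.flush()
    producer.close()
  }
}
{code}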
 

 

> Spark structured streaming with kafka integration fails in update mode 
> -----------------------------------------------------------------------
>
>                 Key: SPARK-26206
>                 URL: https://issues.apache.org/jira/browse/SPARK-26206
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Operating system: macOS Mojave
>  spark version: 2.4.0
>  spark-sql-kafka-0-10: 2.4.0
>  kafka version: 1.1.1
>  scala version: 2.12.7
>            Reporter: indraneel r
>            Priority: Major
>
> Spark structured streaming with Kafka integration fails in update mode with a compilation exception during code generation.
> Here's the code that was executed:
> {code:java}
> override def main(args: Array[String]): Unit = {
>   val spark = SparkSession
>     .builder
>     .master("local[*]")
>     .appName("SparkStreamingTest")
>     .getOrCreate()
>  
>   val kafkaParams = Map[String, String](
>                    "kafka.bootstrap.servers" -> "localhost:9092",
>                    "startingOffsets" -> "earliest",
>                    "subscribe" -> "test_events")
>  
>   val schema = Encoders.product[UserEvent].schema
>   val query = spark.readStream.format("kafka")
>     .options(kafkaParams)
>     .load()
>     .selectExpr("CAST(value AS STRING) as message")
>     .select(from_json(col("message"), schema).as("json"))
>     .select("json.*")
>     .groupBy(window(col("event_time"), "10 minutes"))
>     .count()
>     .writeStream
>     .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
>       println(s"batch : ${batchId}")
>       batch.show(false)
>     }
>     .outputMode("update")
>     .start()
>   query.awaitTermination()
> }{code}
> It succeeds for batch 0 but fails for batch 1 with the following exception when more data arrives in the stream.
> {code:java}
> 18/11/28 22:07:08 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:981)
>     at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:414)
>     at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1295)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1306)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:848)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>     at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1290)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 18/11/28 22:07:08 ERROR Executor: Exception in task 28.0 in stage 19.0 (TID 355)
> java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>     at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>     at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>     at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>     at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>     at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     ... 47 more
> 18/11/28 22:07:08 WARN TaskSetManager: Lost task 28.0 in stage 19.0 (TID 355, localhost, executor driver): java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>     at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>     at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>     at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>     at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>     at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>     at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>     at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>     at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>     at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1238)
>     at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner$.create(GenerateUnsafeRowJoiner.scala:258)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner$lzycompute(StreamingAggregationStateManager.scala:164)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.joiner(StreamingAggregationStateManager.scala:162)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.restoreOriginalRow(StreamingAggregationStateManager.scala:198)
>     at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV2.get(StreamingAggregationStateManager.scala:176)
>     at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec.$anonfun$doExecute$3(statefulOperators.scala:253)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:379)
>     at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.getNext(statefulOperators.scala:370)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 25, Column 18: A method named "putLong" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1372)
>     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1369)
>     at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>     ... 47 more
> {code}
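> Since every failing frame goes through StreamingAggregationStateManagerImplV2, one possible (untested) way to sidestep the generated UnsafeRowJoiner might be to pin the streaming aggregation state format back to v1 when building the session; this is only a sketch, assuming the internal spark.sql.streaming.aggregation.stateFormatVersion conf:
> {code:java}
> // Hypothetical mitigation sketch: force the pre-2.4 v1 state format so
> // StreamingAggregationStateManagerImplV1, which does not build the
> // generated row joiner, manages the aggregation state instead.
> val spark = SparkSession
>   .builder
>   .master("local[*]")
>   .appName("SparkStreamingTest")
>   .config("spark.sql.streaming.aggregation.stateFormatVersion", "1")
>   .getOrCreate()
> {code}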
>  The structure of UserEvent:
> {code:java}
> case class UserEvent(
>  timestamp: String,
>  cid: Option[String],
>  uid: Option[String],
>  sessionId: Option[String],
>  merchantId: Option[String],
>  event: Option[String],
>  ip: Option[String],
>  refUrl: Option[String],
>  referrer: Option[String],
>  section: Option[String],
>  tag: Option[String],
>  eventType: Option[String],
>  sid: Option[String]
>  )
> {code}
> And here's the generated code:
> {code:java}
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */ return new SpecificUnsafeRowJoiner();
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificUnsafeRowJoiner extends org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner {
> /* 006 */ private byte[] buf = new byte[64];
> /* 007 */ private UnsafeRow out = new UnsafeRow(2);
> /* 008 */
> /* 009 */
> /* 010 */
> /* 011 */ public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
> /* 012 */ // row1: 1 fields, 1 words in bitset
> /* 013 */ // row2: 1, 1 words in bitset
> /* 014 */ // output: 2 fields, 1 words in bitset
> /* 015 */ final int sizeInBytes = row1.getSizeInBytes() + row2.getSizeInBytes() - 8;
> /* 016 */ if (sizeInBytes > buf.length) {
> /* 017 */ buf = new byte[sizeInBytes];
> /* 018 */ }
> /* 019 */
> /* 020 */ final java.lang.Object obj1 = row1.getBaseObject();
> /* 021 */ final long offset1 = row1.getBaseOffset();
> /* 022 */ final java.lang.Object obj2 = row2.getBaseObject();
> /* 023 */ final long offset2 = row2.getBaseOffset();
> /* 024 */
> /* 025 */ Platform.putLong(buf, 16, Platform.getLong(obj1, offset1 + 0) | (Platform.getLong(obj2, offset2) << 1));
> /* 026 */
> /* 027 */
> /* 028 */ // Copy fixed length data for row1
> /* 029 */ Platform.copyMemory(
> /* 030 */ obj1, offset1 + 8,
> /* 031 */ buf, 24,
> /* 032 */ 8);
> /* 033 */
> /* 034 */
> /* 035 */ // Copy fixed length data for row2
> /* 036 */ Platform.copyMemory(
> /* 037 */ obj2, offset2 + 8,
> /* 038 */ buf, 32,
> /* 039 */ 8);
> /* 040 */
> /* 041 */
> /* 042 */ // Copy variable length data for row1
> /* 043 */ long numBytesVariableRow1 = row1.getSizeInBytes() - 16;
> /* 044 */ Platform.copyMemory(
> /* 045 */ obj1, offset1 + 16,
> /* 046 */ buf, 40,
> /* 047 */ numBytesVariableRow1);
> /* 048 */
> /* 049 */
> /* 050 */ // Copy variable length data for row2
> /* 051 */ long numBytesVariableRow2 = row2.getSizeInBytes() - 16;
> /* 052 */ Platform.copyMemory(
> /* 053 */ obj2, offset2 + 16,
> /* 054 */ buf, 40 + numBytesVariableRow1,
> /* 055 */ numBytesVariableRow2);
> /* 056 */
> /* 057 */ long existingOffset;
> /* 058 */
> /* 059 */ existingOffset = Platform.getLong(buf, 24);
> /* 060 */ if (existingOffset != 0) {
> /* 061 */ Platform.putLong(buf, 24, existingOffset + (8L << 32));
> /* 062 */ }
> /* 063 */
> /* 064 */
> /* 065 */ out.pointTo(buf, sizeInBytes);
> /* 066 */
> /* 067 */ return out;
> /* 068 */ }
> /* 069 */ }
> {code}
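> For what it's worth, the putLong being rejected on line 25 does exist on org.apache.spark.unsafe.Platform; here's a hand-written sketch of the same bitset merge (using Platform.BYTE_ARRAY_OFFSET instead of the hard-coded 16), which suggests the compiler is failing to resolve the Platform reference rather than the generated logic being wrong:
> {code:java}
> import org.apache.spark.unsafe.Platform
> 
> // Equivalent of generated line 25: merge the one-word null bitsets of
> // row1 and row2 into the output buffer, shifting row2's bit past
> // row1's single field.
> val buf = new Array[Byte](64)
> val bitset1 = 0L // stand-in for Platform.getLong(obj1, offset1 + 0)
> val bitset2 = 0L // stand-in for Platform.getLong(obj2, offset2)
> Platform.putLong(buf, Platform.BYTE_ARRAY_OFFSET, bitset1 | (bitset2 << 1))
> {code}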


