From: Wenchen Fan <cloud0fan@gmail.com>
Date: Fri, 27 Nov 2020 21:31:45 +0800
Subject: Re: How to convert InternalRow to Row.
To: Jason Jun
Cc: Spark dev list <dev@spark.apache.org>

InternalRow is an internal/developer API that might change over time. Right now, the way to convert it to a Row is to use `RowEncoder`, but you need to know the data schema:

val encoder = RowEncoder(schema)
val row = encoder.fromRow(internalRow)
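
A slightly fuller, self-contained sketch of the same idea, assuming Spark 2.x (the version the stack trace below comes from); the schema and field names here are hypothetical, and on 2.x the encoder typically needs to be resolved and bound before fromRow can be called:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Hypothetical schema; substitute the actual schema of your data.
val schema = StructType(Seq(
  StructField("product", StringType),
  StructField("category", StringType),
  StructField("price", IntegerType)))

// Resolve and bind the encoder before deserializing; calling fromRow
// on an unbound encoder fails at runtime on Spark 2.x.
val encoder = RowEncoder(schema).resolveAndBind()

def toExternalRow(internalRow: InternalRow): Row = encoder.fromRow(internalRow)

// On Spark 3.x, fromRow was replaced by createDeserializer():
// val row = encoder.createDeserializer()(internalRow)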

On Fri, Nov 27, 2020 at 6:16 AM Jason Jun wrote:

> Hi dev,
>
> I'm working on generating a custom pipeline on the fly, which means I
> generate a SparkPlan for each node in my pipeline.
>
> So my pipeline ends up with a PipeLineRelation extending BaseRelation, like:
>
> case class PipeLineRelation(schema: StructType, pipeLinePlan: LogicalPlan)(@transient override val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
>   override def needConversion: Boolean = true
>   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
>
>   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
>     ...
>     val sparkPlan = sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
>     sparkPlan.execute().mapPartitions { itr =>
>       itr.map { internalRow =>
>         val values = prunedColumnWithIndex.map { case (index, columnType) =>
>           internalRow.get(index, columnType)
>         }
>         Row.fromSeq(values) // Line 46
>       }
>     }
>   }
> }
>
> I'm getting an InternalRow by executing the subsequent SparkPlan and
> converting it into a Row using Row.fromSeq(). I saw that the values at
> Line 46 are exactly what I want:
> ------
> values = {Object[5]@14277}
>  0 = {UTF8String@14280} "Thin"
>  1 = {UTF8String@14281} "Cell phone"
>  2 = {Integer@14282} 6000
>  3 = {Integer@14283} 2
>  4 = {Integer@14284} 12000
>
> but execution of Line 46 ended up with this error:
> ------
> Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most
> recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor
> driver): java.lang.IllegalArgumentException: The value (2) of the type
> (java.lang.Integer) cannot be converted to the string type
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
> at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
> at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> ----
>
> Is this an existing bug? Otherwise, how do I convert an InternalRow to a Row?
>
> Thanks in advance.
> Jason
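
For reference, a sketch of how the RowEncoder suggestion might slot into the buildScan above. This is illustrative only: it assumes Spark 2.x (to match the stack trace) and that PipeLineRelation's schema field describes the rows produced by the executed plan:

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  ...
  val sparkPlan = sqlContext.sparkSession.sessionState.planner.plan(pipeLinePlan).next()
  sparkPlan.execute().mapPartitions { itr =>
    // Build and bind the encoder once per partition, not once per row.
    val encoder = RowEncoder(schema).resolveAndBind()
    itr.map(encoder.fromRow)
  }
}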