spark-user mailing list archives

From Yong Zhang <java8...@hotmail.com>
Subject Re: CATALYST rule join
Date Tue, 27 Feb 2018 14:08:47 GMT
I don't fully understand your question, but you may want to check out this JIRA: https://issues.apache.org/jira/browse/SPARK-17728,
especially the comments. There is some discussion there of why Spark can execute a UDF
multiple times.

Yong

________________________________
From: tan shai <tan.shai284@gmail.com>
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule that customizes the join using the Spark Catalyst optimizer. The
objective is to duplicate the second dataset using this process:

- Execute a UDF on the column called `x`; this UDF returns an array

- Apply the `explode` function to the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.
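To make the intended semantics concrete, here is a plain-Scala model of that query, with no Spark involved. The `foo` below is a hypothetical stand-in (the real UDF is not shown); it just returns three derived values per input. `explodeFooKeepingX` mirrors what a `Generate` with `join = true` produces in the rule: the original row's columns (here only `x`) next to each exploded value.

```scala
// Plain-Scala model of `SELECT EXPLODE(foo(x)) FROM table2` (no Spark involved).
object ExplodeModel {
  // Hypothetical stand-in for the real `foo` UDF: returns an array (Seq) per input value.
  def foo(x: Long): Seq[Long] = Seq(x, x * 10, x * 100)

  // EXPLODE(foo(x)): one output row per array element. An empty array yields
  // no rows (EXPLODE, unlike EXPLODE_OUTER, drops such input rows).
  def explodeFoo(table2: Seq[Long]): Seq[Long] =
    table2.flatMap(foo)

  // Same, but keeping the original `x` next to each exploded value,
  // analogous to Generate with join = true.
  def explodeFooKeepingX(table2: Seq[Long]): Seq[(Long, Long)] =
    table2.flatMap(x => foo(x).map(v => (x, v)))
}
```

For example, `ExplodeModel.explodeFoo(Seq(1L, 2L))` gives `Seq(1, 10, 100, 2, 20, 200)`: every row of the second table is multiplied into as many rows as its array has elements.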

I have this rule:

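    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode, ScalaUDF}
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.types.{ArrayType, LongType}

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Join(left, right, _, Some(condition)) =>
          // Column `x` of the right-hand (second) table
          val attr = right.output.find(_.name == "x").get

          // UDF returning an array; `f` and `ipix` are not defined in this snippet
          val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), Seq(attr))

          val explode = Explode(udf)

          // Spark 2.2 signature: Generate(generator, join, outer, qualifier, generatorOutput, child).
          // generatorOutput needs a fresh attribute for the exploded values (name is arbitrary),
          // not the UDF's input attribute.
          val resolvedGenerator = Generate(explode, join = true, outer = false, qualifier = None,
            generatorOutput = Seq(AttributeReference("x_exploded", LongType)()), child = right)

          val newRight = Project(resolvedGenerator.output, resolvedGenerator)

          Join(left, newRight, Inner, Option(condition))
      }
    }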

But the problem is that the operation `Generate explode` appears many times in the physical
plan.
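A likely explanation, assuming the rule is registered through `spark.experimental.extraOptimizations`: those rules run in an optimizer batch with a fixed-point strategy, so the optimizer keeps re-applying them until the plan stops changing or an iteration limit is reached. The rewritten `Join` still matches `case Join(left, right, _, Some(condition))`, so each pass can wrap the right side in yet another `Generate`. The toy model below is plain Scala with made-up node names (`Table`, `Gen`, `JoinNode`), not Catalyst's classes; it only illustrates why a rule needs to be idempotent under a fixed-point executor.

```scala
// Toy model (NOT Catalyst): a tiny plan tree plus an executor that, roughly like
// a fixed-point optimizer batch, re-applies a rule until the plan stops changing.
sealed trait Plan
case class Table(name: String) extends Plan
case class Gen(child: Plan) extends Plan // stands in for Project(Generate(Explode(...)))
case class JoinNode(left: Plan, right: Plan) extends Plan

object FixedPointDemo {
  type PlanRule = Plan => Plan

  // Not idempotent: the rewritten join matches again, so every pass adds one more Gen.
  val naiveRule: PlanRule = {
    case JoinNode(l, r) => JoinNode(l, Gen(r))
    case other          => other
  }

  // Idempotent: leave joins whose right side was already rewritten untouched.
  val guardedRule: PlanRule = {
    case j @ JoinNode(_, Gen(_)) => j
    case JoinNode(l, r)          => JoinNode(l, Gen(r))
    case other                   => other
  }

  // Re-apply `rule` until the plan is unchanged or maxIterations is reached.
  def execute(plan: Plan, rule: PlanRule, maxIterations: Int = 100): Plan = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rule(current)
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

  // Number of Gen nodes in a plan.
  def countGen(p: Plan): Int = p match {
    case Table(_)       => 0
    case Gen(c)         => 1 + countGen(c)
    case JoinNode(l, r) => countGen(l) + countGen(r)
  }
}
```

With `naiveRule`, the number of `Gen` nodes grows with the iteration cap; with `guardedRule`, the plan converges after one rewrite. In the real rule, a crude analogous guard would be a pattern guard such as `if right.find(_.isInstanceOf[Generate]).isEmpty`, which skips joins whose right side already contains a `Generate`.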


Do you have any other ideas? Maybe a different way to write the code?

Thank you


On 2018-02-25 23:08 GMT+01:00, tan shai <tan.shai284@gmail.com> wrote:
[quoted text omitted: identical to the message above]


