spark-user mailing list archives

From tan shai <tan.shai...@gmail.com>
Subject Re: CATALYST rule join
Date Tue, 27 Feb 2018 09:19:01 GMT
 Hi,

I need to write a rule that customizes the join operation using the Spark
Catalyst optimizer. The objective is to duplicate rows of the second dataset
using this process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) from table2

Where `foo` is a UDF that returns an array of elements.
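
For illustration, the row-wise effect of this query can be sketched with plain Scala collections, with no Spark dependency. The body of `foo` below is a hypothetical stand-in (the real UDF is not shown in this message), and `ExplodeSketch` and `explodeFoo` are names made up for the sketch.

```scala
object ExplodeSketch {
  // Hypothetical stand-in for the real UDF: maps one value to an array of values.
  def foo(x: Long): Seq[Long] = Seq(x, x + 1)

  // Models SELECT EXPLODE(foo(x)) FROM table2 on an in-memory column:
  // every input row yields one output row per element of foo(x).
  def explodeFoo(xs: Seq[Long]): Seq[Long] = xs.flatMap(foo)

  def main(args: Array[String]): Unit = {
    val table2 = Seq(10L, 20L)
    println(explodeFoo(table2)) // List(10, 11, 20, 21)
  }
}
```

As with a non-outer explode, a row for which `foo` returns an empty array contributes no output rows in this model.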

I have this rule:

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case join @ Join(left, right, _, Some(condition)) =>
          val attr = right.outputSet.find(x => x.toString().contains("x"))
          val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
            Seq(attr.last.toAttribute))
          val explode = Explode(udf)
          val resolvedGenerator = Generate(explode, true, false, qualifier = None,
            udf.references.toSeq, right)
          var newRight = Project(resolvedGenerator.output, resolvedGenerator)
          Join(left, newRight, Inner, Option(condition))
      }
    }

But the problem is that the operation `Generate explode` appears many times
in the physical plan.


Do you have any other ideas? Maybe the code should be rewritten.

Thank you


