spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bode, Meikel, NMA-CFD" <>
Subject RE: Recursive Queries or Recursive UDF?
Date Sat, 01 May 2021 13:17:40 GMT
Hi all,

I created a running example of my data set and I describe what I want to achieve. The idea
is to create a view over the resulting table and use it for later joins.
Instead if applying a UDF to a column using a dict with 20+ (growing) million records.

Example data set:

        ("inquiry1", "quotation1"),

        ("inquiry2", "quotation2"),
        ("quotation2", "order2"),
        ("order2", "invoice2"),

        ("order3", "invoice3")
    ['parent', 'child']

We see several hierarchies in the df above but we don’t have records indicating that e.g.
inquiry1 is the root of one of the hierarchies.
So we have:

1: inquiry1 > quotation1
2: inquiry2 > quotation2 > order2
3: order3 > invoice3

What I need is the following. For every child I need the level 0 parent like this:

child, lvl-0-parent
quotation1, inquiry1
quotation2, inquiry2
order2, inquiry2
invoice2, inquiry2
invoice3, order3

It would be perfect to see that some of the entries actually are the root by indicating:
child, lvl-0-parent
inquiry1, null
inquiry2, null
order3, null

Actually that’s what I realized with my recursive UDF I put into the initial post.

Thank you for any hints on that issue! Any hints on the UDF solution are also very welcome:

Thx and best,

From: Bode, Meikel, NMA-CFD
Sent: Freitag, 30. April 2021 12:16
To: user @spark <>
Subject: Recursive Queries or Recursive UDF?

Hi all,

I implemented a recursive UDF, that tries to find a document number in a long list of predecessor
documents. This can be a multi-level hierarchy:
C is successor of B is successor of A (but many more levels are possible)

As input to that UDF I prepare a dict that contains the complete document flow reduced to
the required fields to follow the path back to the originating document.
The dict is broadcasted and then used  by the UDF. Actually this approach is very slow and
now – as data growth – it kills my executors regularly so that RDDs get lost and task
fail. Sometimes also the workers (docker containers) become unresponsive and are getting killed.

Here is the coding of the methods:

1.: Prepare and define the UDF, broadcast dict.

    # Define function for recursive lookup of root document
    def __gen_caseid_udf_sales_document_flow(self):
        global bc_document_flow, udf_sales_document_flow

        # Prepare docflow for broadcasting by only selecting required fields
        df_vbfa_subset = self.spark.table("FLOWTABLE").select("clnt", "predecessor_head",
"predecessor_item", "doc_num", "doc_item")

        # Prepare dictionary for broadcast
        document_flow_dic = {}
        for clnt, predecessor_head, predecessor_item, doc_num, doc_item in df_subset.rdd.collect():
            document_flow_dic[(clnt, doc_num, doc_item)] = predecessor_head, predecessor_item

        # Broadcast dictionary to workers
        bc_document_flow = self.spark.sparkContext.broadcast(document_flow_dic)

        # Register new user defined function UDF
        udf_vbfa_sales_document_flow = func.udf(gen_caseid_udf_sale_root_lookup)

2.: The recursive function used in the UDF
# Find root document
def gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr, posnr):
    global bc_document_flow

    if not clnt or not docnr or not posnr:
        return None, None

    key = clnt, docnr, posnr

    if key in lt:
        docnr_tmp, item_tmp = lt[key]
        if docnr_tmp == docnr and item_tmp == posnr:
            return docnr, posnr
            return gen_caseid_udf_sale_get_root_doc(lt, clnt, docnr_tmp, item_tmp)
        return docnr, posnr

3: The UDF
# Define udf function to look up root document
def gen_caseid_udf_sale_root_lookup(clnt, doc_num, posnr):
    global bc_document_flow # Name of the broad cast variable

    lt = bc_document_flow.value
    h, p = gen_caseid_udf_vbfa_sale_get_root_doc(lt, clnt, doc_num, posnr)
return str(clnt) + str(h) + str(p)

4. Usage of the UDF on a DF that might contain several ten thousands of rows:

# Lookup root document from document flow
documents = documents.withColumn("root_doc", udf_sales_document_flow(func.col('clnt'),

Do you have any hint on my coding or are there any ideas how to implement a recursive select
without implement a potential unoptimizable UDF?
I came along which
might an option, does Spark support this kind of construct?

Thanks and all the best,

View raw message