spark-user mailing list archives

From pedro <>
Subject Misaligned Rows with UDF
Date Tue, 14 Jul 2015 21:34:49 GMT

I am working on finding the root cause of a bug where rows in DataFrames
seem to have misaligned data. My DataFrames have two types of columns:
columns from data and columns from UDFs. The trouble is that, for a given
row, the row data doesn't match the data that was used to compute the UDF columns.

In my case, I am calculating click-through rates, so the columns of interest
are items sent to users and items clicked by users. Below is an example of
the problem I am encountering.

Column schema: clicked_items (Array of Long), sent_items (Array of Long),
ctr (Double), str_items (String)
Example row: [[1, 2], [1, 2, 3], 1, "[5] | [5]"]
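from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType, StringType
# ctr = fraction of sent items that were clicked; str_udf echoes its inputs
ctr_udf = UserDefinedFunction(lambda x, y: 1.0 * len(x) / len(y), DoubleType())
str_udf = UserDefinedFunction(lambda x, y: "{0} | {1}".format(str(x), str(y)),
                              StringType())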

My DataFrame looks something like this:
df.select(df.clicked_items, df.sent_items,
          ctr_udf(df.clicked_items, df.sent_items),
          str_udf(df.clicked_items, df.sent_items))

As you can see above, the row data doesn't match what was input to the UDF.
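As a sanity check, the two UDF bodies can be run in plain Python on the arrays from the example row. Plain functions stand in for the Spark UDF wrappers here, so this is only a sketch of what the `ctr` and `str_items` columns should contain for that row.

```python
# Plain-Python versions of the two UDF bodies above (no Spark needed).
def ctr(clicked, sent):
    return 1.0 * len(clicked) / len(sent)

def str_items(clicked, sent):
    return "{0} | {1}".format(str(clicked), str(sent))

clicked_items = [1, 2]
sent_items = [1, 2, 3]

expected_ctr = ctr(clicked_items, sent_items)        # 2/3, not the observed 1
expected_str = str_items(clicked_items, sent_items)  # "[1, 2] | [1, 2, 3]"
print(expected_ctr, expected_str)

# The observed values are what the UDFs return for the arrays [5] and [5]:
print(ctr([5], [5]), str_items([5], [5]))            # 1.0 [5] | [5]
```

Note that the two UDF columns in the example row agree with each other: both look as if they were computed from the arrays `[5]` and `[5]` rather than from the row's own arrays.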

To rule out a problem with UDFs in general, I tested the code below, and it
works fine:
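from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               IntegerType, StringType, DoubleType)

test_data_schema = StructType([
        StructField('numerator', ArrayType(IntegerType())),
        StructField('denominator', ArrayType(IntegerType())),
        StructField('label', StringType())
    ])
# Six labeled rows over 3 partitions (list() keeps this valid on Python 3)
test_data = sc.parallelize([
        (list(range(3)), list(range(4)), 'a'),
        ([], list(range(5)), 'b'),
        ([], list(range(3)), 'c'),
        (list(range(4)), list(range(5)), 'd'),
        (list(range(3)), list(range(4)), 'e'),
        (list(range(1)), list(range(3)), 'f')
    ], 3)
td_df = sql.createDataFrame(test_data, test_data_schema)  # sql is the SQLContext

def compute_func(num, den, label):
    return 1.0 * len(num) / len(den)

compute_udf = UserDefinedFunction(compute_func, DoubleType())
td_df.select(td_df.numerator, td_df.denominator, td_df.label,
             compute_udf(td_df.numerator, td_df.denominator, td_df.label)).take(10)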
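For comparison with the output of `take(10)`, the expected ratio for each test row can be computed on the driver without Spark. This sketch reuses the test data above and is only meant as a reference table to diff against the Spark result.

```python
# Expected numerator/denominator ratios for the six test rows, computed without Spark.
test_rows = [
    (list(range(3)), list(range(4)), 'a'),
    ([], list(range(5)), 'b'),
    ([], list(range(3)), 'c'),
    (list(range(4)), list(range(5)), 'd'),
    (list(range(3)), list(range(4)), 'e'),
    (list(range(1)), list(range(3)), 'f'),
]

def compute_func(num, den, label):
    return 1.0 * len(num) / len(den)

# Map each label to the value its UDF column should hold.
expected = {label: compute_func(num, den, label) for num, den, label in test_rows}
for label in sorted(expected):
    print(label, expected[label])
```

Because every row carries a unique label, a Spark row whose UDF value differs from `expected[row.label]` would point directly at a misaligned row.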

Since the above works, I thought this might be an issue with the data rather
than with the DataFrame/UDF. To test this, I wrote the data to HDFS as Parquet
and then read it back in. This fixed the issue, so I think it points to a
problem with the lineage/data pipeline/HDFS. Any thoughts on this would be great.

My data roughly follows this flow:
1. Read raw logs from HDFS
2. Transform raw logs into dataframe, register as temp table
3. Group data by unique identifier, reflatten the data with the identifier
as a column and additional computed values, register as temp table
4. Perform a broadcast join into a column of the table
5. Regroup the table by the prior unique identifier plus a new one, then
flatten once more (deduping records using the data from the broadcast join).
6. Do UDF calculation.

My next step will be to try extending the example above to mimic our data
flow, in the hope of reproducing the bug outside our codebase. Any intuition
on next steps would be great.
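For a reproduction outside the codebase, it may help to start from synthetic rows with a known ground truth, so that any misalignment introduced by the group/join/regroup steps is detectable. The generator `make_synthetic_rows` below is a hypothetical sketch: it produces `(user_id, sent_items, clicked_items)` tuples where the clicked items are always a subset of the sent items, using a fixed seed so runs are repeatable. Its output could be handed to `sc.parallelize` in place of the raw logs.

```python
import random

def make_synthetic_rows(n_users, max_sent=5, n_items=20, seed=0):
    """Hypothetical generator of (user_id, sent_items, clicked_items) tuples.

    Assumes max_sent <= n_items. Every user gets at least one sent item,
    and clicked_items is always a subset of sent_items.
    """
    rng = random.Random(seed)
    rows = []
    for user_id in range(n_users):
        n_sent = rng.randint(1, max_sent)
        sent = sorted(rng.sample(range(n_items), n_sent))
        n_clicked = rng.randint(0, n_sent)
        clicked = sorted(rng.sample(sent, n_clicked))
        rows.append((user_id, sent, clicked))
    return rows

rows = make_synthetic_rows(100)
print(rows[:3])
```

Since each user's true click-through rate is `len(clicked) / len(sent)` by construction, the values after each pipeline stage can be checked against this reference.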

Pedro Rodriguez
CU Boulder PhD Student
