datafu-commits mailing list archives

From e...@apache.org
Subject [datafu] branch spark-tmp updated: expose python functions to pyspark dataFrames
Date Mon, 08 Jul 2019 17:43:54 GMT
This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/spark-tmp by this push:
     new d8f8d58  expose python functions to pyspark dataFrames
d8f8d58 is described below

commit d8f8d587d30f1b69c098a0af893d544eb6d61541
Author: Ohad Raviv <oraviv@paypal.com>
AuthorDate: Mon Jul 8 20:42:45 2019 +0300

    expose python functions to pyspark dataFrames
    
    Signed-off-by: Eyal Allweil <eyal@apache.org>
---
 .../src/main/resources/pyspark_utils/df_utils.py   | 283 +++++++++++----------
 .../test/resources/python_tests/df_utils_tests.py  |  40 +--
 2 files changed, 169 insertions(+), 154 deletions(-)
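For context, a minimal sketch of the API change in this commit (df_people is a hypothetical
DataFrame with "id", "name" and "age" columns, as created in the test file below):

    # Before (removed here): class-based API
    from pyspark_utils.df_utils import PySparkDFUtils
    df_utils = PySparkDFUtils()
    deduped = df_utils.dedup(dataFrame=df_people, groupCol=df_people.id,
                             orderCols=[df_people.age.desc()])

    # After (added here): module-level functions, optionally attached to
    # pyspark.sql.DataFrame by calling activate()
    from pyspark_utils import df_utils
    df_utils.activate()
    deduped = df_people.dedup(group_col=df_people.id,
                              order_cols=[df_people.age.desc()])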

diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index f6261c5..d91228e 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -15,142 +15,157 @@
 # specific language governing permissions and limitations
 # under the License.
  
+import pyspark
 from pyspark.sql import DataFrame
-
 from pyspark_utils.bridge_utils import _getjvm_class
 
 
-class PySparkDFUtils(object):
-
-    def __init__(self):
-        self._sc = None
-
-    def _initSparkContext(self, sc, sqlContext):
-        self._sc = sc
-        self._sqlContext = sqlContext
-        self._gateway = sc._gateway
-
-    def _get_jvm_spark_utils(self):
-        jvm_utils = _getjvm_class(self._gateway, "datafu.spark.SparkDFUtilsBridge")
-        return jvm_utils
-
-    def _get_utils(self, df):
-        self._initSparkContext(df._sc, df.sql_ctx)
-        return self._get_jvm_spark_utils()
-
-    # public:
-
-    def dedup(self, dataFrame, groupCol, orderCols = []):
-        """
-        Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
-        :param dataFrame: DataFrame to operate on
-        :param groupCol: column to group by the records
-        :param orderCols: columns to order the records according to.
-        :return: DataFrame representing the data after the operation
-        """
-        java_cols = self._cols_to_java_cols(orderCols)
-        jdf = self._get_utils(dataFrame).dedup(dataFrame._jdf, groupCol._jc, java_cols)
-        return DataFrame(jdf, self._sqlContext)
-
-    def dedupTopN(self, dataFrame, n, groupCol, orderCols = []):
-        """
-        Used to get the top N records (after ordering according to the provided order columns) in each group.
-        :param dataFrame: DataFrame to operate on
-        :param n: number of records to return from each group
-        :param groupCol: column to group by the records
-        :param orderCols: columns to order the records according to
-        :return: DataFrame representing the data after the operation
-        """
-        java_cols = self._cols_to_java_cols(orderCols)
-        jdf = self._get_utils(dataFrame).dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
-        return DataFrame(jdf, self._sqlContext)
-
-    def dedup2(self, dataFrame, groupCol, orderByCol, desc = True, columnsFilter = [], columnsFilterKeep = True):
-        """
-        Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
-        :param dataFrame: DataFrame to operate on
-        :param groupCol: column to group by the records
-        :param orderByCol: column to order the records according to
-        :param desc: whether to order in descending order
-        :param columnsFilter: columns to filter
-        :param columnsFilterKeep: indicates whether the columns in columnsFilter should be filtered
-                                  out or, alternatively, kept as the only columns in the result
-        :return: DataFrame representing the data after the operation
-        """
-        jdf = self._get_utils(dataFrame).dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
-        return DataFrame(jdf, self._sqlContext)
-
-    def changeSchema(self, dataFrame, newScheme = []):
-        """
-        Returns a DataFrame with the column names renamed to the column names in the new schema
-        :param dataFrame: DataFrame to operate on
-        :param newScheme: new column names
-        :return: DataFrame representing the data after the operation
-        """
-        jdf = self._get_utils(dataFrame).changeSchema(dataFrame._jdf, newScheme)
-        return DataFrame(jdf, self._sqlContext)
-
-    def joinSkewed(self, dfLeft, dfRight, joinExprs, numShards = 30, joinType= "inner"):
-        """
-        Used to perform a join when the right df is relatively small but still too big to use a broadcast join.
-        Use cases:
-            a. excluding keys that might be skewed from a medium-sized list.
-            b. joining a big skewed table with a table that has a small number of very big rows.
-        :param dfLeft: left DataFrame
-        :param dfRight: right DataFrame
-        :param joinExprs: join expression
-        :param numShards: number of shards
-        :param joinType: join type
-        :return: DataFrame representing the data after the operation
-        """
-        jdf = self._get_utils(dfLeft).joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
-        return DataFrame(jdf, self._sqlContext)
-
-    def broadcastJoinSkewed(self, notSkewed, skewed, joinCol, numberCustsToBroadcast):
-        """
-        Suitable for performing a join when one DF is skewed and the other is not.
-        Splits both DFs into two parts according to the skewed keys:
-        1. Map-join: broadcasts the skewed-keys part of the not-skewed DF to the skewed-keys part of the skewed DF.
-        2. Regular join: between the remaining two parts.
-        :param notSkewed: not skewed DataFrame
-        :param skewed: skewed DataFrame
-        :param joinCol: join column
-        :param numberCustsToBroadcast: number of custs to broadcast
-        :return: DataFrame representing the data after the operation
-        """
-        jdf = self._get_utils(skewed).broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
-        return DataFrame(jdf, self._sqlContext)
-
-    def joinWithRange(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor):
-        """
-        Helper function to join a table that has a single column with a table that has a range of the same column.
-        For example, an IP table with whois data whose rows contain ranges of IPs.
-        The main problem this handles is that a naive explode on the range can result in a huge table.
-        Requires:
-        1. The single table must be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
-        2. The range and single columns must be numeric.
-        """
-        jdf = self._get_utils(dfSingle).joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
-        return DataFrame(jdf, self._sqlContext)
-
-    def joinWithRangeAndDedup(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange):
-        """
-        Helper function to join a table that has a single column with a table that has a range of the same column.
-        For example, an IP table with whois data whose rows contain ranges of IPs.
-        The main problem this handles is that a naive explode on the range can result in a huge table.
-        Requires:
-        1. The single table must be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
-        2. The range and single columns must be numeric.
-        """
-        jdf = self._get_utils(dfSingle).joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
-        return DataFrame(jdf, self._sqlContext)
-
-    def _cols_to_java_cols(self, cols):
-        return self._map_if_needed(lambda x: x._jc, cols)
-
-    def _dfs_to_java_dfs(self, dfs):
-        return self._map_if_needed(lambda x: x._jdf, dfs)
-
-    def _map_if_needed(self, func, itr):
-        return map(func, itr) if itr is not None else itr
+def _get_utils(df):
+    _gateway = df._sc._gateway
+    return _getjvm_class(_gateway, "datafu.spark.SparkDFUtilsBridge")
+
+
+# public:
+
+
+def dedup(df, group_col, order_cols = []):
+    """
+    Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param group_col: column to group by the records
+    :param order_cols: columns to order the records according to.
+    :return: DataFrame representing the data after the operation
+    """
+    java_cols = _cols_to_java_cols(order_cols)
+    jdf = _get_utils(df).dedup(df._jdf, group_col._jc, java_cols)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup_top_n(df, n, group_col, order_cols = []):
+    """
+    Used to get the top N records (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param n: number of records to return from each group
+    :param group_col: column to group by the records
+    :param order_cols: columns to order the records according to
+    :return: DataFrame representing the data after the operation
+    """
+    java_cols = _cols_to_java_cols(order_cols)
+    jdf = _get_utils(df).dedupTopN(df._jdf, n, group_col._jc, java_cols)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
+    """
+    Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param group_col: column to group by the records
+    :param order_by_col: column to order the records according to
+    :param desc: whether to order in descending order
+    :param columns_filter: columns to filter
+    :param columns_filter_keep: indicates whether the columns in columns_filter should be filtered
+                                out or, alternatively, kept as the only columns in the result
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df).dedup2(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def change_schema(df, new_scheme = []):
+    """
+    Returns a DataFrame with the column names renamed to the column names in the new schema
+    :param df: DataFrame to operate on
+    :param new_scheme: new column names
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df).changeSchema(df._jdf, new_scheme)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def join_skewed(df_left, df_right, join_exprs, num_shards = 30, join_type="inner"):
+    """
+    Used to perform a join when the right df is relatively small but still too big to use a broadcast join.
+    Use cases:
+        a. excluding keys that might be skewed from a medium-sized list.
+        b. joining a big skewed table with a table that has a small number of very big rows.
+    :param df_left: left DataFrame
+    :param df_right: right DataFrame
+    :param join_exprs: join expression
+    :param num_shards: number of shards
+    :param join_type: join type
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df_left).joinSkewed(df_left._jdf, df_right._jdf, join_exprs._jc, num_shards, join_type)
+    return DataFrame(jdf, df_left.sql_ctx)
+
+
+def broadcast_join_skewed(not_skewed_df, skewed_df, join_col, number_of_custs_to_broadcast):
+    """
+    Suitable for performing a join when one DF is skewed and the other is not.
+    Splits both DFs into two parts according to the skewed keys:
+    1. Map-join: broadcasts the skewed-keys part of the not-skewed DF to the skewed-keys part of the skewed DF.
+    2. Regular join: between the remaining two parts.
+    :param not_skewed_df: not skewed DataFrame
+    :param skewed_df: skewed DataFrame
+    :param join_col: join column
+    :param number_of_custs_to_broadcast: number of custs to broadcast
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(skewed_df).broadcastJoinSkewed(not_skewed_df._jdf, skewed_df._jdf, join_col, number_of_custs_to_broadcast)
+    return DataFrame(jdf, not_skewed_df.sql_ctx)
+
+
+def join_with_range(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor):
+    """
+    Helper function to join a table that has a single column with a table that has a range of the same column.
+    For example, an IP table with whois data whose rows contain ranges of IPs.
+    The main problem this handles is that a naive explode on the range can result in a huge table.
+    Requires:
+    1. The single table must be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
+    2. The range and single columns must be numeric.
+    """
+    jdf = _get_utils(df_single).joinWithRange(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor)
+    return DataFrame(jdf, df_single.sql_ctx)
+
+
+def join_with_range_and_dedup(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor, dedup_small_range):
+    """
+    Helper function to join a table that has a single column with a table that has a range of the same column.
+    For example, an IP table with whois data whose rows contain ranges of IPs.
+    The main problem this handles is that a naive explode on the range can result in a huge table.
+    Requires:
+    1. The single table must be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end and choose the minimal range.
+    2. The range and single columns must be numeric.
+    """
+    jdf = _get_utils(df_single).joinWithRangeAndDedup(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor, dedup_small_range)
+    return DataFrame(jdf, df_single.sql_ctx)
+
+
+def _cols_to_java_cols(cols):
+    return _map_if_needed(lambda x: x._jc, cols)
+
+
+def _dfs_to_java_dfs(dfs):
+    return _map_if_needed(lambda x: x._jdf, dfs)
+
+
+def _map_if_needed(func, itr):
+    return map(func, itr) if itr is not None else itr
+
+
+def activate():
+    """Activate integration between datafu-spark and PySpark.
+    This function only needs to be called once.
+
+    This technique is taken from pymongo_spark:
+    https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/pymongo_spark.py
+    """
+    pyspark.sql.DataFrame.dedup = dedup
+    pyspark.sql.DataFrame.dedup_top_n = dedup_top_n
+    pyspark.sql.DataFrame.dedup2 = dedup2
+    pyspark.sql.DataFrame.change_schema = change_schema
+    pyspark.sql.DataFrame.join_skewed = join_skewed
+    pyspark.sql.DataFrame.broadcast_join_skewed = broadcast_join_skewed
+    pyspark.sql.DataFrame.join_with_range = join_with_range
+    pyspark.sql.DataFrame.join_with_range_and_dedup = join_with_range_and_dedup
 
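For reference, a hypothetical end-to-end usage sketch of the new module (assuming the datafu-spark
jar is on the Spark classpath and pyspark_utils, i.e. the df_utils.py above, is on the driver's
PYTHONPATH; the functions delegate to datafu.spark.SparkDFUtilsBridge on the JVM):

    from pyspark.sql import SparkSession
    from pyspark_utils import df_utils

    spark = SparkSession.builder.master("local[2]").appName("df_utils_demo").getOrCreate()
    df_utils.activate()  # attach dedup, dedup_top_n, join_skewed, ... as DataFrame methods

    df_people = spark.createDataFrame(
        [("a", "Alice", 34), ("a", "Sara", 33), ("b", "Bob", 36)],
        ["id", "name", "age"])

    # Latest record per id (highest age wins, name breaks ties)
    latest = df_people.dedup(group_col=df_people.id,
                             order_cols=[df_people.age.desc(), df_people.name.desc()])
    latest.show()

    # The same functions remain callable at module level
    top2 = df_utils.dedup_top_n(df_people, n=2, group_col=df_people.id,
                                order_cols=[df_people.age.desc()])
    top2.show()
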
diff --git a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
index c33a88f..d47d4a1 100644
--- a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
+++ b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
@@ -21,14 +21,14 @@ import os
 import sys
 from pprint import pprint as p
 
-from pyspark_utils.df_utils import PySparkDFUtils
+from pyspark_utils import df_utils
 
 p('CHECKING IF PATHS EXISTS:')
 for x in sys.path:
     p('PATH ' + x + ': ' + str(os.path.exists(x)))
 
 
-df_utils = PySparkDFUtils()
+df_utils.activate()
 
 df_people = sqlContext.createDataFrame([
     ("a", "Alice", 34),
@@ -41,19 +41,19 @@ df_people = sqlContext.createDataFrame([
     ("c", "Zoey", 36)],
     ["id", "name", "age"])
 
-func_dedup_res = df_utils.dedup(dataFrame=df_people, groupCol=df_people.id,
-                             orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedup_res = df_people.dedup(group_col=df_people.id,
+                             order_cols=[df_people.age.desc(), df_people.name.desc()])
 func_dedup_res.registerTempTable("dedup")
 
-func_dedupTopN_res = df_utils.dedupTopN(dataFrame=df_people, n=2, groupCol=df_people.id,
-                                     orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedupTopN_res = df_people.dedup_top_n(n=2, group_col=df_people.id,
+                                     order_cols=[df_people.age.desc(), df_people.name.desc()])
 func_dedupTopN_res.registerTempTable("dedupTopN")
 
-func_dedup2_res = df_utils.dedup2(dataFrame=df_people, groupCol=df_people.id, orderByCol=df_people.age, desc=True,
-                               columnsFilter=["name"], columnsFilterKeep=False)
+func_dedup2_res = df_people.dedup2(group_col=df_people.id, order_by_col=df_people.age, desc=True,
+                               columns_filter=["name"], columns_filter_keep=False)
 func_dedup2_res.registerTempTable("dedup2")
 
-func_changeSchema_res = df_utils.changeSchema(dataFrame=df_people, newScheme=["id1", "name1", "age1"])
+func_changeSchema_res = df_people.change_schema(new_scheme=["id1", "name1", "age1"])
 func_changeSchema_res.registerTempTable("changeSchema")
 
 df_people2 = sqlContext.createDataFrame([
@@ -67,24 +67,24 @@ simpleDF = sqlContext.createDataFrame([
     ["id", "value"])
 from pyspark.sql.functions import expr
 
-func_joinSkewed_res = df_utils.joinSkewed(dfLeft=df_people2.alias("df1"), dfRight=simpleDF.alias("df2"),
-                                       joinExprs=expr("df1.id == df2.id"), numShards=5,
-                                       joinType="inner")
+func_joinSkewed_res = df_utils.join_skewed(df_left=df_people2.alias("df1"), df_right=simpleDF.alias("df2"),
+                                           join_exprs=expr("df1.id == df2.id"), num_shards=5,
+                                           join_type="inner")
 func_joinSkewed_res.registerTempTable("joinSkewed")
 
-func_broadcastJoinSkewed_res = df_utils.broadcastJoinSkewed(notSkewed=df_people2, skewed=simpleDF, joinCol="id",
-                                                         numberCustsToBroadcast=5)
+func_broadcastJoinSkewed_res = df_utils.broadcast_join_skewed(not_skewed_df=df_people2, skewed_df=simpleDF, join_col="id",
+                                                              number_of_custs_to_broadcast=5)
 func_broadcastJoinSkewed_res.registerTempTable("broadcastJoinSkewed")
 
 dfRange = sqlContext.createDataFrame([
     ("a", 34, 36)],
     ["id1", "start", "end"])
-func_joinWithRange_res = df_utils.joinWithRange(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
-                                             colRangeStart="start", colRangeEnd="end",
-                                             decreaseFactor=5)
+func_joinWithRange_res = df_utils.join_with_range(df_single=df_people2, col_single="age", df_range=dfRange,
+                                                  col_range_start="start", col_range_end="end",
+                                                  decrease_factor=5)
 func_joinWithRange_res.registerTempTable("joinWithRange")
 
-func_joinWithRangeAndDedup_res = df_utils.joinWithRangeAndDedup(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
-                                                  colRangeStart="start", colRangeEnd="end",
-                                                  decreaseFactor=5, dedupSmallRange=True)
+func_joinWithRangeAndDedup_res = df_utils.join_with_range_and_dedup(df_single=df_people2, col_single="age", df_range=dfRange,
+                                                                    col_range_start="start", col_range_end="end",
+                                                                    decrease_factor=5, dedup_small_range=True)
 func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
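
The temp tables registered above are presumably queried by the Scala test harness that runs this
script (and provides sqlContext); from Python the same results could be inspected with, for example:

    sqlContext.sql("SELECT * FROM dedup").show()
    sqlContext.table("joinSkewed").show()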

