This is an automated email from the ASF dual-hosted git repository.
eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git
The following commit(s) were added to refs/heads/spark-tmp by this push:
new d8f8d58 expose python functions to pyspark dataFrames
d8f8d58 is described below
commit d8f8d587d30f1b69c098a0af893d544eb6d61541
Author: Ohad Raviv <oraviv@paypal.com>
AuthorDate: Mon Jul 8 20:42:45 2019 +0300
expose python functions to pyspark dataFrames
Signed-off-by: Eyal Allweil <eyal@apache.org>
---
.../src/main/resources/pyspark_utils/df_utils.py | 283 +++++++++++----------
.../test/resources/python_tests/df_utils_tests.py | 40 +--
2 files changed, 169 insertions(+), 154 deletions(-)
diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index f6261c5..d91228e 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -15,142 +15,157 @@
# specific language governing permissions and limitations
# under the License.
+import pyspark
from pyspark.sql import DataFrame
-
from pyspark_utils.bridge_utils import _getjvm_class
-class PySparkDFUtils(object):
-
- def __init__(self):
- self._sc = None
-
- def _initSparkContext(self, sc, sqlContext):
- self._sc = sc
- self._sqlContext = sqlContext
- self._gateway = sc._gateway
-
- def _get_jvm_spark_utils(self):
- jvm_utils = _getjvm_class(self._gateway, "datafu.spark.SparkDFUtilsBridge")
- return jvm_utils
-
- def _get_utils(self, df):
- self._initSparkContext(df._sc, df.sql_ctx)
- return self._get_jvm_spark_utils()
-
- # public:
-
- def dedup(self, dataFrame, groupCol, orderCols = []):
- """
- Used get the 'latest' record (after ordering according to the provided order columns) in each group.
- :param dataFrame: DataFrame to operate on
- :param groupCol: column to group by the records
- :param orderCols: columns to order the records according to.
- :return: DataFrame representing the data after the operation
- """
- java_cols = self._cols_to_java_cols(orderCols)
- jdf = self._get_utils(dataFrame).dedup(dataFrame._jdf, groupCol._jc, java_cols)
- return DataFrame(jdf, self._sqlContext)
-
- def dedupTopN(self, dataFrame, n, groupCol, orderCols = []):
- """
- Used get the top N records (after ordering according to the provided order columns) in each group.
- :param dataFrame: DataFrame to operate on
- :param n: number of records to return from each group
- :param groupCol: column to group by the records
- :param orderCols: columns to order the records according to
- :return: DataFrame representing the data after the operation
- """
- java_cols = self._cols_to_java_cols(orderCols)
- jdf = self._get_utils(dataFrame).dedupTopN(dataFrame._jdf, n, groupCol._jc, java_cols)
- return DataFrame(jdf, self._sqlContext)
-
- def dedup2(self, dataFrame, groupCol, orderByCol, desc = True, columnsFilter = [], columnsFilterKeep = True):
- """
- Used get the 'latest' record (after ordering according to the provided order columns) in each group.
- :param dataFrame: DataFrame to operate on
- :param groupCol: column to group by the records
- :param orderByCol: column to order the records according to
- :param desc: have the order as desc
- :param columnsFilter: columns to filter
- :param columnsFilterKeep: indicates whether we should filter the selected columns 'out' or alternatively have only
- * those columns in the result
- :return: DataFrame representing the data after the operation
- """
- jdf = self._get_utils(dataFrame).dedup2(dataFrame._jdf, groupCol._jc, orderByCol._jc, desc, columnsFilter, columnsFilterKeep)
- return DataFrame(jdf, self._sqlContext)
-
- def changeSchema(self, dataFrame, newScheme = []):
- """
- Returns a DataFrame with the column names renamed to the column names in the new schema
- :param dataFrame: DataFrame to operate on
- :param newScheme: new column names
- :return: DataFrame representing the data after the operation
- """
- jdf = self._get_utils(dataFrame).changeSchema(dataFrame._jdf, newScheme)
- return DataFrame(jdf, self._sqlContext)
-
- def joinSkewed(self, dfLeft, dfRight, joinExprs, numShards = 30, joinType= "inner"):
- """
- Used to perform a join when the right df is relatively small but doesn't fit to perform broadcast join.
- Use cases:
- a. excluding keys that might be skew from a medium size list.
- b. join a big skewed table with a table that has small number of very big rows.
- :param dfLeft: left DataFrame
- :param dfRight: right DataFrame
- :param joinExprs: join expression
- :param numShards: number of shards
- :param joinType: join type
- :return: DataFrame representing the data after the operation
- """
- jdf = self._get_utils(dfLeft).joinSkewed(dfLeft._jdf, dfRight._jdf, joinExprs._jc, numShards, joinType)
- return DataFrame(jdf, self._sqlContext)
-
- def broadcastJoinSkewed(self, notSkewed, skewed, joinCol, numberCustsToBroadcast):
- """
- Suitable to perform a join in cases when one DF is skewed and the other is not skewed.
- splits both of the DFs to two parts according to the skewed keys.
- 1. Map-join: broadcasts the skewed-keys part of the not skewed DF to the skewed-keys part of the skewed DF
- 2. Regular join: between the remaining two parts.
- :param notSkewed: not skewed DataFrame
- :param skewed: skewed DataFrame
- :param joinCol: join column
- :param numberCustsToBroadcast: number of custs to broadcast
- :return: DataFrame representing the data after the operation
- """
- jdf = self._get_utils(skewed).broadcastJoinSkewed(notSkewed._jdf, skewed._jdf, joinCol, numberCustsToBroadcast)
- return DataFrame(jdf, self._sqlContext)
-
- def joinWithRange(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor):
- """
- Helper function to join a table with column to a table with range of the same column.
- For example, ip table with whois data that has range of ips as lines.
- The main problem which this handles is doing naive explode on the range can result in huge table.
- requires:
- 1. single table needs to be distinct on the join column, because there could be a few corresponding ranges so we dedup at the end - we choose the minimal range.
- 2. the range and single columns to be numeric.
- """
- jdf = self._get_utils(dfSingle).joinWithRange(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor)
- return DataFrame(jdf, self._sqlContext)
-
- def joinWithRangeAndDedup(self, dfSingle, colSingle, dfRange, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange):
- """
- Helper function to join a table with column to a table with range of the same column.
- For example, ip table with whois data that has range of ips as lines.
- The main problem which this handles is doing naive explode on the range can result in huge table.
- requires:
- 1. single table needs to be distinct on the join column, because there could be a few corresponding ranges so we dedup at the end - we choose the minimal range.
- 2. the range and single columns to be numeric.
- """
- jdf = self._get_utils(dfSingle).joinWithRangeAndDedup(dfSingle._jdf, colSingle, dfRange._jdf, colRangeStart, colRangeEnd, decreaseFactor, dedupSmallRange)
- return DataFrame(jdf, self._sqlContext)
-
- def _cols_to_java_cols(self, cols):
- return self._map_if_needed(lambda x: x._jc, cols)
-
- def _dfs_to_java_dfs(self, dfs):
- return self._map_if_needed(lambda x: x._jdf, dfs)
-
- def _map_if_needed(self, func, itr):
- return map(func, itr) if itr is not None else itr
+def _get_utils(df):
+ _gateway = df._sc._gateway
+ return _getjvm_class(_gateway, "datafu.spark.SparkDFUtilsBridge")
+
+
+# public:
+
+
+def dedup(df, group_col, order_cols = []):
+ """
+ Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ :param df: DataFrame to operate on
+ :param group_col: column to group by the records
+ :param order_cols: columns to order the records according to.
+ :return: DataFrame representing the data after the operation
+ """
+ java_cols = _cols_to_java_cols(order_cols)
+ jdf = _get_utils(df).dedup(df._jdf, group_col._jc, java_cols)
+ return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup_top_n(df, n, group_col, order_cols = []):
+ """
+ Used to get the top N records (after ordering according to the provided order columns) in each group.
+ :param df: DataFrame to operate on
+ :param n: number of records to return from each group
+ :param group_col: column to group by the records
+ :param order_cols: columns to order the records according to
+ :return: DataFrame representing the data after the operation
+ """
+ java_cols = _cols_to_java_cols(order_cols)
+ jdf = _get_utils(df).dedupTopN(df._jdf, n, group_col._jc, java_cols)
+ return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
+ """
+ Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ :param df: DataFrame to operate on
+ :param group_col: column to group by the records
+ :param order_by_col: column to order the records according to
+ :param desc: have the order as desc
+ :param columns_filter: columns to filter
+ :param columns_filter_keep: indicates whether to filter the selected columns 'out' or, alternatively, keep only those columns in the result
+ :return: DataFrame representing the data after the operation
+ """
+ jdf = _get_utils(df).dedup2(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
+ return DataFrame(jdf, df.sql_ctx)
+
+
+def change_schema(df, new_scheme = []):
+ """
+ Returns a DataFrame with the column names renamed to the column names in the new schema
+ :param df: DataFrame to operate on
+ :param new_scheme: new column names
+ :return: DataFrame representing the data after the operation
+ """
+ jdf = _get_utils(df).changeSchema(df._jdf, new_scheme)
+ return DataFrame(jdf, df.sql_ctx)
+
+
+def join_skewed(df_left, df_right, join_exprs, num_shards = 30, join_type="inner"):
+ """
+ Used to perform a join when the right df is relatively small but still too large for a broadcast join.
+ Use cases:
+ a. excluding keys that might be skewed from a medium-size list.
+ b. joining a big skewed table with a table that has a small number of very big rows.
+ :param df_left: left DataFrame
+ :param df_right: right DataFrame
+ :param join_exprs: join expression
+ :param num_shards: number of shards
+ :param join_type: join type
+ :return: DataFrame representing the data after the operation
+ """
+ jdf = _get_utils(df_left).joinSkewed(df_left._jdf, df_right._jdf, join_exprs._jc, num_shards, join_type)
+ return DataFrame(jdf, df_left.sql_ctx)
+
+
+def broadcast_join_skewed(not_skewed_df, skewed_df, join_col, number_of_custs_to_broadcast):
+ """
+ Suitable for performing a join when one DF is skewed and the other is not.
+ Splits both of the DFs into two parts according to the skewed keys.
+ 1. Map-join: broadcasts the skewed-keys part of the not-skewed DF to the skewed-keys part of the skewed DF.
+ 2. Regular join: between the remaining two parts.
+ :param not_skewed_df: not skewed DataFrame
+ :param skewed_df: skewed DataFrame
+ :param join_col: join column
+ :param number_of_custs_to_broadcast: number of custs to broadcast
+ :return: DataFrame representing the data after the operation
+ """
+ jdf = _get_utils(skewed_df).broadcastJoinSkewed(not_skewed_df._jdf, skewed_df._jdf, join_col, number_of_custs_to_broadcast)
+ return DataFrame(jdf, not_skewed_df.sql_ctx)
+
+
+def join_with_range(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor):
+ """
+ Helper function to join a table that has a single-value column to a table that has a range of the same column.
+ For example, an IP table with whois data that has ranges of IPs as rows.
+ The main problem this handles is that a naive explode on the range can result in a huge table.
+ requires:
+ 1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
+ 2. the range and single columns to be numeric.
+ """
+ jdf = _get_utils(df_single).joinWithRange(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor)
+ return DataFrame(jdf, df_single.sql_ctx)
+
+
+def join_with_range_and_dedup(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor, dedup_small_range):
+ """
+ Helper function to join a table that has a single-value column to a table that has a range of the same column.
+ For example, an IP table with whois data that has ranges of IPs as rows.
+ The main problem this handles is that a naive explode on the range can result in a huge table.
+ requires:
+ 1. the single table to be distinct on the join column, because there could be a few corresponding ranges, so we dedup at the end - we choose the minimal range.
+ 2. the range and single columns to be numeric.
+ """
+ jdf = _get_utils(df_single).joinWithRangeAndDedup(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor, dedup_small_range)
+ return DataFrame(jdf, df_single.sql_ctx)
+
+
+def _cols_to_java_cols(cols):
+ return _map_if_needed(lambda x: x._jc, cols)
+
+
+def _dfs_to_java_dfs(dfs):
+ return _map_if_needed(lambda x: x._jdf, dfs)
+
+
+def _map_if_needed(func, itr):
+ return map(func, itr) if itr is not None else itr
+
+
+def activate():
+ """Activate integration between datafu-spark and PySpark.
+ This function only needs to be called once.
+
+ This technique was taken from pymongo_spark
+ https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/pymongo_spark.py
+ """
+ pyspark.sql.DataFrame.dedup = dedup
+ pyspark.sql.DataFrame.dedup_top_n = dedup_top_n
+ pyspark.sql.DataFrame.dedup2 = dedup2
+ pyspark.sql.DataFrame.change_schema = change_schema
+ pyspark.sql.DataFrame.join_skewed = join_skewed
+ pyspark.sql.DataFrame.broadcast_join_skewed = broadcast_join_skewed
+ pyspark.sql.DataFrame.join_with_range = join_with_range
+ pyspark.sql.DataFrame.join_with_range_and_dedup = join_with_range_and_dedup
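For reviewers unfamiliar with the new module-level API, here is a rough, illustrative sketch of how it is meant to be used from a PySpark session. It is not part of this commit: the SparkSession setup and the sample data are assumptions, and it presumes the datafu-spark JAR is on the Spark classpath and pyspark_utils is importable.

    # Illustrative sketch (not part of this commit). Assumes the datafu-spark
    # JAR is on the Spark classpath and pyspark_utils is on the Python path.
    from pyspark.sql import SparkSession
    from pyspark_utils import df_utils

    spark = SparkSession.builder.appName("datafu-spark-example").getOrCreate()
    df_utils.activate()  # attaches the functions above as DataFrame methods

    people = spark.createDataFrame(
        [("a", "Alice", 34), ("a", "Sara", 33), ("b", "Bob", 36)],
        ["id", "name", "age"])

    # keep the 'latest' record per id, ordered by age descending
    latest = people.dedup(group_col=people.id, order_cols=[people.age.desc()])
    latest.show()
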
diff --git a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
index c33a88f..d47d4a1 100644
--- a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
+++ b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
@@ -21,14 +21,14 @@ import os
import sys
from pprint import pprint as p
-from pyspark_utils.df_utils import PySparkDFUtils
+from pyspark_utils import df_utils
p('CHECKING IF PATHS EXISTS:')
for x in sys.path:
p('PATH ' + x + ': ' + str(os.path.exists(x)))
-df_utils = PySparkDFUtils()
+df_utils.activate()
df_people = sqlContext.createDataFrame([
("a", "Alice", 34),
@@ -41,19 +41,19 @@ df_people = sqlContext.createDataFrame([
("c", "Zoey", 36)],
["id", "name", "age"])
-func_dedup_res = df_utils.dedup(dataFrame=df_people, groupCol=df_people.id,
- orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedup_res = df_people.dedup(group_col=df_people.id,
+ order_cols=[df_people.age.desc(), df_people.name.desc()])
func_dedup_res.registerTempTable("dedup")
-func_dedupTopN_res = df_utils.dedupTopN(dataFrame=df_people, n=2, groupCol=df_people.id,
- orderCols=[df_people.age.desc(), df_people.name.desc()])
+func_dedupTopN_res = df_people.dedup_top_n(n=2, group_col=df_people.id,
+ order_cols=[df_people.age.desc(), df_people.name.desc()])
func_dedupTopN_res.registerTempTable("dedupTopN")
-func_dedup2_res = df_utils.dedup2(dataFrame=df_people, groupCol=df_people.id, orderByCol=df_people.age, desc=True,
- columnsFilter=["name"], columnsFilterKeep=False)
+func_dedup2_res = df_people.dedup2(group_col=df_people.id, order_by_col=df_people.age, desc=True,
+ columns_filter=["name"], columns_filter_keep=False)
func_dedup2_res.registerTempTable("dedup2")
-func_changeSchema_res = df_utils.changeSchema(dataFrame=df_people, newScheme=["id1", "name1", "age1"])
+func_changeSchema_res = df_people.change_schema(new_scheme=["id1", "name1", "age1"])
func_changeSchema_res.registerTempTable("changeSchema")
df_people2 = sqlContext.createDataFrame([
@@ -67,24 +67,24 @@ simpleDF = sqlContext.createDataFrame([
["id", "value"])
from pyspark.sql.functions import expr
-func_joinSkewed_res = df_utils.joinSkewed(dfLeft=df_people2.alias("df1"), dfRight=simpleDF.alias("df2"),
- joinExprs=expr("df1.id == df2.id"), numShards=5,
- joinType="inner")
+func_joinSkewed_res = df_utils.join_skewed(df_left=df_people2.alias("df1"), df_right=simpleDF.alias("df2"),
+ join_exprs=expr("df1.id == df2.id"), num_shards=5,
+ join_type="inner")
func_joinSkewed_res.registerTempTable("joinSkewed")
-func_broadcastJoinSkewed_res = df_utils.broadcastJoinSkewed(notSkewed=df_people2, skewed=simpleDF, joinCol="id",
- numberCustsToBroadcast=5)
+func_broadcastJoinSkewed_res = df_utils.broadcast_join_skewed(not_skewed_df=df_people2, skewed_df=simpleDF, join_col="id",
+ number_of_custs_to_broadcast=5)
func_broadcastJoinSkewed_res.registerTempTable("broadcastJoinSkewed")
dfRange = sqlContext.createDataFrame([
("a", 34, 36)],
["id1", "start", "end"])
-func_joinWithRange_res = df_utils.joinWithRange(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
- colRangeStart="start", colRangeEnd="end",
- decreaseFactor=5)
+func_joinWithRange_res = df_utils.join_with_range(df_single=df_people2, col_single="age", df_range=dfRange,
+ col_range_start="start", col_range_end="end",
+ decrease_factor=5)
func_joinWithRange_res.registerTempTable("joinWithRange")
-func_joinWithRangeAndDedup_res = df_utils.joinWithRangeAndDedup(dfSingle=df_people2, colSingle="age", dfRange=dfRange,
- colRangeStart="start", colRangeEnd="end",
- decreaseFactor=5, dedupSmallRange=True)
+func_joinWithRangeAndDedup_res = df_utils.join_with_range_and_dedup(df_single=df_people2, col_single="age", df_range=dfRange,
+ col_range_start="start", col_range_end="end",
+ decrease_factor=5, dedup_small_range=True)
func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
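A note on the activate() mechanism: it relies on plain Python attribute assignment on a class, the same approach pymongo_spark uses. Below is a minimal, Spark-free sketch of the idea; the Table class and dedup_stub function are purely hypothetical stand-ins, not part of datafu-spark.

    # Self-contained sketch of the monkey-patching technique used by activate();
    # Table and dedup_stub are illustrative stand-ins, not datafu-spark code.
    class Table(object):
        def __init__(self, rows):
            self.rows = rows

    def dedup_stub(table, key):
        # module-level function; `table` plays the role of the df argument
        seen, kept = set(), []
        for row in table.rows:
            if row[key] not in seen:
                seen.add(row[key])
                kept.append(row)
        return Table(kept)

    # the same trick activate() uses: assign the function onto the class...
    Table.dedup = dedup_stub

    # ...and every instance now exposes it as a bound method
    t = Table([{"id": "a", "name": "Alice"}, {"id": "a", "name": "Sara"}])
    print(len(t.dedup(key="id").rows))  # prints 1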