spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rahul kumar <rk20.stor...@gmail.com>
Subject Unable to use scala function in pyspark
Date Sun, 26 Sep 2021 20:00:07 GMT
I'm trying to use a function defined in a Scala jar from PySpark (Spark 3.0.2).

--scala ---

object PythonUtil {

def customedf(dataFrame: DataFrame,
                 keyCol: String,
                 table: String,
                 outputSchema: StructType,
                 database: String): DataFrame = {

// some transformation of dataframe and convert as per the output schema types and fields.
...
resultDF
}

// In the Jupyter notebook
Schema creation:
alias = StructType([StructField("first_name", StringType(), False),StructField("last_name",
StringType(), False)])
name = StructType([StructField("first_name", StringType(), False),StructField("aliases", ArrayType(alias),
False)])
street_adress = StructType([StructField("street_name", StringType(), False),StructField("apt_number",
IntegerType(), False)])
address = StructType([StructField("zip", LongType(), False),StructField("street", street_adress,
False),StructField("city", StringType(), False)])
workHistory = StructType([StructField("company_name", StringType(), False),StructField("company_address",
address, False),StructField("worked_from", StringType(), False)])

// Passing this schema to the Scala function.
outputschema= StructType([StructField("name", name, False),StructField("SSN", StringType(),
False),StructField("home_address", ArrayType(address), False)])

ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"], ["525-31-0299"], ["456-45-2200"],
["200-71-7765"]]
customerIdsDF=spark.createDataFrame(ssns,["SSN"])

scala2_object= sc._jvm.com.mytest.spark.PythonUtil
pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table', outputschema,
'test'), spark._wrapped)

Then I get the following error: AttributeError: 'StructField' object has no attribute '_get_object_id'

full stacktrace
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-25-74a3b3e652e6> in <module>
      4 
      5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',smallSchema,
'test'), spark._wrapped)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self,
*args)
   1294 
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297 
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self,
*args)
   1258     def _build_args(self, *args):
   1259         if self.converters is not None and len(self.converters) > 0:
-> 1260             (new_args, temp_args) = self._get_args(args)
   1261         else:
   1262             new_args = args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self,
args)
   1245                 for converter in self.gateway_client.converters:
   1246                     if converter.can_convert(arg):
-> 1247                         temp_arg = converter.convert(arg, self.gateway_client)
   1248                         temp_args.append(temp_arg)
   1249                         new_args.append(temp_arg)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in
convert(self, object, gateway_client)
    509         java_list = ArrayList()
    510         for element in object:
--> 511             java_list.add(element)
    512         return java_list
    513 

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self,
*args)
   1294 
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297 
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self,
*args)
   1264 
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1264 
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter,
python_proxy_pool)
    296             command_part += ";" + interface
    297     else:
--> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
    299 
    300     command_part += "\n"






---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Mime
View raw message