spark-user mailing list archives

From Sofia’s World <mmistr...@gmail.com>
Subject Re: Using Lambda function to generate random data in PySpark throws not defined error
Date Sat, 12 Dec 2020 21:48:15 GMT
Hi Mich
I don't think it's a good idea... I believe your IDE is playing tricks on you.
Take Spark out of the equation; this is a Python issue only.
I am guessing your IDE is somehow messing up your environment.

If you take out the whole Spark code and replace it with this code:

map(lambda x: (x, uf.clustered(x,numRows), \
                           uf.scattered(x,numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)), [1,2,3,4,5])

you should get exactly the same error...
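
For what it's worth, here is a minimal, standalone sketch (no Spark, no IDE; the
class name and the numbers are just placeholders) of why a name defined in a class
body is invisible to a lambda defined in that same class body, which looks like
what your main.py is doing:

import math

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

class main:
    numRows = 10   # class attribute, lives only in the class namespace
    # In Python 3 a class body is NOT an enclosing scope for nested functions,
    # so the lambda below cannot see numRows; at call time Python falls back to
    # the module globals and raises: NameError: name 'numRows' is not defined
    out = list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4, 5]))

Running that on its own raises the NameError without any Spark involved. In
Jupyter there is no class main: wrapper, so numRows ends up as a module-level
global and the lambda finds it, which would explain why the notebook runs fine.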

Send me a zip with the tfconstants.py and a trimmed down version of your
main.py, and I'll plug it into my IDE and see if I can reproduce it.
It worked fine in Jupyter, but then I have all the functions in the same notebook.
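
As for the global numRows fix: it works because global promotes the name to module
scope, where the lambda can find it at call time. A (hypothetical, untested on your
code) alternative that avoids the global is to bind the value as a default argument,
so the lambda captures its own copy when it is defined:

class main:
    numRows = 100000
    # Default arguments are evaluated in the class body, where numRows IS visible,
    # so each call uses the captured value and never looks the name up at call time.
    out = list(map(lambda x, n=numRows: (x, n), [1, 2, 3]))
    print(out)   # [(1, 100000), (2, 100000), (3, 100000)]

The same trick should carry over to the Spark map(), e.g.
map(lambda x, n=numRows: (x, uf.clustered(x, n), ...)), since the value travels
inside the serialized function instead of being looked up by name on the workers.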
hth
 marco

On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> I solved the issue of the variable numRows not being defined within the lambda
> function by defining it as a global variable:
>
> global numRows
> numRows = 10   ## do in increment of 50K rows otherwise you blow up driver memory!
> #
>
> Then I could reference it within the lambda function as follows:
>
>
> rdd = sc.parallelize(Range). \
>          map(lambda x: (x, uf.clustered(x,numRows), \
>                            uf.scattered(x,numRows), \
>                            uf.randomised(x, numRows), \
>                            uf.randomString(50), \
>                            uf.padString(x," ",50), \
>                            uf.padSingleChar("x",4000)))
>
> This then worked. I am not convinced this is *the correct* solution but
> somehow it worked.
>
>
> Thanks
>
>
> Mich
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> many thanks KR.
>>
>> If I call the clustered function on its own, it works:
>>
>> numRows = 100000
>>
>> print(uf.clustered(200,numRows))
>>
>> and returns
>>
>> 0.00199
>> If I run it all in one, including the UsedFunctions class in the same py file,
>> it works. The code is attached.
>>
>> However, in PyCharm, I do the following
>>
>> UsedFunctions.py. Note that this file only contains functions and no class
>>
>> import logging
>> import random
>> import string
>> import math
>>
>> def randomString(length):
>>     letters = string.ascii_letters
>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>     return result_str
>>
>> def clustered(x,numRows):
>>     return math.floor(x -1)/numRows
>>
>> def scattered(x,numRows):
>>     return abs((x -1 % numRows))* 1.0
>>
>> def randomised(seed,numRows):
>>     random.seed(seed)
>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>
>> def padString(x,chars,length):
>>     n = int(math.log10(x) + 1)
>>     result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
>>     return result_str
>>
>> def padSingleChar(chars,length):
>>     result_str = ''.join(chars for i in range(length))
>>     return result_str
>>
>> def println(lst):
>>     for ll in lst:
>>       print(ll[0])
>>
>> In main.py (PyCharm) I have this code, which is failing:
>>
>> from pyspark import SparkContext, SparkConf
>> from pyspark.sql import SQLContext
>> from pyspark.sql import HiveContext
>> from pyspark.sql import SparkSession
>> from pyspark.sql import Row
>> from pyspark.sql.types import StringType, ArrayType
>> from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
>>     add_months
>> from datetime import datetime, timedelta
>> import os
>> from os.path import join, abspath
>> from typing import Optional
>> import logging
>> import random
>> import string
>> import math
>> import mathOperations as mo
>> import UsedFunctions as uf
>> ##import test_oracle as to
>>
>> class main:
>>   rec = {}
>>   settings = [
>>                 ("hive.exec.dynamic.partition", "true"),
>>                 ("hive.exec.dynamic.partition.mode", "nonstrict"),
>>                 ("spark.sql.orc.filterPushdown", "true"),
>>                 ("hive.msck.path.validation", "ignore"),
>>                 ("spark.sql.caseSensitive", "true"),
>>                 ("spark.speculation", "false"),
>>                 ("hive.metastore.authorization.storage.checks", "false"),
>>                 ("hive.metastore.client.connect.retry.delay", "5s"),
>>                 ("hive.metastore.client.socket.timeout", "1800s"),
>>                 ("hive.metastore.connect.retries", "12"),
>>                 ("hive.metastore.execute.setugi", "false"),
>>                 ("hive.metastore.failure.retries", "12"),
>>                 ("hive.metastore.schema.verification", "false"),
>>                 ("hive.metastore.schema.verification.record.version", "false"),
>>                 ("hive.metastore.server.max.threads", "100000"),
>>                 ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
>> ]
>>   configs = {"DB":"pycharm",
>>            "tableName":"randomDataPy"}
>>   DB = "pycharm"
>>   tableName = "randomDataPy"
>>   fullyQualifiedTableName = DB +"."+tableName
>>   spark = SparkSession.builder \
>>           .appName("app1") \
>>           .enableHiveSupport() \
>>           .getOrCreate()
>>
>>   spark.sparkContext._conf.setAll(settings)
>>
>>   sc = SparkContext.getOrCreate()
>>   print(sc.getConf().getAll())
>>   sqlContext = SQLContext(sc)
>>   HiveContext = HiveContext(sc)
>>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
>>   print("\nStarted at");uf.println(lst)
>>
>>   numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
>>   #
>>   ## Check if table exist otherwise create it
>>
>>   rows = 0
>>   sqltext  = ""
>>   if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>     rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>     print ("number of rows is ",rows)
>>   else:
>>     print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>     sqltext = """
>>     CREATE TABLE {DB}.{tableName}(
>>     ID INT
>>     , CLUSTERED INT
>>     , SCATTERED INT
>>     , RANDOMISED INT
>>     , RANDOM_STRING VARCHAR(50)
>>     , SMALL_VC VARCHAR(50)
>>     , PADDING  VARCHAR(4000)
>>     )
>>     STORED AS PARQUET
>>     """
>>     spark.sql(sqltext)
>>
>>   start = 0
>>   if (rows == 0):
>>     start = 1
>>   else:
>>     maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>     start = maxID + 1
>>     end = start + numRows - 1
>>   print ("starting at ID = ",start, ",ending on = ",end)
>>   Range = range(start, end+1)
>>   ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
>>   print(numRows)
>>   print(uf.clustered(200,numRows))
>>   rdd = sc.parallelize(Range). \
>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>                              uf.scattered(x,10000), \
>>                              uf.randomised(x,10000), \
>>                              uf.randomString(50), \
>>                              uf.padString(x," ",50), \
>>                              uf.padSingleChar("x",4000)))
>>   df = rdd.toDF(). \
>>        withColumnRenamed("_1","ID"). \
>>        withColumnRenamed("_2", "CLUSTERED"). \
>>        withColumnRenamed("_3", "SCATTERED"). \
>>        withColumnRenamed("_4", "RANDOMISED"). \
>>        withColumnRenamed("_5", "RANDOM_STRING"). \
>>        withColumnRenamed("_6", "SMALL_VC"). \
>>        withColumnRenamed("_7", "PADDING")
>>   df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
>>   df.printSchema()
>>   df.explain()
>>   df.createOrReplaceTempView("tmp")
>>   sqltext = f"""
>>     INSERT INTO TABLE {fullyQualifiedTableName}
>>     SELECT
>>             ID
>>           , CLUSTERED
>>           , SCATTERED
>>           , RANDOMISED
>>           , RANDOM_STRING
>>           , SMALL_VC
>>           , PADDING
>>     FROM tmp
>>     """
>>   spark.sql(sqltext)
>>   spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
>>   ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
>>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
>>   print("\nFinished at");usedFunctions.println(lst)
>>
>>
>>
>>
>>
>> On Fri, 11 Dec 2020 at 18:04, Sofia’s World <mmistroni@gmail.com> wrote:
>>
>>> Copying and pasting your code into a Jupyter notebook works fine, that is,
>>> using my own version of Range, which is simply a list of numbers.
>>>
>>> How about this... does this work fine?
>>> list(map(lambda x: (x, clustered(x, numRows)),[1,2,3,4]))
>>>
>>> If it does, I'd look at what's inside your Range and what you get out of
>>> it. I suspect something is wrong in there.
>>>
>>> If there were something wrong with the clustered function, then you should be
>>> able to take it out of the map() and still have the code working.
>>> Could you try that as well?
>>> kr
>>>
>>>
>>> On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Sorry, part of the code is not that visible
>>>>
>>>> rdd = sc.parallelize(Range). \
>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>                              uf.scattered(x,10000), \
>>>>                              uf.randomised(x,10000), \
>>>>                              uf.randomString(50), \
>>>>                              uf.padString(x," ",50), \
>>>>                              uf.padSingleChar("x",4000)))
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Thanks Sean,
>>>>>
>>>>> This is the code
>>>>>
>>>>> numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
>>>>> #
>>>>> ## Check if table exist otherwise create it
>>>>>
>>>>>
>>>>> rows = 0
>>>>> sqltext  = ""
>>>>> if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>>>>   rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>>>>   print ("number of rows is ",rows)
>>>>> else:
>>>>>   print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
>>>>>   sqltext = """
>>>>>   CREATE TABLE {DB}.{tableName}(
>>>>>   ID INT
>>>>>   , CLUSTERED INT
>>>>>   , SCATTERED INT
>>>>>   , RANDOMISED INT
>>>>>   , RANDOM_STRING VARCHAR(50)
>>>>>   , SMALL_VC VARCHAR(50)
>>>>>   , PADDING  VARCHAR(4000)
>>>>>   )
>>>>>   STORED AS PARQUET
>>>>>   """
>>>>>   spark.sql(sqltext)
>>>>>
>>>>> start = 0
>>>>> if (rows == 0):
>>>>>   start = 1
>>>>> else:
>>>>>   maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>>>>   start = maxID + 1
>>>>>   end = start + numRows - 1
>>>>> print ("starting at ID = ",start, ",ending on = ",end)
>>>>> Range = range(start, end+1)
>>>>> ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
>>>>> print(numRows)
>>>>> print(uf.clustered(200,numRows))
>>>>> rdd = sc.parallelize(Range). \
>>>>>          map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>                            uf.scattered(x,10000), \
>>>>>                            uf.randomised(x,10000), \
>>>>>                            uf.randomString(50), \
>>>>>                            uf.padString(x," ",50), \
>>>>>                            uf.padSingleChar("x",4000)))
>>>>> df = rdd.toDF(). \
>>>>>      withColumnRenamed("_1","ID"). \
>>>>>      withColumnRenamed("_2", "CLUSTERED"). \
>>>>>      withColumnRenamed("_3", "SCATTERED"). \
>>>>>      withColumnRenamed("_4", "RANDOMISED"). \
>>>>>      withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>>      withColumnRenamed("_6", "SMALL_VC"). \
>>>>>      withColumnRenamed("_7", "PADDING")
>>>>>
>>>>>
>>>>> And this is the run with the error:
>>>>>
>>>>>
>>>>> Started at
>>>>> 11/12/2020 14:42:45.45
>>>>> number of rows is  4500000
>>>>> starting at ID =  4500001 ,ending on =  4600000
>>>>> 100000
>>>>> 0.00199
>>>>>
>>>>> 20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
>>>>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
>>>>>     vs = list(itertools.islice(iterator, batch))
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
>>>>>     yield next(iterator)
>>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
>>>>>     return f(*args, **kwargs)
>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>> NameError: name 'numRows' is not defined
>>>>>
>>>>> Regards,
>>>>>
>>>>> Mich
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, 11 Dec 2020 at 16:47, Sean Owen <srowen@gmail.com> wrote:
>>>>>
>>>>>> Looks like a simple Python error - you haven't shown the code that
>>>>>> produces it. Indeed, I suspect you'll find there is no such symbol.
>>>>>>
>>>>>> On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This used to work but not anymore.
>>>>>>>
>>>>>>> I have a UsedFunctions.py file that has these functions:
>>>>>>>
>>>>>>> import random
>>>>>>> import string
>>>>>>> import math
>>>>>>>
>>>>>>> def randomString(length):
>>>>>>>     letters = string.ascii_letters
>>>>>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def clustered(x,numRows):
>>>>>>>     return math.floor(x -1)/numRows
>>>>>>>
>>>>>>> def scattered(x,numRows):
>>>>>>>     return abs((x -1 % numRows))* 1.0
>>>>>>>
>>>>>>> def randomised(seed,numRows):
>>>>>>>     random.seed(seed)
>>>>>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>>>>>
>>>>>>> def padString(x,chars,length):
>>>>>>>     n = int(math.log10(x) + 1)
>>>>>>>     result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def padSingleChar(chars,length):
>>>>>>>     result_str = ''.join(chars for i in range(length))
>>>>>>>     return result_str
>>>>>>>
>>>>>>> def println(lst):
>>>>>>>     for ll in lst:
>>>>>>>       print(ll[0])
>>>>>>>
>>>>>>> Now in the main.py module I import this file as follows:
>>>>>>>
>>>>>>> import UsedFunctions as uf
>>>>>>>
>>>>>>> Then I try the following
>>>>>>>
>>>>>>> import UsedFunctions as uf
>>>>>>>
>>>>>>>  numRows = 100000   ## do in increment of 100K rows
>>>>>>>  rdd = sc.parallelize(Range). \
>>>>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>>                              uf.scattered(x,10000), \
>>>>>>>                              uf.randomised(x,10000), \
>>>>>>>                              uf.randomString(50), \
>>>>>>>                              uf.padString(x," ",50), \
>>>>>>>                              uf.padSingleChar("x",4000)))
>>>>>>> The problem is that it now throws an error for numRows, as below:
>>>>>>>
>>>>>>>
>>>>>>>   File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
>>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>> NameError: name 'numRows' is not defined
>>>>>>>
>>>>>>> I don't know why this error is occurring!
>>>>>>>
>>>>>>> Appreciate any ideas
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Mich
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>>>>>>> loss, damage or destruction of data or any other property which may arise
>>>>>>> from relying on this email's technical content is explicitly disclaimed.
>>>>>>> The author will in no case be liable for any monetary damages arising from
>>>>>>> such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
