spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Using Lambda function to generate random data in PySpark throws not defined error
Date Sat, 12 Dec 2020 21:02:33 GMT
I solved the issue of the variable numRows not being defined within the
lambda function by declaring it as a global variable

global numRows
numRows = 10   ## do in increment of 50K rows otherwise you blow up driver memory!
#

Then I could call it within the lambda function as follows


rdd = sc.parallelize(Range). \
         map(lambda x: (x, uf.clustered(x,numRows), \
                           uf.scattered(x,numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the correct* solution but
somehow it worked.


Thanks


Mich


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> many thanks KR.
>
> If I call the clustered function on its own it works
>
> numRows = 100000
>
> print(uf.clustered(200,numRows))
>
> and returns
>
> 0.00199
> If I run all in one, including the UsedFunctions class in the same py file,
> it works. The code is attached
>
> However, in PyCharm, I do the following
>
> UsedFunctions.py. Note that this file only contains functions and no class
>
> import logging
> import random
> import string
> import math
>
> def randomString(length):
>     letters = string.ascii_letters
>     result_str = ''.join(random.choice(letters) for i in range(length))
>     return result_str
>
> def clustered(x,numRows):
>     return math.floor(x -1)/numRows
>
> def scattered(x,numRows):
>     return abs((x -1 % numRows))* 1.0
>
> def randomised(seed,numRows):
>     random.seed(seed)
>     return abs(random.randint(0, numRows) % numRows) * 1.0
>
> def padString(x,chars,length):
>     n = int(math.log10(x) + 1)
>     result_str = ''.join(random.choice(chars) for i in range(length-n)) +
> str(x)
>     return result_str
>
> def padSingleChar(chars,length):
>     result_str = ''.join(chars for i in range(length))
>     return result_str
>
> def println(lst):
>     for ll in lst:
>       print(ll[0])
>
> In the main.py(PyCharm)  I have this code which is failing
>
> from pyspark import SparkContext, SparkConf
>
> from pyspark.sql import SQLContext
>
> from pyspark.sql import HiveContext
>
> from pyspark.sql import SparkSession
>
> from pyspark.sql import Row
>
> from pyspark.sql.types import StringType, ArrayType
>
> from pyspark.sql.functions import udf, col, max as max, to_date, date_add,
> \
>
>     add_months
>
> from datetime import datetime, timedelta
>
> import os
>
> from os.path import join, abspath
>
> from typing import Optional
>
> import logging
>
> import random
>
> import string
>
> import math
>
> import mathOperations as mo
>
> import UsedFunctions as uf
>
> ##import test_oracle as to
>
>
> class main:
>
>   rec = {}
>
>   settings = [
>
>                 ("hive.exec.dynamic.partition", "true"),
>
>                 ("hive.exec.dynamic.partition.mode", "nonstrict"),
>
>                 ("spark.sql.orc.filterPushdown", "true"),
>
>                 ("hive.msck.path.validation", "ignore"),
>
>                 ("spark.sql.caseSensitive", "true"),
>
>                 ("spark.speculation", "false"),
>
>                 ("hive.metastore.authorization.storage.checks", "false"),
>
>                 ("hive.metastore.client.connect.retry.delay", "5s"),
>
>                 ("hive.metastore.client.socket.timeout", "1800s"),
>
>                 ("hive.metastore.connect.retries", "12"),
>
>                 ("hive.metastore.execute.setugi", "false"),
>
>                 ("hive.metastore.failure.retries", "12"),
>
>                 ("hive.metastore.schema.verification", "false"),
>
>                 ("hive.metastore.schema.verification.record.version",
> "false"),
>
>                 ("hive.metastore.server.max.threads", "100000"),
>
>                 ("hive.metastore.authorization.storage.checks",
> "/apps/hive/warehouse")
>
> ]
>
>   configs = {"DB":"pycharm",
>
>            "tableName":"randomDataPy"}
>
>   DB = "pycharm"
>
>   tableName = "randomDataPy"
>
>   fullyQualifiedTableName = DB +"."+tableName
>
>   spark = SparkSession.builder \
>
>           .appName("app1") \
>
>           .enableHiveSupport() \
>
>           .getOrCreate()
>
>
>   spark.sparkContext._conf.setAll(settings)
>
>
>   sc = SparkContext.getOrCreate()
>
>   print(sc.getConf().getAll())
>
>   sqlContext = SQLContext(sc)
>
>   HiveContext = HiveContext(sc)
>
>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ")).collect()
>
>   print("\nStarted at");uf.println(lst)
>
>
>   numRows = 100000   ## do in increment of 50K rows otherwise you blow up
> driver memory!
>
>   #
>
>   ## Check if table exist otherwise create it
>
>
>   rows = 0
>
>   sqltext  = ""
>
>   if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>
>     rows = spark.sql(f"""SELECT COUNT(1) FROM
> {fullyQualifiedTableName}""").collect()[0][0]
>
>     print ("number of rows is ",rows)
>
>   else:
>
>     print(f"\nTable {fullyQualifiedTableName} does not exist, creating
> table ")
>
>     sqltext = """
>
>     CREATE TABLE {DB}.{tableName}(
>
>     ID INT
>
>     , CLUSTERED INT
>
>     , SCATTERED INT
>
>     , RANDOMISED INT
>
>     , RANDOM_STRING VARCHAR(50)
>
>     , SMALL_VC VARCHAR(50)
>
>     , PADDING  VARCHAR(4000)
>
>     )
>
>     STORED AS PARQUET
>
>     """
>
>     spark.sql(sqltext)
>
>
>   start = 0
>
>   if (rows == 0):
>
>     start = 1
>
>   else:
>
>     maxID = spark.sql(f"SELECT MAX(id) FROM
> {fullyQualifiedTableName}").collect()[0][0]
>
>     start = maxID + 1
>
>     end = start + numRows - 1
>
>   print ("starting at ID = ",start, ",ending on = ",end)
>
>   Range = range(start, end+1)
>
>   ## This traverses through the Range and increment "x" by one unit each
> time, and that x value is used in the code to generate random data through
> Python functions in a class
>
>   print(numRows)
>
>   print(uf.clustered(200,numRows))
>
>   rdd = sc.parallelize(Range). \
>
>            map(lambda x: (x, uf.clustered(x, numRows), \
>
>                              uf.scattered(x,10000), \
>
>                              uf.randomised(x,10000), \
>
>                              uf.randomString(50), \
>
>                              uf.padString(x," ",50), \
>
>                              uf.padSingleChar("x",4000)))
>
>   df = rdd.toDF(). \
>
>        withColumnRenamed("_1","ID"). \
>
>        withColumnRenamed("_2", "CLUSTERED"). \
>
>        withColumnRenamed("_3", "SCATTERED"). \
>
>        withColumnRenamed("_4", "RANDOMISED"). \
>
>        withColumnRenamed("_5", "RANDOM_STRING"). \
>
>        withColumnRenamed("_6", "SMALL_VC"). \
>
>        withColumnRenamed("_7", "PADDING")
>
>   df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
>
>   df.printSchema()
>
>   df.explain()
>
>   df.createOrReplaceTempView("tmp")
>
>   sqltext = f"""
>
>     INSERT INTO TABLE {fullyQualifiedTableName}
>
>     SELECT
>
>             ID
>
>           , CLUSTERED
>
>           , SCATTERED
>
>           , RANDOMISED
>
>           , RANDOM_STRING
>
>           , SMALL_VC
>
>           , PADDING
>
>     FROM tmp
>
>     """
>
>   spark.sql(sqltext)
>
>   spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM
> {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
>
>   ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY
> id""").show(n=20,truncate=False,vertical=False)
>
>   lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy
> HH:mm:ss.ss') ")).collect()
>
>   print("\nFinished at");usedFunctions.println(lst)
>
>
>
>
>
> On Fri, 11 Dec 2020 at 18:04, Sofia’s World <mmistroni@gmail.com> wrote:
>
>> Copying and pasting your code in a Jupyter notebook works fine, that is,
>> using my own version of Range, which is simply a list of numbers
>>
>> How about this: does this work fine?
>> list(map(lambda x: (x, clustered(x, numRows)),[1,2,3,4]))
>>
>> If it does, I'd look at what's inside your Range and what you get out of
>> it. I suspect something is wrong in there
>>
>> If there was something wrong with the clustered function, then you should be
>> able to take it out of the map() and still have the code working.
>> Could you try that as well?
>> kr
>>
>>
>> On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Sorry, part of the code is not that visible
>>>
>>> rdd = sc.parallelize(Range). \
>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>                              uf.scattered(x,10000), \
>>>                              uf.randomised(x,10000), \
>>>                              uf.randomString(50), \
>>>                              uf.padString(x," ",50), \
>>>                              uf.padSingleChar("x",4000)))
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>>> Thanks Sean,
>>>>
>>>> This is the code
>>>>
>>>> numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver
memory!
>>>> #
>>>> ## Check if table exist otherwise create it
>>>>
>>>>
>>>> rows = 0
>>>> sqltext  = ""
>>>> if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
>>>>   rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
>>>>   print ("number of rows is ",rows)
>>>> else:
>>>>   print(f"\nTable {fullyQualifiedTableName} does not exist, creating table
")
>>>>   sqltext = """
>>>>   CREATE TABLE {DB}.{tableName}(
>>>>   ID INT
>>>>   , CLUSTERED INT
>>>>   , SCATTERED INT
>>>>   , RANDOMISED INT
>>>>   , RANDOM_STRING VARCHAR(50)
>>>>   , SMALL_VC VARCHAR(50)
>>>>   , PADDING  VARCHAR(4000)
>>>>   )
>>>>   STORED AS PARQUET
>>>>   """
>>>>   spark.sql(sqltext)
>>>>
>>>> start = 0
>>>> if (rows == 0):
>>>>   start = 1
>>>> else:
>>>>   maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
>>>>   start = maxID + 1
>>>>   end = start + numRows - 1
>>>> print ("starting at ID = ",start, ",ending on = ",end)
>>>> Range = range(start, end+1)
>>>> ## This traverses through the Range and increment "x" by one unit each time,
and that x value is used in the code to generate random data through Python functions in a
class
>>>> print(numRows)
>>>> print(uf.clustered(200,numRows))
>>>> rdd = sc.parallelize(Range). \
>>>>          map(lambda x: (x, uf.clustered(x, numRows), \
>>>>                            uf.scattered(x,10000), \
>>>>                            uf.randomised(x,10000), \
>>>>                            uf.randomString(50), \
>>>>                            uf.padString(x," ",50), \
>>>>                            uf.padSingleChar("x",4000)))
>>>> df = rdd.toDF(). \
>>>>      withColumnRenamed("_1","ID"). \
>>>>      withColumnRenamed("_2", "CLUSTERED"). \
>>>>      withColumnRenamed("_3", "SCATTERED"). \
>>>>      withColumnRenamed("_4", "RANDOMISED"). \
>>>>      withColumnRenamed("_5", "RANDOM_STRING"). \
>>>>      withColumnRenamed("_6", "SMALL_VC"). \
>>>>      withColumnRenamed("_7", "PADDING")
>>>>
>>>>
>>>> And this is the run with error
>>>>
>>>>
>>>> Started at
>>>>
>>>> 11/12/2020 14:42:45.45
>>>>
>>>> number of rows is  4500000
>>>>
>>>> starting at ID =  4500001 ,ending on =  4600000
>>>>
>>>> 100000
>>>>
>>>> 0.00199
>>>>
>>>> 20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0
>>>> (TID 33)
>>>>
>>>> org.apache.spark.api.python.PythonException: Traceback (most recent
>>>> call last):
>>>>
>>>>   File
>>>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
>>>> line 605, in main
>>>>
>>>>   File
>>>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
>>>> line 597, in process
>>>>
>>>>   File
>>>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py",
>>>> line 271, in dump_stream
>>>>
>>>>     vs = list(itertools.islice(iterator, batch))
>>>>
>>>>   File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440,
>>>> in takeUpToNumLeft
>>>>
>>>>     yield next(iterator)
>>>>
>>>>   File
>>>> "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line
>>>> 107, in wrapper
>>>>
>>>>     return f(*args, **kwargs)
>>>>
>>>>   File
>>>> "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line
>>>> 101, in <lambda>
>>>>
>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>
>>>> NameError: name 'numRows' is not defined
>>>>
>>>> Regards,
>>>>
>>>> Mich
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 11 Dec 2020 at 16:47, Sean Owen <srowen@gmail.com> wrote:
>>>>
>>>>> Looks like a simple Python error - you haven't shown the code that
>>>>> produces it. Indeed, I suspect you'll find there is no such symbol.
>>>>>
>>>>> On Fri, Dec 11, 2020 at 9:09 AM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This used to work but not anymore.
>>>>>>
>>>>>> I have UsedFunctions.py file that has these functions
>>>>>>
>>>>>> import random
>>>>>> import string
>>>>>> import math
>>>>>>
>>>>>> def randomString(length):
>>>>>>     letters = string.ascii_letters
>>>>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>>>>     return result_str
>>>>>>
>>>>>> def clustered(x,numRows):
>>>>>>     return math.floor(x -1)/numRows
>>>>>>
>>>>>> def scattered(x,numRows):
>>>>>>     return abs((x -1 % numRows))* 1.0
>>>>>>
>>>>>> def randomised(seed,numRows):
>>>>>>     random.seed(seed)
>>>>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>>>>
>>>>>> def padString(x,chars,length):
>>>>>>     n = int(math.log10(x) + 1)
>>>>>>     result_str = ''.join(random.choice(chars) for i in range(length-n))
+ str(x)
>>>>>>     return result_str
>>>>>>
>>>>>> def padSingleChar(chars,length):
>>>>>>     result_str = ''.join(chars for i in range(length))
>>>>>>     return result_str
>>>>>>
>>>>>> def println(lst):
>>>>>>     for ll in lst:
>>>>>>       print(ll[0])
>>>>>>
>>>>>> Now in the main().py module I import this file as follows:
>>>>>>
>>>>>> import UsedFunctions as uf
>>>>>>
>>>>>> Then I try the following
>>>>>>
>>>>>> import UsedFunctions as uf
>>>>>>
>>>>>>  numRows = 100000   ## do in increment of 100K rows
>>>>>>  rdd = sc.parallelize(Range). \
>>>>>>            map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>>                              uf.scattered(x,10000), \
>>>>>>                              uf.randomised(x,10000), \
>>>>>>                              uf.randomString(50), \
>>>>>>                              uf.padString(x," ",50), \
>>>>>>                              uf.padSingleChar("x",4000)))
>>>>>> The problem is that it now throws an error for numRows as below
>>>>>>
>>>>>>
>>>>>>   File
>>>>>> "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py",
line
>>>>>> 101, in <lambda>
>>>>>>     map(lambda x: (x, uf.clustered(x, numRows), \
>>>>>> NameError: name 'numRows' is not defined
>>>>>>
>>>>>> I don't know why this error is occurring!
>>>>>>
>>>>>> Appreciate any ideas
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mich
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property
>>>>>> which may arise from relying on this email's technical content is
>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>

Mime
View raw message