Using Lambda function to generate random data in PySpark throws not defined error


Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
Hi,

This used to work, but not anymore.

I have a UsedFunctions.py file that contains these functions:

import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
      print(ll[0])
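
For illustration only, a quick sanity check of what these helpers return (assuming the module is importable as uf; the random string will of course differ per run) might look like this:

import UsedFunctions as uf

print(uf.clustered(200, 100000))         # 0.00199  -> math.floor(199)/100000
print(uf.scattered(200, 10000))          # 199.0    -> note that 1 % numRows binds before the subtraction
print(uf.padString(7, " ", 10))          # nine spaces followed by "7", total length 10
print(len(uf.padSingleChar("x", 4000)))  # 4000
print(uf.randomString(50))               # a random 50-character string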
Now, in the main.py module, I import this file as follows:

import UsedFunctions as uf

Then I try the following

import UsedFunctions as uf

 numRows = 100000   ## do in increment of 100K rows
 rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))
The problem is that it now throws an error for numRows, as below:


  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined

I don't know why this error is occurring!

Any ideas would be appreciated.

Thanks,

Mich



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Using Lambda function to generate random data in PySpark throws not defined error

srowen
Looks like a simple Python error - you haven't shown the code that produces it. Indeed, I suspect you'll find there is no such symbol.



Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
Thanks Sean,

This is the code

numRows = 100000   ## do in increments of 50K rows otherwise you blow up driver memory!
#
## Check if table exists, otherwise create it

rows = 0
sqltext = ""
if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print ("number of rows is ",rows)
else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
    ID INT
    , CLUSTERED INT
    , SCATTERED INT
    , RANDOMISED INT
    , RANDOM_STRING VARCHAR(50)
    , SMALL_VC VARCHAR(50)
    , PADDING VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

start = 0
if (rows == 0):
    start = 1
else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
    end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## This traverses through the Range and increments "x" by one unit each time; that x value is used in the code to generate random data through the Python functions
print(numRows)
print(uf.clustered(200,numRows))
rdd = sc.parallelize(Range). \
      map(lambda x: (x, uf.clustered(x, numRows), \
                        uf.scattered(x,10000), \
                        uf.randomised(x,10000), \
                        uf.randomString(50), \
                        uf.padString(x," ",50), \
                        uf.padSingleChar("x",4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")

And this is the run, with the error:


Started at
11/12/2020 14:42:45.45
number of rows is  4500000
starting at ID =  4500001 ,ending on =  4600000
100000
0.00199
20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined


Regards,

Mich




Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
Sorry, part of the code was not very visible; here it is again:

rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))





Re: Using Lambda function to generate random data in PySpark throws not defined error

Marco Mistroni
Copying and pasting your code in a Jupyter notebook works fine; that is, using my own version of Range, which is simply a list of numbers.

How about this: does the following work fine?
list(map(lambda x: (x, clustered(x, numRows)), [1,2,3,4]))

If it does, I'd look at what's inside your Range and what you get out of it. I suspect something is wrong in there.

If there were something wrong with the clustered function, you should be able to take it out of the map() and still have the code working.
Could you try that as well?
kr
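
For reference, a standalone version of that check (no Spark involved, assuming the UsedFunctions module from the first message is importable as uf) would be something like:

import UsedFunctions as uf

numRows = 100000
# Plain Python map: if numRows resolves here but not inside the Spark job,
# the problem is with how/where the lambda in main.py sees numRows,
# not with the clustered function itself.
print(list(map(lambda x: (x, uf.clustered(x, numRows)), [1, 2, 3, 4])))
# e.g. [(1, 0.0), (2, 1e-05), (3, 2e-05), (4, 3e-05)]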




Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
Many thanks, KR.

If I call the clustered function on its own, it works:

numRows = 100000
print(uf.clustered(200,numRows))

and it returns

0.00199

If I run everything in one file, including the UsedFunctions code in the same .py file, it works. The code is attached.

However, in PyCharm, I do the following.

UsedFunctions.py (note that this file only contains functions and no class):

import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
      print(ll[0])

In main.py (PyCharm) I have this code, which is failing:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
    add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to


class main:
  rec = {}
  settings = [
                ("hive.exec.dynamic.partition", "true"),
                ("hive.exec.dynamic.partition.mode", "nonstrict"),
                ("spark.sql.orc.filterPushdown", "true"),
                ("hive.msck.path.validation", "ignore"),
                ("spark.sql.caseSensitive", "true"),
                ("spark.speculation", "false"),
                ("hive.metastore.authorization.storage.checks", "false"),
                ("hive.metastore.client.connect.retry.delay", "5s"),
                ("hive.metastore.client.socket.timeout", "1800s"),
                ("hive.metastore.connect.retries", "12"),
                ("hive.metastore.execute.setugi", "false"),
                ("hive.metastore.failure.retries", "12"),
                ("hive.metastore.schema.verification", "false"),
                ("hive.metastore.schema.verification.record.version", "false"),
                ("hive.metastore.server.max.threads", "100000"),
                ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
  ]
  configs = {"DB":"pycharm",
           "tableName":"randomDataPy"}
  DB = "pycharm"
  tableName = "randomDataPy"
  fullyQualifiedTableName = DB +"."+tableName
  spark = SparkSession.builder \
          .appName("app1") \
          .enableHiveSupport() \
          .getOrCreate()

  spark.sparkContext._conf.setAll(settings)

  sc = SparkContext.getOrCreate()
  print(sc.getConf().getAll())
  sqlContext = SQLContext(sc)
  HiveContext = HiveContext(sc)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nStarted at");uf.println(lst)

  numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
  #
  ## Check if table exist otherwise create it

  rows = 0
  sqltext  = ""
  if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print ("number of rows is ",rows)
  else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
    ID INT
    , CLUSTERED INT
    , SCATTERED INT
    , RANDOMISED INT
    , RANDOM_STRING VARCHAR(50)
    , SMALL_VC VARCHAR(50)
    , PADDING  VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

  start = 0
  if (rows == 0):
    start = 1
  else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
    end = start + numRows - 1
  print ("starting at ID = ",start, ",ending on = ",end)
  Range = range(start, end+1)
  ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
  print(numRows)
  print(uf.clustered(200,numRows))
  rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))
  df = rdd.toDF(). \
       withColumnRenamed("_1","ID"). \
       withColumnRenamed("_2", "CLUSTERED"). \
       withColumnRenamed("_3", "SCATTERED"). \
       withColumnRenamed("_4", "RANDOMISED"). \
       withColumnRenamed("_5", "RANDOM_STRING"). \
       withColumnRenamed("_6", "SMALL_VC"). \
       withColumnRenamed("_7", "PADDING")
  df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
  df.printSchema()
  df.explain()
  df.createOrReplaceTempView("tmp")
  sqltext = f"""
    INSERT INTO TABLE {fullyQualifiedTableName}
    SELECT
            ID
          , CLUSTERED
          , SCATTERED
          , RANDOMISED
          , RANDOM_STRING
          , SMALL_VC
          , PADDING
    FROM tmp
    """
  spark.sql(sqltext)
  spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
  ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nFinished at");usedFunctions.println(lst)
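
One observation worth making here (not something confirmed in the thread): the failing map() call sits inside the body of class main:, and names assigned at class-body scope, such as numRows above, are not an enclosing scope for lambdas defined in that same body; when such a lambda is eventually called, the name is looked up as a module-level global. A minimal sketch of that behaviour, with no Spark involved (class Demo below is a hypothetical stand-in for class main):

class Demo:
    numRows = 100000
    f = lambda x: x / numRows        # numRows is resolved as a global when f is called

try:
    Demo.f(200)
except NameError as e:
    print(e)                         # name 'numRows' is not defined

numRows = 100000                     # once a module-level global of that name exists...
print(Demo.f(200))                   # ...the same lambda works and prints 0.002

If that is what is happening, it would also explain why print(uf.clustered(200, numRows)) works at class-body scope while the lambda shipped to the executors does not.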






Attachment: dynamic_ARRAY_generator_parquet.text.py (5K)

Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
I solved the issue of the variable numRows not being defined within the lambda function by declaring it as a global variable:

global numRows
numRows = 10 ## do in increments of 50K rows otherwise you blow up driver memory!
#

Then I could refer to it within the lambda function as follows:

rdd = sc.parallelize(Range). \
          map(lambda x: (x, uf.clustered(x,numRows), \
                            uf.scattered(x,numRows), \
                            uf.randomised(x, numRows), \
                            uf.randomString(50), \
                            uf.padString(x," ",50), \
                            uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the* correct solution, but somehow it worked.
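
For what it's worth, an alternative that avoids the global (a sketch only, assuming the same uf module, sc and Range as in the code above) is to bind the value when the function is defined, for example with functools.partial or a default argument:

from functools import partial
import UsedFunctions as uf

def make_row(x, numRows):
    # everything is derived from the arguments, so nothing needs to be
    # resolved from the enclosing scope when the task runs on the executor
    return (x, uf.clustered(x, numRows),
               uf.scattered(x, numRows),
               uf.randomised(x, numRows),
               uf.randomString(50),
               uf.padString(x, " ", 50),
               uf.padSingleChar("x", 4000))

numRows = 100000
rdd = sc.parallelize(Range).map(partial(make_row, numRows=numRows))
# or, keeping the lambda, freeze the value in a default argument:
# rdd = sc.parallelize(Range).map(lambda x, n=numRows: (x, uf.clustered(x, n), ...))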


Thanks


Mich



Re: Using Lambda function to generate random data in PySpark throws not defined error

Marco Mistroni
Hi Mich
I don't think that's a good idea... I believe your IDE is playing tricks on you.
Take Spark out of the equation: this is a Python issue only.
I am guessing your IDE is somehow messing up your environment.

If you take out the whole Spark code and replace it with this code:

map(lambda x: (x, uf.clustered(x,numRows), \
                           uf.scattered(x,numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)), [1,2,3,4,5])

you should get exactly the same error...

Send me a zip with the tfconstants.py and a trimmed down version of your main.py, and I'll plug it into my IDE and see if I can reproduce it.
It worked fine in Jupyter, but then I have all the functions in the same notebook.
hth
Marco
On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <[hidden email]> wrote:
I solved the issue of variable numRows within the lambda function not defined by defining it as a Global variable

global numRows
numRows = 10 ## do in increment of 50K rows otherwise you blow up driver memory!
#

Then I could call it within the lambda function as follows


rdd = sc.parallelize(Range). \
map(lambda x: (x, uf.clustered(x,numRows), \
uf.scattered(x,numRows), \
uf.randomised(x, numRows), \
uf.randomString(50), \
uf.padString(x," ",50), \
uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the correct* solution but somehow it worked.


Thanks


Mich


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <[hidden email]> wrote:
many thanks KR.

If i call the clusterted function on its own it works

numRows = 100000  
print(uf.clustered(200,numRows))
and returns 

0.00199
If I run all in one including the UsedFunctions claa in the same py file it works. The code is attached 

However, in PyCharm, I do the following

UsedFunctions.py. Note that this file only contains functions and no class

import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
      print(ll[0])

In the main.py(PyCharm)  I have this code which is failing

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
    add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to


class main:
  rec = {}
  settings = [
                ("hive.exec.dynamic.partition", "true"),
                ("hive.exec.dynamic.partition.mode", "nonstrict"),
                ("spark.sql.orc.filterPushdown", "true"),
                ("hive.msck.path.validation", "ignore"),
                ("spark.sql.caseSensitive", "true"),
                ("spark.speculation", "false"),
                ("hive.metastore.authorization.storage.checks", "false"),
                ("hive.metastore.client.connect.retry.delay", "5s"),
                ("hive.metastore.client.socket.timeout", "1800s"),
                ("hive.metastore.connect.retries", "12"),
                ("hive.metastore.execute.setugi", "false"),
                ("hive.metastore.failure.retries", "12"),
                ("hive.metastore.schema.verification", "false"),
                ("hive.metastore.schema.verification.record.version", "false"),
                ("hive.metastore.server.max.threads", "100000"),
                ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
  ]
  configs = {"DB":"pycharm",
             "tableName":"randomDataPy"}
  DB = "pycharm"
  tableName = "randomDataPy"
  fullyQualifiedTableName = DB +"."+tableName
  spark = SparkSession.builder \
          .appName("app1") \
          .enableHiveSupport() \
          .getOrCreate()
  spark.sparkContext._conf.setAll(settings)
  sc = SparkContext.getOrCreate()
  print(sc.getConf().getAll())
  sqlContext = SQLContext(sc)
  HiveContext = HiveContext(sc)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nStarted at");uf.println(lst)

  numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
  #
  ## Check if table exist otherwise create it
  rows = 0
  sqltext  = ""
  if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print ("number of rows is ",rows)
  else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
    ID INT
    , CLUSTERED INT
    , SCATTERED INT
    , RANDOMISED INT
    , RANDOM_STRING VARCHAR(50)
    , SMALL_VC VARCHAR(50)
    , PADDING  VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

  start = 0
  if (rows == 0):
    start = 1
  else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
    end = start + numRows - 1
  print ("starting at ID = ",start, ",ending on = ",end)
  Range = range(start, end+1)
  ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
  print(numRows)
  print(uf.clustered(200,numRows))
  rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))
  df = rdd.toDF(). \
       withColumnRenamed("_1","ID"). \
       withColumnRenamed("_2", "CLUSTERED"). \
       withColumnRenamed("_3", "SCATTERED"). \
       withColumnRenamed("_4", "RANDOMISED"). \
       withColumnRenamed("_5", "RANDOM_STRING"). \
       withColumnRenamed("_6", "SMALL_VC"). \
       withColumnRenamed("_7", "PADDING")
  df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
  df.printSchema()
  df.explain()
  df.createOrReplaceTempView("tmp")
  sqltext = f"""
    INSERT INTO TABLE {fullyQualifiedTableName}
    SELECT
            ID
          , CLUSTERED
          , SCATTERED
          , RANDOMISED
          , RANDOM_STRING
          , SMALL_VC
          , PADDING
    FROM tmp
    """
  spark.sql(sqltext)
  spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
  ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nFinished at");usedFunctions.println(lst)





On Fri, 11 Dec 2020 at 18:04, Sofia’s World <[hidden email]> wrote:
Copying and pasting your code into a Jupyter notebook works fine; that is, using my own version of Range, which is simply a list of numbers.

How about this: does this work fine?
list(map(lambda x: (x, clustered(x, numRows)),[1,2,3,4]))

If it does, I'd look at what is inside your Range and what you get out of it. I suspect something is wrong in there.

If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working.
Could you try that as well?
kr


On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <[hidden email]> wrote:
Sorry, part of the code is not that visible

rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Fri, 11 Dec 2020 at 16:56, Mich Talebzadeh <[hidden email]> wrote:
Thanks Sean,

This is the code

numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
#
## Check if table exist otherwise create it

rows = 0
sqltext = ""
if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print ("number of rows is ",rows)
else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
    ID INT
    , CLUSTERED INT
    , SCATTERED INT
    , RANDOMISED INT
    , RANDOM_STRING VARCHAR(50)
    , SMALL_VC VARCHAR(50)
    , PADDING VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

start = 0
if (rows == 0):
    start = 1
else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
    end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
print(numRows)
print(uf.clustered(200,numRows))
rdd = sc.parallelize(Range). \
      map(lambda x: (x, uf.clustered(x, numRows), \
                        uf.scattered(x,10000), \
                        uf.randomised(x,10000), \
                        uf.randomString(50), \
                        uf.padString(x," ",50), \
                        uf.padSingleChar("x",4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")

And this is the run with error


Started at

11/12/2020 14:42:45.45

number of rows is  4500000

starting at ID =  4500001 ,ending on =  4600000

100000

0.00199

20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)

org.apache.spark.api.python.PythonException: Traceback (most recent call last):

  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main

  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process

  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream

    vs = list(itertools.islice(iterator, batch))

  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft

    yield next(iterator)

  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper

    return f(*args, **kwargs)

  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>

    map(lambda x: (x, uf.clustered(x, numRows), \

NameError: name 'numRows' is not defined


Regards,

Mich


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 




Reply | Threaded
Open this post in threaded view
|

Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh
Thanks Marco.

When I stripped out Spark etc. and ran your map on its own, it came back OK (no errors) WITHOUT global numRows.

However, with the full code, the attached screenshot shows the unresolved reference notification I get when I embed your code WITHOUT global numRows.

regards,


Mich

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Sat, 12 Dec 2020 at 21:48, Sofia’s World <[hidden email]> wrote:
Hi Mich
I don't think it's a good idea... I believe your IDE is playing tricks on you.
Take Spark out of the equation: this is a Python issue only.
I am guessing your IDE is somehow messing up your environment.

If you take out the whole Spark code and replace it with this code

map(lambda x: (x, uf.clustered(x,numRows), \
                           uf.scattered(x,numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)), [1,2,3,4,5])

you should get exactly the same error.

Send me a zip with the tfconstants.py and a trimmed-down version of your main.py, and I'll plug it into my IDE and see if I can reproduce it.
It worked fine in Jupyter, but then I have all the functions in the same notebook.
hth
 marco
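
A minimal Spark-free check along those lines, written only as a sketch (it assumes UsedFunctions.py is importable; the list of ids is arbitrary):

# Hypothetical standalone check: drive the same lambda with plain Python map(),
# so any NameError comes from Python scoping alone, not from Spark.
import UsedFunctions as uf

numRows = 10
sample_ids = [1, 2, 3, 4, 5]
rows = list(map(lambda x: (x,
                           uf.clustered(x, numRows),
                           uf.scattered(x, numRows),
                           uf.randomised(x, numRows),
                           uf.randomString(50),
                           uf.padString(x, " ", 50),
                           uf.padSingleChar("x", 4000)), sample_ids))
print(rows[0][:4])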
















On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <[hidden email]> wrote:
I worked around the issue of numRows not being defined within the lambda function by declaring it as a global variable:

global numRows
numRows = 10 ## do in increment of 50K rows otherwise you blow up driver memory!
#

Then I could refer to it within the lambda function as follows:

rdd = sc.parallelize(Range). \
          map(lambda x: (x, uf.clustered(x,numRows), \
                            uf.scattered(x,numRows), \
                            uf.randomised(x, numRows), \
                            uf.randomString(50), \
                            uf.padString(x," ",50), \
                            uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the correct* solution, but somehow it worked.


Thanks


Mich
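
As a rough sketch of an alternative that avoids the global, reusing the same sc, Range and uf objects from the code above, the value can be bound into the lambda through a default argument, so it is evaluated once on the driver and pickled together with the function:

numRows = 100000
# Sketch only: n=numRows freezes the driver-side value into the function itself,
# so the executors never have to resolve the name numRows at call time.
rdd = sc.parallelize(Range). \
          map(lambda x, n=numRows: (x, uf.clustered(x, n), \
                                       uf.scattered(x, 10000), \
                                       uf.randomised(x, 10000), \
                                       uf.randomString(50), \
                                       uf.padString(x, " ", 50), \
                                       uf.padSingleChar("x", 4000)))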



unresoved_reference.jpg (2M) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Using Lambda function to generate random data in PySpark throws not defined error

Marco Mistroni
Sure Mich... let me try to run your code in my IDE. I'm intrigued by the error.
I will report back either way, whether I find something or not.
Kind regards

Reply | Threaded
Open this post in threaded view
|

Re: Using Lambda function to generate random data in PySpark throws not defined error

srowen
In reply to this post by Mich Talebzadeh
I don't believe you'll be able to use globals in a Spark task, as they won't exist on the remote executor machines.
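
If the value does need to reach the executors explicitly, one well-known route is a broadcast variable. A rough sketch, reusing the sc, Range and uf names from the code earlier in the thread:

# Sketch only: publish the driver-side value as a broadcast variable and read it
# inside the lambda via .value, instead of relying on a global name.
bc_numRows = sc.broadcast(100000)
rdd = sc.parallelize(Range). \
          map(lambda x: (x, uf.clustered(x, bc_numRows.value), \
                            uf.scattered(x, 10000), \
                            uf.randomised(x, 10000), \
                            uf.randomString(50), \
                            uf.padString(x, " ", 50), \
                            uf.padSingleChar("x", 4000)))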


Re: Using Lambda function to generate random data in PySpark throws not defined error

Mich Talebzadeh

Thanks all.

Found out the problem :(

I defined the runner.py as

class main()

I replaced it with

def main():

and it worked without declaring numRows as global.

I am still wondering why it works with def main()?
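For anyone curious, one plausible reason can be reproduced without Spark at all. A minimal plain-Python sketch (illustrative names only, not the thread's actual code): a class body is not an enclosing scope for lambdas defined inside it, so a lambda there cannot see numRows, whereas a local of a def is captured by the lambda's closure and travels with the pickled function.

def as_function():
    numRows = 10
    # numRows is a local of as_function(), so the lambda closes over it
    return list(map(lambda x: x * numRows, range(3)))

print(as_function())  # [0, 10, 20]

try:
    class AsClassBody:
        numRows = 10
        # the lambda runs while the class body executes, but it cannot
        # resolve numRows: class scope is skipped for nested functions
        vals = list(map(lambda x: x * numRows, range(3)))
except NameError as err:
    print(err)  # name 'numRows' is not defined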


regards,

Mich


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Sun, 13 Dec 2020 at 15:10, Sean Owen <[hidden email]> wrote:
I don't believe you'll be able to use globals in a Spark task, as they won't exist on the remote executor machines.

On Sun, Dec 13, 2020 at 3:46 AM Mich Talebzadeh <[hidden email]> wrote:
thanks Marco.

When I stripped out Spark etc. and ran your map, it came back OK (no errors) WITHOUT global numRows.

However, with the full code, this is the unresolved-reference notification I am getting (as attached), with your code embedded WITHOUT global numRows.

regards,


Mich

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Sat, 12 Dec 2020 at 21:48, Sofia’s World <[hidden email]> wrote:
Hi Mich
I don't think it's a good idea... I believe your IDE is playing tricks on you.
Take Spark out of the equation... this is a Python issue only.
I am guessing your IDE is somehow messing up your environment.

if you take out the whole spark code and replace it by this code

map(lambda x: (x, uf.clustered(x,numRows), \
                           uf.scattered(x,numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x," ",50), \
                           uf.padSingleChar("x",4000)), [1,2,3,4,5])

you should get exactly the same error...
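(A small caveat, illustrative only and assuming the UsedFunctions module from earlier in the thread: in Python 3 map() is lazy, so the lambda only runs, and any NameError only appears, once the result is consumed, for example with list().)

import UsedFunctions as uf  # the module posted earlier in the thread

numRows = 100000
# wrapping the map in list() forces the lambda to run immediately,
# which surfaces the NameError outside of Spark as well
print(list(map(lambda x: (x, uf.clustered(x, numRows)), [1, 2, 3, 4, 5])))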

Send me a zip with the tfconstants.py and a trimmed-down version of your main.py and I'll plug it into my IDE and see if I can reproduce it.
It worked fine in Jupyter, but then I have all the functions in the same notebook.
hth
 marco
















On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh <[hidden email]> wrote:
I solved the issue of the variable numRows not being defined within the lambda function by declaring it as a global variable:

global numRows
numRows = 10 ## do in increment of 50K rows otherwise you blow up driver memory!
#

Then I could call it within the lambda function as follows


rdd = sc.parallelize(Range). \
      map(lambda x: (x, uf.clustered(x,numRows), \
                        uf.scattered(x,numRows), \
                        uf.randomised(x, numRows), \
                        uf.randomString(50), \
                        uf.padString(x," ",50), \
                        uf.padSingleChar("x",4000)))

This then worked. I am not convinced this is *the correct* solution but somehow it worked.
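(For what it's worth, a common idiom that avoids both the global and the enclosing-scope question is to bind the value into the lambda at definition time as a default argument. This is only a sketch, reusing sc, Range and uf from the thread:)

numRows = 100000

# bind numRows into the lambda's defaults so the value is stored with the
# pickled function and no global or enclosing-scope lookup is needed on executors
rdd = sc.parallelize(Range). \
      map(lambda x, numRows=numRows: (x, uf.clustered(x, numRows),
                                         uf.scattered(x, numRows),
                                         uf.randomised(x, numRows),
                                         uf.randomString(50),
                                         uf.padString(x, " ", 50),
                                         uf.padSingleChar("x", 4000)))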


Thanks


Mich


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Fri, 11 Dec 2020 at 18:52, Mich Talebzadeh <[hidden email]> wrote:
many thanks KR.

If I call the clustered function on its own, it works:

numRows = 100000  
print(uf.clustered(200,numRows))
and returns 

0.00199
If I run it all in one, including the UsedFunctions code in the same .py file, it works. The code is attached.

However, in PyCharm, I do the following

UsedFunctions.py. Note that this file only contains functions and no class

import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x,numRows):
    return math.floor(x -1)/numRows

def scattered(x,numRows):
    return abs((x -1 % numRows))* 1.0

def randomised(seed,numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x,chars,length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length-n)) + str(x)
    return result_str

def padSingleChar(chars,length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
      print(ll[0])

In main.py (PyCharm) I have this code, which is failing:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
    add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to

class main:
  rec = {}
  settings = [
                ("hive.exec.dynamic.partition", "true"),
                ("hive.exec.dynamic.partition.mode", "nonstrict"),
                ("spark.sql.orc.filterPushdown", "true"),
                ("hive.msck.path.validation", "ignore"),
                ("spark.sql.caseSensitive", "true"),
                ("spark.speculation", "false"),
                ("hive.metastore.authorization.storage.checks", "false"),
                ("hive.metastore.client.connect.retry.delay", "5s"),
                ("hive.metastore.client.socket.timeout", "1800s"),
                ("hive.metastore.connect.retries", "12"),
                ("hive.metastore.execute.setugi", "false"),
                ("hive.metastore.failure.retries", "12"),
                ("hive.metastore.schema.verification", "false"),
                ("hive.metastore.schema.verification.record.version", "false"),
                ("hive.metastore.server.max.threads", "100000"),
                ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
  ]
  configs = {"DB":"pycharm",
           "tableName":"randomDataPy"}
  DB = "pycharm"
  tableName = "randomDataPy"
  fullyQualifiedTableName = DB +"."+tableName
  spark = SparkSession.builder \
          .appName("app1") \
          .enableHiveSupport() \
          .getOrCreate()

  spark.sparkContext._conf.setAll(settings)

  sc = SparkContext.getOrCreate()
  print(sc.getConf().getAll())
  sqlContext = SQLContext(sc)
  HiveContext = HiveContext(sc)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nStarted at");uf.println(lst)

  numRows = 100000   ## do in increment of 50K rows otherwise you blow up driver memory!
  #
  ## Check if table exist otherwise create it

  rows = 0
  sqltext  = ""
  if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print ("number of rows is ",rows)
  else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
    ID INT
    , CLUSTERED INT
    , SCATTERED INT
    , RANDOMISED INT
    , RANDOM_STRING VARCHAR(50)
    , SMALL_VC VARCHAR(50)
    , PADDING  VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

  start = 0
  if (rows == 0):
    start = 1
  else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
    end = start + numRows - 1
  print ("starting at ID = ",start, ",ending on = ",end)
  Range = range(start, end+1)
  ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
  print(numRows)
  print(uf.clustered(200,numRows))
  rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))
  df = rdd.toDF(). \
       withColumnRenamed("_1","ID"). \
       withColumnRenamed("_2", "CLUSTERED"). \
       withColumnRenamed("_3", "SCATTERED"). \
       withColumnRenamed("_4", "RANDOMISED"). \
       withColumnRenamed("_5", "RANDOM_STRING"). \
       withColumnRenamed("_6", "SMALL_VC"). \
       withColumnRenamed("_7", "PADDING")
  df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
  df.printSchema()
  df.explain()
  df.createOrReplaceTempView("tmp")
  sqltext = f"""
    INSERT INTO TABLE {fullyQualifiedTableName}
    SELECT
            ID
          , CLUSTERED
          , SCATTERED
          , RANDOMISED
          , RANDOM_STRING
          , SMALL_VC
          , PADDING
    FROM tmp
    """
  spark.sql(sqltext)
  spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20,truncate=False,vertical=False)
  ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
  lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
  print("\nFinished at");usedFunctions.println(lst)





On Fri, 11 Dec 2020 at 18:04, Sofia’s World <[hidden email]> wrote:
Copying and pasting your code in a Jupyter notebook works fine, that is, using my own version of Range, which is simply a list of numbers.

How about this... does this work fine?
list(map(lambda x: (x, clustered(x, numRows)),[1,2,3,4]))

If it does, I'd look at what's inside your Range and what you get out of it. I suspect something is wrong in there.

If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working.
Could you try that as well?
kr


On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh <[hidden email]> wrote:
Sorry, part of the code is not that visible

rdd = sc.parallelize(Range). \
           map(lambda x: (x, uf.clustered(x, numRows), \
                             uf.scattered(x,10000), \
                             uf.randomised(x,10000), \
                             uf.randomString(50), \
                             uf.padString(x," ",50), \
                             uf.padSingleChar("x",4000)))



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 





Re: Using Lambda function to generate random data in PySpark throws not defined error

Marco Mistroni
Hey Mich
Glad to know you got to the bottom of it.
In Python, if you want to run a module - the same as you would in Java/Scala - you have to define a def main() method.
You'll notice that the snippet I sent you had this syntax:
if __name__ == "__main__":
   main()

I am guessing you just chose an unfortunate name for your class. Had you called it

class pincopallino:
       .....

your IDE would not have run it, because it could not find a main method, and then you would have been on the right track.
I am guessing your main() class somehow confused your IDE.
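For illustration, a minimal sketch of that def main() structure (only UsedFunctions and the numRows/Range names come from the thread; everything else is assumed):

import UsedFunctions as uf
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("app1").getOrCreate()
    sc = spark.sparkContext

    numRows = 100000          # a local of main(), so the lambda below closes over it
    Range = range(1, numRows + 1)

    rdd = sc.parallelize(Range). \
          map(lambda x: (x, uf.clustered(x, numRows),
                            uf.scattered(x, 10000),
                            uf.randomised(x, 10000),
                            uf.randomString(50),
                            uf.padString(x, " ", 50),
                            uf.padSingleChar("x", 4000)))
    print(rdd.take(1))

if __name__ == "__main__":
    main()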

The best way to run your Spark code would be via a unit test though... the code below might give you a head start (you'll need to configure your IDE for this though).

have fun
kr
 marco

import pytest
from pyspark.sql import SparkSession

# pytest fixture: the decorator is needed so pytest can inject
# spark_session into the test function below
@pytest.fixture
def spark_session():
    return SparkSession.builder \
        .master('local[1]') \
        .appName('SparkByExamples.com') \
        .getOrCreate()

def test_create_table(spark_session):
    df = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])
    print(df.show())

    df2 = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])

    df.createOrReplaceTempView('sample')

    assert df.subtract(df2).count() == 0









