Hi,
This used to work but not anymore.
I have a UsedFunctions.py file that has these functions:
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

def scattered(x, numRows):
    return abs((x - 1 % numRows)) * 1.0

def randomised(seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

def padSingleChar(chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])

Now in the main.py module I import this file as follows:
import UsedFunctions as uf
Then I try the following
import UsedFunctions as uf
numRows = 100000  ## do in increments of 100K rows
rdd = sc.parallelize(Range). \
     map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))

The problem is that it now throws an error for numRows, as below:

File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined
I don't know why this error is coming up!
I would appreciate any ideas.
Thanks,
Mich
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.
Looks like a simple Python error - you haven't shown the code that produces it. Indeed, I suspect you'll find there is no such symbol.
Thanks Sean,
This is the code
numRows = 100000  ## do in increment of 50K rows otherwise you blow up driver memory!
#
## Check if table exist otherwise create it
rows = 0
sqltext = ""
if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
    rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
    print("number of rows is ", rows)
else:
    print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
    sqltext = """
    CREATE TABLE {DB}.{tableName}(
          ID INT
        , CLUSTERED INT
        , SCATTERED INT
        , RANDOMISED INT
        , RANDOM_STRING VARCHAR(50)
        , SMALL_VC VARCHAR(50)
        , PADDING VARCHAR(4000)
    )
    STORED AS PARQUET
    """
    spark.sql(sqltext)

start = 0
if (rows == 0):
    start = 1
else:
    maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
    start = maxID + 1
end = start + numRows - 1
print("starting at ID = ", start, ",ending on = ", end)
Range = range(start, end + 1)
## This traverses through the Range and increments "x" by one unit each time, and that x value
## is used in the code to generate random data through the Python functions
print(numRows)
print(uf.clustered(200, numRows))
rdd = sc.parallelize(Range). \
     map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))
df = rdd.toDF(). \
     withColumnRenamed("_1", "ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
And this is the run with error
Started at
11/12/2020 14:42:45.45
number of rows is  4500000
starting at ID =  4500001 ,ending on =  4600000
100000
0.00199
20/12/11 14:42:56 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 33)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:/Users/admin/PycharmProjects/pythonProject2/pilot/src/main.py", line 101, in <lambda>
    map(lambda x: (x, uf.clustered(x, numRows), \
NameError: name 'numRows' is not defined
Regards,
Mich
Sorry, part of the code is not that visible
rdd = sc.parallelize(Range). \
     map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))
Copying and pasting your code in a Jupyter notebook works fine; that is, using my own version of Range, which is simply a list of numbers. How about this: does this work fine?

list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4]))
If it does, I'd look at what's inside your Range and what you get out of it. I suspect something is wrong in there.
If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working. Could you try that as well?
kr
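A fuller version of that Spark-free check might look like the sketch below (it assumes UsedFunctions.py is importable and that numRows is defined in the same scope where the lambda is created):

import UsedFunctions as uf

numRows = 100000
# If numRows is visible where the lambda is defined, this prints four tuples;
# if not, it raises the same NameError seen inside the Spark job.
out = list(map(lambda x: (x, uf.clustered(x, numRows)), [1, 2, 3, 4]))
print(out)

If this works at module level but the same map fails inside main.py, the problem is Python scoping rather than Spark.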
many thanks KR.
If I call the clustered function on its own, it works:

print(uf.clustered(200,numRows))

and returns

0.00199

If I run it all in one .py file, including the UsedFunctions class, it works. The code is attached.
However, in PyCharm I do the following.
UsedFunctions.py. Note that this file only contains functions and no class:
import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

def scattered(x, numRows):
    return abs((x - 1 % numRows)) * 1.0

def randomised(seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

def padSingleChar(chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])
In main.py (PyCharm) I have this code, which is failing:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
     add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to
class main:
    rec = {}
    settings = [
        ("hive.exec.dynamic.partition", "true"),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ("spark.sql.orc.filterPushdown", "true"),
        ("hive.msck.path.validation", "ignore"),
        ("spark.sql.caseSensitive", "true"),
        ("spark.speculation", "false"),
        ("hive.metastore.authorization.storage.checks", "false"),
        ("hive.metastore.client.connect.retry.delay", "5s"),
        ("hive.metastore.client.socket.timeout", "1800s"),
        ("hive.metastore.connect.retries", "12"),
        ("hive.metastore.execute.setugi", "false"),
        ("hive.metastore.failure.retries", "12"),
        ("hive.metastore.schema.verification", "false"),
        ("hive.metastore.schema.verification.record.version", "false"),
        ("hive.metastore.server.max.threads", "100000"),
        ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
    ]
    configs = {"DB": "pycharm", "tableName": "randomDataPy"}
    DB = "pycharm"
    tableName = "randomDataPy"
    fullyQualifiedTableName = DB + "." + tableName
    spark = SparkSession.builder \
        .appName("app1") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext._conf.setAll(settings)
    sc = SparkContext.getOrCreate()
    print(sc.getConf().getAll())
    sqlContext = SQLContext(sc)
    HiveContext = HiveContext(sc)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at"); uf.println(lst)

    numRows = 100000  ## do in increment of 50K rows otherwise you blow up driver memory!
    #
    ## Check if table exist otherwise create it
    rows = 0
    sqltext = ""
    if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
        rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
        print("number of rows is ", rows)
    else:
        print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
        sqltext = """
        CREATE TABLE {DB}.{tableName}(
              ID INT
            , CLUSTERED INT
            , SCATTERED INT
            , RANDOMISED INT
            , RANDOM_STRING VARCHAR(50)
            , SMALL_VC VARCHAR(50)
            , PADDING VARCHAR(4000)
        )
        STORED AS PARQUET
        """
        spark.sql(sqltext)

    start = 0
    if (rows == 0):
        start = 1
    else:
        maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
        start = maxID + 1
    end = start + numRows - 1
    print("starting at ID = ", start, ",ending on = ", end)
    Range = range(start, end + 1)
    ## This traverses through the Range and increments "x" by one unit each time, and that x value
    ## is used in the code to generate random data through the Python functions
    print(numRows)
    print(uf.clustered(200, numRows))
    rdd = sc.parallelize(Range). \
         map(lambda x: (x, uf.clustered(x, numRows), \
                           uf.scattered(x, 10000), \
                           uf.randomised(x, 10000), \
                           uf.randomString(50), \
                           uf.padString(x, " ", 50), \
                           uf.padSingleChar("x", 4000)))
    df = rdd.toDF(). \
         withColumnRenamed("_1", "ID"). \
         withColumnRenamed("_2", "CLUSTERED"). \
         withColumnRenamed("_3", "SCATTERED"). \
         withColumnRenamed("_4", "RANDOMISED"). \
         withColumnRenamed("_5", "RANDOM_STRING"). \
         withColumnRenamed("_6", "SMALL_VC"). \
         withColumnRenamed("_7", "PADDING")
    df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
    df.printSchema()
    df.explain()
    df.createOrReplaceTempView("tmp")
    sqltext = f"""
    INSERT INTO TABLE {fullyQualifiedTableName}
    SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , RANDOM_STRING
        , SMALL_VC
        , PADDING
    FROM tmp
    """
    spark.sql(sqltext)
    spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20, truncate=False, vertical=False)
    ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at"); usedFunctions.println(lst)
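One thing that may be relevant here: in main.py everything after "class main:" runs in the class body, and in Python a name assigned in a class body is not visible inside a lambda defined in that same body, because class bodies do not act as an enclosing scope for nested functions. That would explain why print(uf.clustered(200, numRows)) works (the class body itself can see numRows) while the lambda passed to map() cannot. A minimal Spark-free sketch of that behaviour:

# Sketch only: shows how class-body scoping can produce this NameError.
class Demo:
    numRows = 100000                      # becomes Demo.numRows, not a module-level global
    try:
        # the lambda looks numRows up as a global, so calling it here fails
        pairs = list(map(lambda x: (x, numRows), [1, 2, 3]))
    except NameError as e:
        print(e)                          # name 'numRows' is not defined

Declaring numRows as a module-level global (as in the next message) or passing the value into the lambda explicitly would both avoid this.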
I solved the issue of the numRows variable not being defined within the lambda function by declaring it as a global variable:
global numRows
numRows = 10  ## do in increment of 50K rows otherwise you blow up driver memory!
#
Then I could call it within the lambda function as follows
rdd = sc.parallelize(Range). \
     map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, numRows), \
                       uf.randomised(x, numRows), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))

This then worked. I am not convinced this is *the correct* solution but somehow it worked.
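For what it's worth, a sketch of an alternative that avoids the module-level global is to bind the value at definition time with a default argument; the default is evaluated in the class body, where numRows is visible, and travels with the function:

# Hypothetical variant of the same map call, binding numRows as a default argument
rdd = sc.parallelize(Range). \
     map(lambda x, numRows=numRows: (x, uf.clustered(x, numRows), \
                                        uf.scattered(x, numRows), \
                                        uf.randomised(x, numRows), \
                                        uf.randomString(50), \
                                        uf.padString(x, " ", 50), \
                                        uf.padSingleChar("x", 4000)))

Moving the whole block out of the class body into a plain function or a module-level script would also make the original lambda work unchanged.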
Thanks
Mich
Hi Mich, I don't think it's a good idea... I believe your IDE is playing tricks on you. Take Spark out of the equation: this is a Python issue only. I am guessing your IDE is somehow messing up your environment.
If you take out the whole Spark code and replace it with this code:
map(lambda x: (x, uf.clustered(x, numRows), \
                  uf.scattered(x, numRows), \
                  uf.randomised(x, numRows), \
                  uf.randomString(50), \
                  uf.padString(x, " ", 50), \
                  uf.padSingleChar("x", 4000)), [1, 2, 3, 4, 5])
you should get exactly the same error...
Send me a zip with the tfconstants.py and a trimmed-down version of your main.py and I'll plug it into my IDE and see if I can reproduce it. It worked fine in Jupyter, but then I have all the functions in the same notebook.
hth
marco
|
|
Thanks Marco.
When I stripped down Spark etc. and ran your map, it came back OK (no errors) WITHOUT global numRows.
However, with the full code, this is the unresolved reference notification I am getting (attached), having embedded your code WITHOUT global numRows.
regards,
Mich
Hi Mich, I don't think it's a good idea... I believe your IDE is playing tricks on you. Take Spark out of the equation: this is a Python issue only. I am guessing your IDE is somehow messing up your environment.
If you take out the whole Spark code and replace it with this code
map(lambda x: (x, uf.clustered(x, numRows), \
               uf.scattered(x, numRows), \
               uf.randomised(x, numRows), \
               uf.randomString(50), \
               uf.padString(x, " ", 50), \
               uf.padSingleChar("x", 4000)), [1, 2, 3, 4, 5])
you should get exactly the same error...
Send me a zip with the tfconstants.py and a trimmed-down version of your main.py and I'll plug it into my IDE and see if I can reproduce it. It worked fine in Jupyter, but then I have all functions in the same notebook. HTH, Marco
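
For reference, a self-contained version of the Spark-free check being suggested here might look like the sketch below. It assumes the UsedFunctions.py shown earlier in the thread is importable, and that numRows is an ordinary module-level variable, which is not how the failing main.py defines it.

import UsedFunctions as uf

## Spark-free repro sketch: numRows is a plain module-level name,
## so the lambda can resolve it when it runs.
numRows = 100000

rows = list(map(lambda x: (x, uf.clustered(x, numRows),
                           uf.scattered(x, numRows),
                           uf.randomised(x, numRows),
                           uf.randomString(50),
                           uf.padString(x, " ", 50),
                           uf.padSingleChar("x", 4000)),
                [1, 2, 3, 4, 5]))
print(rows[0][:4])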
On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh < [hidden email]> wrote: I solved the issue of the variable numRows not being defined within the lambda function by defining it as a global variable
global numRows
numRows = 10  ## do in increment of 50K rows otherwise you blow up driver memory!
Then I could call it within the lambda function as follows
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x, numRows), \
                   uf.randomised(x, numRows), \
                   uf.randomString(50), \
                   uf.padString(x, " ", 50), \
                   uf.padSingleChar("x", 4000)))

This then worked. I am not convinced this is *the correct* solution but somehow it worked.
Thanks
Mich
Many thanks KR.
If I call the clustered function on its own, it works:
print(uf.clustered(200,numRows))
and it returns
0.00199
If I run it all in one, including the UsedFunctions code in the same .py file, it works. The code is attached.
However, in PyCharm, I do the following
UsedFunctions.py. Note that this file only contains functions and no class
import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

def scattered(x, numRows):
    return abs((x - 1 % numRows)) * 1.0

def randomised(seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

def padSingleChar(chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])
In main.py (PyCharm) I have this code, which is failing:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
     add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to

class main:
    rec = {}
    settings = [
        ("hive.exec.dynamic.partition", "true"),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ("spark.sql.orc.filterPushdown", "true"),
        ("hive.msck.path.validation", "ignore"),
        ("spark.sql.caseSensitive", "true"),
        ("spark.speculation", "false"),
        ("hive.metastore.authorization.storage.checks", "false"),
        ("hive.metastore.client.connect.retry.delay", "5s"),
        ("hive.metastore.client.socket.timeout", "1800s"),
        ("hive.metastore.connect.retries", "12"),
        ("hive.metastore.execute.setugi", "false"),
        ("hive.metastore.failure.retries", "12"),
        ("hive.metastore.schema.verification", "false"),
        ("hive.metastore.schema.verification.record.version", "false"),
        ("hive.metastore.server.max.threads", "100000"),
        ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
    ]
    configs = {"DB": "pycharm", "tableName": "randomDataPy"}
    DB = "pycharm"
    tableName = "randomDataPy"
    fullyQualifiedTableName = DB + "." + tableName
    spark = SparkSession.builder \
        .appName("app1") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext._conf.setAll(settings)
    sc = SparkContext.getOrCreate()
    print(sc.getConf().getAll())
    sqlContext = SQLContext(sc)
    HiveContext = HiveContext(sc)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at"); uf.println(lst)
    numRows = 100000  ## do in increment of 50K rows otherwise you blow up driver memory!
    ## Check if table exist otherwise create it
    rows = 0
    sqltext = ""
    if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
        rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
        print("number of rows is ", rows)
    else:
        print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
        sqltext = """
        CREATE TABLE {DB}.{tableName}(
              ID INT
            , CLUSTERED INT
            , SCATTERED INT
            , RANDOMISED INT
            , RANDOM_STRING VARCHAR(50)
            , SMALL_VC VARCHAR(50)
            , PADDING VARCHAR(4000)
        )
        STORED AS PARQUET
        """
        spark.sql(sqltext)
    start = 0
    if (rows == 0):
        start = 1
    else:
        maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
        start = maxID + 1
    end = start + numRows - 1
    print("starting at ID = ", start, ",ending on = ", end)
    Range = range(start, end + 1)
    ## This traverses through the Range and increment "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
    print(numRows)
    print(uf.clustered(200, numRows))
    rdd = sc.parallelize(Range). \
        map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))
    df = rdd.toDF(). \
        withColumnRenamed("_1", "ID"). \
        withColumnRenamed("_2", "CLUSTERED"). \
        withColumnRenamed("_3", "SCATTERED"). \
        withColumnRenamed("_4", "RANDOMISED"). \
        withColumnRenamed("_5", "RANDOM_STRING"). \
        withColumnRenamed("_6", "SMALL_VC"). \
        withColumnRenamed("_7", "PADDING")
    df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
    df.printSchema()
    df.explain()
    df.createOrReplaceTempView("tmp")
    sqltext = f"""
    INSERT INTO TABLE {fullyQualifiedTableName}
    SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , RANDOM_STRING
        , SMALL_VC
        , PADDING
    FROM tmp
    """
    spark.sql(sqltext)
    spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20, truncate=False, vertical=False)
    ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at"); usedFunctions.println(lst)
Copying and pasting your code in a Jupyter notebook works fine, that is, using my own version of Range, which is simply a list of numbers. How about this... does this work fine?

list(map(lambda x: (x, clustered(x, numRows)), [1,2,3,4]))

If it does, I'd look at what's inside your Range and what you get out of it. I suspect something is wrong in there.
If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working. Could you try that as well? KR
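
Spelled out, the two checks suggested above might look like this sketch, again assuming UsedFunctions.py is importable and numRows is a plain module-level variable:

import UsedFunctions as uf

numRows = 100000

## 1) Call the function outside any map() to rule out the function itself.
print(uf.clustered(200, numRows))    ## expected: 0.00199

## 2) Map over a plain list instead of the Spark-generated Range to rule out the input.
print(list(map(lambda x: (x, uf.clustered(x, numRows)), [1, 2, 3, 4])))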
On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh < [hidden email]> wrote: Sorry, part of the code is not that visible
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x, 10000), \
                   uf.randomised(x, 10000), \
                   uf.randomString(50), \
                   uf.padString(x, " ", 50), \
                   uf.padSingleChar("x", 4000)))
Sure Mich... uhm... let me try to run your code in my IDE. I am intrigued by the error. Will report back whether or not I find something. Kind regards
I don't believe you'll be able to use globals in a Spark task, as they won't exist on the remote executor machines.
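
As an illustration of that point rather than anything from this thread, the usual way to get a driver-side value such as numRows into a task is to let the function capture it as an ordinary local, or pass it explicitly, so that PySpark serialises the value along with the function. A sketch, assuming sc, Range and UsedFunctions (imported as uf) are set up as in the earlier snippets:

numRows = 100000   ## a plain variable; no "global" declaration needed

def build_row(x, num_rows=numRows):
    ## Passing the value as a default argument makes the dependency explicit;
    ## the function and the captured value are pickled and shipped to the executors.
    return (x,
            uf.clustered(x, num_rows),
            uf.scattered(x, 10000),
            uf.randomised(x, 10000),
            uf.randomString(50),
            uf.padString(x, " ", 50),
            uf.padSingleChar("x", 4000))

rdd = sc.parallelize(Range).map(build_row)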
Thanks all.
Found out the problem :(
I defined the runner.py as
class main()
I replaced it with

def main()

and it worked without declaring numRows as global.
I am still wondering why it works with def main()?
regards,
Mich
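
On the question of why def main() behaves differently from class main: this is most likely ordinary Python scoping rather than anything Spark-specific. Names assigned in a class body become class attributes, and a lambda defined in that body cannot see them, whereas names assigned inside a function are locals that a nested lambda closes over. A minimal illustration with hypothetical names, unrelated to the original code:

class Runner:                          ## like "class main:"
    num_rows = 10                      ## a class attribute, not a name the lambda below can see
    try:
        values = list(map(lambda x: x / num_rows, [1, 2, 3]))
    except NameError as e:
        print("class body:", e)        ## NameError: name 'num_rows' is not defined

def runner():                          ## like "def main():"
    num_rows = 10                      ## an ordinary function local
    values = list(map(lambda x: x / num_rows, [1, 2, 3]))
    print("function body:", values)    ## works: the lambda closes over num_rows

runner()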
Hey Mich, glad to know you got to the bottom of it. In Python, if you want to run a module - same as you would in Java/Scala - you have to define a main() function. You'll notice that the snippet I sent you had this syntax:

if __name__ == "__main__":
    main()
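For what it's worth, a minimal sketch of that pattern applied to this case (Spark left out, and the body is purely illustrative): numRows becomes an ordinary local of main(), so the lambda picks it up through its closure.

import UsedFunctions as uf

def main():
    numRows = 100000   # a local of main(): the lambda below sees it through its closure
    sample = list(map(lambda x: (x, uf.clustered(x, numRows)), range(1, 6)))
    print(sample)

if __name__ == "__main__":
    main()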
I am guessing you just chose an unfortunate name for your class. Had you called it
class pincopallino: .....
Your IDE would not have run it, because it could not find a main method, and then you would have been on the right track.
I am guessing your main() class somehow confused your IDE.
The best way to run your Spark code would be via a unit test though... the code below might give you a head start (you'll need to configure your IDE for this, though).
have fun kr marco
import logging
from pyspark.sql import SparkSession
from pyspark import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
import pyspark
import pytest
import shutil
@pytest.fixture   # needed so that pytest can inject spark_session into the test below
def spark_session():
    return SparkSession.builder \
        .master('local[1]') \
        .appName('SparkByExamples.com') \
        .getOrCreate()
def test_create_table(spark_session):
    df = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])
    print(df.show())

    df2 = spark_session.createDataFrame([['one', 'two']]).toDF(*['first', 'second'])

    df.createOrReplaceTempView('sample')

    assert df.subtract(df2).count() == 0
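Assuming pytest is installed in the interpreter your IDE uses, a run along the lines of pytest -q test_spark_job.py from the project root (the file name here is only a placeholder) should pick up test_create_table; PyCharm can also run it through a pytest run configuration.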
On Sun, Dec 13, 2020 at 8:43 PM Mich Talebzadeh < [hidden email]> wrote:
Thanks all.
Found out the problem :(
I defined the runner.py as
class main()
I replaced it with

def main()

and it worked without declaring numRows as global.
I am still wondering why it works with def main()?
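A minimal sketch (no Spark involved, and the names works/fails are made up) that appears to reproduce the difference: a name bound inside a class body lives in the class namespace, and Python does not search class scope from functions or lambdas nested in that body, whereas a local of a def is captured by the nested lambda's closure.

def works():
    numRows = 10    # function local: visible to the nested lambda through its closure
    return list(map(lambda x: x % numRows, range(5)))

print(works())      # [0, 1, 2, 3, 4]

try:
    class fails:
        numRows = 10    # class attribute: class scope is not searched by nested functions/lambdas
        result = list(map(lambda x: x % numRows, range(5)))
except NameError as e:
    print(e)            # name 'numRows' is not defined - the same error as in the job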
regards,
Mich
I don't believe you'll be able to use globals in a Spark task, as they won't exist on the remote executor machines.
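As an illustration only (a sketch, not the actual job, and the helper name build_rdd is made up): binding the value to an ordinary local before building the RDD avoids both the global and the class attribute - the lambda closes over the local, and PySpark serializes the captured value out to the executors together with the function.

import UsedFunctions as uf

def build_rdd(sc, start, numRows):
    # numRows is a plain local here; the lambda captures its value in its closure,
    # so it is pickled and shipped to the executors along with the task
    Range = range(start, start + numRows)
    return sc.parallelize(Range). \
        map(lambda x: (x, uf.clustered(x, numRows), uf.randomString(50)))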
On Sun, Dec 13, 2020 at 3:46 AM Mich Talebzadeh < [hidden email]> wrote: thanks Marco.
When I stripped down spark etc and ran your map, it came back OK (no errors) WITHOUT global numRows
However, with the full code, this is the unresolved reference notification I am getting (attached), having embedded your code WITHOUT global numRows
regards,
Mich
Hi Mich, I don't think it's a good idea... I believe your IDE is playing tricks on you. Take Spark out of the equation... this is a Python issue only. I am guessing your IDE is somehow messing up your environment.
If you take out the whole Spark code and replace it with this code
map(lambda x: (x, uf.clustered(x, numRows), \
               uf.scattered(x, numRows), \
               uf.randomised(x, numRows), \
               uf.randomString(50), \
               uf.padString(x, " ", 50), \
               uf.padSingleChar("x", 4000)), [1, 2, 3, 4, 5])
you should get exactly the same error...
Send me a zip with the tfconstants.py and a trimmed down version of your main.py and I'll plug it into my IDE and see if I can reproduce it. It worked fine in Jupyter, but then I have all the functions in the same notebook. hth marco
On Sat, Dec 12, 2020 at 9:02 PM Mich Talebzadeh < [hidden email]> wrote: I solved the issue of the variable numRows not being defined within the lambda function by defining it as a global variable
global numRows
numRows = 10  ## do in increment of 50K rows otherwise you blow up driver memory! #
Then I could call it within the lambda function as follows
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x, numRows), \
                   uf.randomised(x, numRows), \
                   uf.randomString(50), \
                   uf.padString(x, " ", 50), \
                   uf.padSingleChar("x", 4000)))

This then worked. I am not convinced this is *the correct* solution but somehow it worked.
Thanks
Mich
many thanks KR.
If I call the clustered function on its own it works
print(uf.clustered(200,numRows)) and returns
0.00199
If I run it all in one, including the UsedFunctions class in the same .py file, it works. The code is attached.
However, in PyCharm, I do the following
UsedFunctions.py. Note that this file only contains functions and no class
import logging
import random
import string
import math

def randomString(length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

def clustered(x, numRows):
    return math.floor(x - 1) / numRows

def scattered(x, numRows):
    return abs((x - 1 % numRows)) * 1.0

def randomised(seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

def padString(x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

def padSingleChar(chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

def println(lst):
    for ll in lst:
        print(ll[0])
In main.py (PyCharm) I have this code, which is failing
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
     add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math
import mathOperations as mo
import UsedFunctions as uf
##import test_oracle as to
class main:
    rec = {}
    settings = [
        ("hive.exec.dynamic.partition", "true"),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ("spark.sql.orc.filterPushdown", "true"),
        ("hive.msck.path.validation", "ignore"),
        ("spark.sql.caseSensitive", "true"),
        ("spark.speculation", "false"),
        ("hive.metastore.authorization.storage.checks", "false"),
        ("hive.metastore.client.connect.retry.delay", "5s"),
        ("hive.metastore.client.socket.timeout", "1800s"),
        ("hive.metastore.connect.retries", "12"),
        ("hive.metastore.execute.setugi", "false"),
        ("hive.metastore.failure.retries", "12"),
        ("hive.metastore.schema.verification", "false"),
        ("hive.metastore.schema.verification.record.version", "false"),
        ("hive.metastore.server.max.threads", "100000"),
        ("hive.metastore.authorization.storage.checks", "/apps/hive/warehouse")
    ]
    configs = {"DB": "pycharm", "tableName": "randomDataPy"}
    DB = "pycharm"
    tableName = "randomDataPy"
    fullyQualifiedTableName = DB + "." + tableName
    spark = SparkSession.builder \
        .appName("app1") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext._conf.setAll(settings)
    sc = SparkContext.getOrCreate()
    print(sc.getConf().getAll())
    sqlContext = SQLContext(sc)
    HiveContext = HiveContext(sc)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at"); uf.println(lst)
    numRows = 100000  ## do in increment of 50K rows otherwise you blow up driver memory!
    #
    ## Check if table exist otherwise create it
    rows = 0
    sqltext = ""
    if (spark.sql(f"SHOW TABLES IN {DB} like '{tableName}'").count() == 1):
        rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]
        print("number of rows is ", rows)
    else:
        print(f"\nTable {fullyQualifiedTableName} does not exist, creating table ")
        sqltext = """
        CREATE TABLE {DB}.{tableName}(
              ID INT
            , CLUSTERED INT
            , SCATTERED INT
            , RANDOMISED INT
            , RANDOM_STRING VARCHAR(50)
            , SMALL_VC VARCHAR(50)
            , PADDING VARCHAR(4000)
        )
        STORED AS PARQUET
        """
        spark.sql(sqltext)
    start = 0
    if (rows == 0):
        start = 1
    else:
        maxID = spark.sql(f"SELECT MAX(id) FROM {fullyQualifiedTableName}").collect()[0][0]
        start = maxID + 1
    end = start + numRows - 1
    print("starting at ID = ", start, ",ending on = ", end)
    Range = range(start, end + 1)
    ## This traverses through the Range and increment "x" by one unit each time, and that x value is used
    ## in the code to generate random data through Python functions in a class
    print(numRows)
    print(uf.clustered(200, numRows))
    rdd = sc.parallelize(Range). \
        map(lambda x: (x, uf.clustered(x, numRows), \
                       uf.scattered(x, 10000), \
                       uf.randomised(x, 10000), \
                       uf.randomString(50), \
                       uf.padString(x, " ", 50), \
                       uf.padSingleChar("x", 4000)))
    df = rdd.toDF(). \
        withColumnRenamed("_1", "ID"). \
        withColumnRenamed("_2", "CLUSTERED"). \
        withColumnRenamed("_3", "SCATTERED"). \
        withColumnRenamed("_4", "RANDOMISED"). \
        withColumnRenamed("_5", "RANDOM_STRING"). \
        withColumnRenamed("_6", "SMALL_VC"). \
        withColumnRenamed("_7", "PADDING")
    df.write.mode("overwrite").saveAsTable("pycharm.ABCD")
    df.printSchema()
    df.explain()
    df.createOrReplaceTempView("tmp")
    sqltext = f"""
        INSERT INTO TABLE {fullyQualifiedTableName}
        SELECT
              ID
            , CLUSTERED
            , SCATTERED
            , RANDOMISED
            , RANDOM_STRING
            , SMALL_VC
            , PADDING
        FROM tmp
    """
    spark.sql(sqltext)
    spark.sql(f"SELECT MIN(id) AS minID, MAX(id) AS maxID FROM {fullyQualifiedTableName}").show(n=20, truncate=False, vertical=False)
    ##sqlContext.sql("""SELECT * FROM pycharm.randomDataPy ORDER BY id""").show(n=20,truncate=False,vertical=False)
    lst = (spark.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at"); usedFunctions.println(lst)
Copying and pasting your code in a Jupyter notebook works fine - that is, using my own version of Range, which is simply a list of numbers. How about this... does this work fine?

list(map(lambda x: (x, clustered(x, numRows)), [1, 2, 3, 4]))
If it does, I'd look at what's inside your Range and what you get out of it. I suspect something is wrong in there.
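A quick, purely illustrative check before handing it to parallelize:

print(type(Range))
print(list(Range[:5]))   # first few values only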
If there were something wrong with the clustered function, then you should be able to take it out of the map() and still have the code working... Could you try that as well? kr
On Fri, Dec 11, 2020 at 5:04 PM Mich Talebzadeh < [hidden email]> wrote: Sorry, part of the code is not that visible
rdd = sc.parallelize(Range). \
    map(lambda x: (x, uf.clustered(x, numRows), \
                   uf.scattered(x, 10000), \
                   uf.randomised(x, 10000), \
                   uf.randomString(50), \
                   uf.padString(x, " ", 50), \
                   uf.padSingleChar("x", 4000)))