The equivalent of Scala mapping in Pyspark


Mich Talebzadeh
Hi,

I generate an array of random data and create a DF in Spark Scala as follows:

val end = start + numRows - 1
println (" starting at ID = " + start + " , ending on = " +  end )
val usedFunctions = new UsedFunctions
val text = ( start to end ).map(i =>
             (
                 i.toString
               , usedFunctions.clustered(i,numRows).toString
               , usedFunctions.scattered(i,numRows).toString
               , usedFunctions.randomised(i,numRows).toString
               , usedFunctions.randomString(chars.mkString(""),50)
               , usedFunctions.padString(i, " ", 50)
               , usedFunctions.padSingleChar("x ", 4000)
             )
           ).
    toArray

Then I create the DF from it:
val df = sc.parallelize(text).
                              map(p => columns(
                                                  p._1.toString.toInt
                                                , p._2.toString.toDouble
                                                , p._3.toString.toDouble
                                                , p._4.toString.toDouble
                                                , p._5.toString
                                                , p._6.toString
                                                , p._7.toString
                                              )
                                 ).
    toDF


What is the equivalent of this in PySpark, especially the first part (val text = ...)?


Thanks


Mich


 Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: The equivalent of Scala mapping in Pyspark

Mich Talebzadeh
Hi all,

I managed to sort this one out the hard way, as the available PySpark materials are not as comprehensive as the Scala ones. Frankly, sorting it out was a bit of a struggle for me, but I managed to make it work.

What the script does, in a nutshell, is generate rows in Spark and save them to a Hive Parquet table through Spark. You can run the script at any time to append more rows. I already had the script in Scala, so I rewrote it in PySpark.

There is a lot of handy stuff in Scala, such as case classes for defining column headers, that does not seem to be available in Python (possibly down to my lack of in-depth Python knowledge). The Spark documentation also frequently states that a feature is available in Scala and Java but not in Python.
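The nearest Python counterpart to a case class for the column headers seems to be a collections.namedtuple (pyspark.sql.Row works similarly). A minimal sketch, just to illustrate the idea (the field names are the ones used further down; the name Columns is arbitrary):

from collections import namedtuple

## Rough Python counterpart of a Scala case class holding the column headers.
## An RDD of these named tuples carries its field names into toDF(), so no
## withColumnRenamed chain is needed afterwards.
Columns = namedtuple("Columns",
                     ["ID", "CLUSTERED", "SCATTERED", "RANDOMISED",
                      "RANDOM_STRING", "SMALL_VC", "PADDING"])

With that in place, the map(lambda x: ...) below could build Columns(x, ...) instead of a plain tuple.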

In the end I used lambda functions in PySpark to create an RDD from the randomly generated columns. This is what I did:

start = 0
if (rows == 0):
  start = 1
else:
  maxID = sqlContext.sql("SELECT MAX(id) FROM test.randomDataPy").collect()[0][0]
  start = maxID + 1
end = start + numRows - 1
print ("starting at ID = ",start, ",ending on = ",end)
Range = range(start, end+1)
## Traverse the Range, incrementing x by one each time; each x value is fed to the Python functions in the UsedFunctions class to generate random data
rdd = sc.parallelize(Range). \
         map(lambda x: (x, usedFunctions.clustered(x,numRows), \
                           usedFunctions.scattered(x,numRows), \
                           usedFunctions.randomised(x,numRows), \
                           usedFunctions.randomString(50), \
                           usedFunctions.padString(x," ",50), \
                           usedFunctions.padSingleChar("x",4000)))
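## toDF() on an RDD of plain tuples gives the default column names _1 .. _7,
## hence the withColumnRenamed chain below to put meaningful headers on them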
df = rdd.toDF(). \
     withColumnRenamed("_1","ID"). \
     withColumnRenamed("_2", "CLUSTERED"). \
     withColumnRenamed("_3", "SCATTERED"). \
     withColumnRenamed("_4", "RANDOMISED"). \
     withColumnRenamed("_5", "RANDOM_STRING"). \
     withColumnRenamed("_6", "SMALL_VC"). \
     withColumnRenamed("_7", "PADDING")
df.printSchema()

This is the output

('number of rows is ', 200000)
('starting at ID = ', 200001, ',ending on = ', 250000)
root
 |-- ID: long (nullable = true)
 |-- CLUSTERED: double (nullable = true)
 |-- SCATTERED: double (nullable = true)
 |-- RANDOMISED: double (nullable = true)
 |-- RANDOM_STRING: string (nullable = true)
 |-- SMALL_VC: string (nullable = true)
 |-- PADDING: string (nullable = true)
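The UsedFunctions class itself is in the attached dynamic_ARRAY_generator_parquet.py. Purely as a hypothetical stand-in (not the real implementation), something along these lines makes the snippet above self-contained:

import random
import string

class UsedFunctions:
    ## Hypothetical stand-ins only; the real logic is in the attached script
    def clustered(self, x, numRows):
        ## placeholder: a deterministic double derived from x
        return float(x) / numRows
    def scattered(self, x, numRows):
        ## placeholder: a double cycling through [0, numRows)
        return float(x % numRows)
    def randomised(self, seed, numRows):
        ## placeholder: a pseudo-random double in [0, numRows]
        random.seed(seed)
        return float(random.randint(0, numRows))
    def randomString(self, length):
        ## random mixed-case string of the given length
        return "".join(random.choice(string.ascii_letters) for _ in range(length))
    def padString(self, x, chars, length):
        ## the id right-justified in a fixed-width string
        return str(x).rjust(length, chars)
    def padSingleChar(self, chars, length):
        ## a single character repeated length times
        return chars * length

usedFunctions = UsedFunctions()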

Even small things like show() threw me back. This is what I had to use; with a plain show() I was getting an error message:

sqlContext.sql("""SELECT MIN(id) AS minID, MAX(id) AS maxID FROM test.randomDataPy""").show(n=20,truncate=False,vertical=False)

In order to make this work with Hive I had to add spark.sql.catalogImplementation=hive to spark-submit; otherwise it was throwing errors:

spark-submit --master yarn --deploy-mode client --conf spark.sql.catalogImplementation=hive <file.py>
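Alternatively, the same setting can be made when the session is built inside the script. A sketch, assuming Spark 2.x (the app name is arbitrary):

from pyspark.sql import SparkSession

## enableHiveSupport() sets spark.sql.catalogImplementation=hive, so the Hive
## metastore (and hence test.randomDataPy) is visible without the --conf flag
spark = SparkSession.builder \
    .appName("RandomDataPy") \
    .enableHiveSupport() \
    .getOrCreate()
## spark.sql(...) then behaves the same way as sqlContext.sql(...) above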

Anyway, I have attached both the Scala code and the Python code. They generate the random column values and store them in a Hive table in Parquet format; if the table already exists, new rows are appended. Any feedback will be much appreciated (negative or positive, so to speak).
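For completeness, the append step boils down to something along these lines (a sketch, using the table name from the queries above; the actual code is in the attachments):

## create test.randomDataPy as a Parquet table on the first run and
## append the newly generated rows on subsequent runs
df.write \
  .mode("append") \
  .format("parquet") \
  .saveAsTable("test.randomDataPy")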

Thanks,

Mich

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



Attachments:
dynamic_ARRAY_generator_parquet.py (5K)
dynamic_ARRAY_generator_parquet.scala (6K)