How to write DataFrame to single parquet file instead of multiple files under a folder in spark?

mithril
For better formatting, please see the same question on Stack Overflow:
https://stackoverflow.com/questions/52964167/how-to-write-dataframe-to-single-parquet-file-instead-of-multiple-files-under-a

-----

I have a folder of parquet files:

[![Input folder listing][1]][1]


I want to apply a transformation to each file and save the result to another
folder under the same file name. The original data is already split by month,
so if I merged the files first I would have to split them again afterwards.

## My code (PySpark)

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext.getOrCreate()

    # Reach the Hadoop FileSystem API through the py4j gateway
    URI           = sc._gateway.jvm.java.net.URI
    Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

    fs = FileSystem.get(URI("hdfs://xxxx:9000"), Configuration())

    # Collect every .parquet entry in the input folder
    status = fs.listStatus(Path('/arch/M/stockquantitylogs/2018/'))

    paths = []
    for fileStatus in status:
        p = str(fileStatus.getPath())
        if p.endswith('.parquet'):
            paths.append(p)

    sqlContext = SQLContext(sc)

    def transform(path):
        columns = ["store_id", "product_id", "store_product_quantity_old",
                   "store_product_quantity_new", "time_create"]
        df = sqlContext.read.parquet(path).select(columns)
        # Keep rows where the quantity went from >0 to 0, or from 0 to >0
        df1 = df[((df.store_product_quantity_old > 0) & (df.store_product_quantity_new == 0)) |
                 ((df.store_product_quantity_old == 0) & (df.store_product_quantity_new > 0))]
        # coalesce(1) leaves a single part file, but Spark still writes a directory
        df1.coalesce(1).write.parquet(path.replace('/arch', '/clean'), mode="overwrite")

    for p in paths:
        transform(p)
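The filter in `transform` keeps a row only when the quantity moves from a positive value to zero, or from zero to a positive value. A hypothetical plain-Python mirror of that condition, useful for checking the logic on a few values without Spark (`is_quantity_flip` is an illustrative name, not part of the code above):

```python
def is_quantity_flip(old: int, new: int) -> bool:
    """Hypothetical plain-Python version of the row filter in transform().

    True when the quantity goes from >0 to 0, or from 0 to >0.
    """
    return (old > 0 and new == 0) or (old == 0 and new > 0)
```

For example, `is_quantity_flip(5, 0)` is `True` and `is_quantity_flip(2, 4)` is `False`. In the PySpark expression each comparison has to stay in its own parentheses, because Python's `&` and `|` operators bind more tightly than `>` and `==`.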

At first, `write.parquet` wrote multiple files into one folder. After some
searching, I added `.coalesce(1)`, but the output is still a folder, now with
a single file inside it.
Output:

[![Output folder containing a single part file][2]][2]
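Spark treats the path passed to `write.parquet` as an output directory, so `coalesce(1)` reduces its contents to one `part-*` file but does not turn the directory into a file. A commonly used workaround is to let Spark write into a temporary directory and then move the single part file to the wanted name. The sketch below covers only that second step and assumes a local filesystem so it runs without a cluster; `promote_single_part` is a hypothetical helper, not a Spark API. On HDFS the same steps would presumably go through the Hadoop `FileSystem` object from the question (`fs.listStatus`, `fs.rename(Path(src), Path(dst))`, `fs.delete(Path(dir), True)`).

```python
import glob
import os
import shutil


def promote_single_part(spark_output_dir: str, target_file: str) -> str:
    """Move the only part file of a Spark output directory to target_file.

    Hypothetical helper, local filesystem only. Assumes the directory was
    written after coalesce(1), so it holds exactly one part-*.parquet file
    next to bookkeeping files such as _SUCCESS or .crc checksums.
    target_file must not live inside spark_output_dir.
    """
    parts = glob.glob(os.path.join(spark_output_dir, "part-*.parquet"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")

    # Make sure the destination folder exists, then move the data file out.
    os.makedirs(os.path.dirname(target_file) or ".", exist_ok=True)
    shutil.move(parts[0], target_file)

    # The directory now only holds Spark's bookkeeping files; remove it.
    shutil.rmtree(spark_output_dir)
    return target_file
```

Raising on anything other than exactly one part file is deliberate: if `coalesce(1)` was forgotten, silently picking one of several part files would drop data.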

I have tried `df1.write.save(path.replace('/arch', '/clean'), mode="overwrite")`
and `df1.write.save('/clean/M/stockquantitylogs/2018/', mode="overwrite")`,
but neither works.

I just want to process `/arch/M/stockquantitylogs/2018/*.parquet` into
`/clean/M/stockquantitylogs/2018/*.parquet`, keeping each file's name.
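Keeping each file's name means mapping every input path to two locations under `/clean`: a temporary directory for Spark to write into, and the final file name. The hypothetical `clean_target` helper below does this by swapping only the leading `/arch` segment of the URI path; `str.replace` would also rewrite any later occurrence of `/arch` in the string. The `_tmp_` prefix is an arbitrary choice; a leading underscore has the side effect that Spark usually skips such entries when it lists a folder for reading.

```python
from typing import Tuple
from urllib.parse import urlparse, urlunparse


def clean_target(src: str, src_root: str = "/arch",
                 dst_root: str = "/clean") -> Tuple[str, str]:
    """Map an input parquet path to (tmp_dir, final_file) under dst_root.

    Hypothetical helper: only the leading src_root segment of the path is
    replaced; scheme and host (e.g. hdfs://host:port) are kept unchanged.
    """
    parsed = urlparse(src)
    if not parsed.path.startswith(src_root + "/"):
        raise ValueError(f"{src!r} is not under {src_root}")

    new_path = dst_root + parsed.path[len(src_root):]
    folder, name = new_path.rsplit("/", 1)

    final_file = urlunparse(parsed._replace(path=new_path))
    # Temporary directory for Spark's output, next to the final file.
    tmp_dir = urlunparse(parsed._replace(path=f"{folder}/_tmp_{name}"))
    return tmp_dir, final_file
```

With a made-up file name, `clean_target("hdfs://xxxx:9000/arch/M/stockquantitylogs/2018/201801.parquet")` returns `("hdfs://xxxx:9000/clean/M/stockquantitylogs/2018/_tmp_201801.parquet", "hdfs://xxxx:9000/clean/M/stockquantitylogs/2018/201801.parquet")`.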



  [1]: https://i.stack.imgur.com/PqgP6.png
  [2]: https://i.stack.imgur.com/JPcsP.png


