Record count query parallel processing in databricks spark delta lake


anbutech
Hi,

I have a question about the design of a monitoring PySpark script for a large
volume of source JSON data coming from more than 100 Kafka topics.
The topics are stored under separate buckets in AWS S3, and each topic holds
multiple terabytes of JSON data laid out in year/month/day/hour partitions.
Each hour contains a large number of JSON files in .gz compression format.

What is the best way to process these terabytes of data, read from S3 under the
year/month/day/hour partitions, for all of the topic sources?

We are using Delta Lake on the Databricks platform, and the query takes a long
time to get the record counts by year, month and day.

What is the best approach for handling terabytes of data to get the record
counts for all the days?

Please help me with the problem below:

topics_list.csv
---------------
I'm planning to put all 150 topics into a CSV file, then read it and process
the data to get the daily record count.

I would have to iterate over the topics from the CSV file one by one, using a
for loop or some other mechanism, passing the year, month and day arguments to
get the record count for a particular day for every topic.

df = spark.read.json("s3a://kafka-bucket_name/topic_name/year/month/day/hour/")

df.createOrReplaceTempView("topic1_source")

spark.sql("select count(1) from topic1_source").show()
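
To make the plan concrete, the sequential version I have in mind is roughly the
sketch below. It assumes topics_list.csv has a single `topic` column and that
the year, month and day are passed in as arguments; a trailing /* glob is used
so all 24 hourly folders of the day are read in one go.

import csv

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("daily-topic-counts").getOrCreate()

# Example arguments for one day.
year, month, day = "2019", "12", "25"

# topics_list.csv is assumed to hold one topic name per row under a "topic" column.
with open("topics_list.csv") as f:
    topics = [row["topic"] for row in csv.DictReader(f)]

daily_counts = {}
for topic in topics:
    # The /* glob matches every hourly folder under the day prefix.
    path = "s3a://kafka-bucket_name/{}/{}/{}/{}/*".format(topic, year, month, day)
    daily_counts[topic] = spark.read.json(path).count()
    print(topic, daily_counts[topic])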

Could you help me with, or suggest, a good approach to run the count query in
parallel for all 150 topics effectively, using Apache Spark and Delta Lake on
Databricks?

thanks

Re: Record count query parallel processing in databricks spark delta lake

Farhan Misarwala

Hi Anbutech,
If I am not mistaken, I believe you are trying to read multiple DataFrames from around 150 different paths (in your case the Kafka topics) to count their records. You have all these paths stored in a CSV with columns year, month, day and hour.
Here is what I came up with; I have been using this approach for similar kinds of problems and it has worked for me in almost all cases. This should give you an idea.
Thanks.

import multiprocessing
import time

import pandas as pd
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Suppose you have hourly data for the first three hours of the year, since 2018.
    intervals = [
        {"year": 2018, "month": 1, "day": 1, "hour": 1},
        {"year": 2018, "month": 1, "day": 1, "hour": 2},
        {"year": 2018, "month": 1, "day": 1, "hour": 3},
        {"year": 2019, "month": 1, "day": 1, "hour": 1},
        {"year": 2019, "month": 1, "day": 1, "hour": 2},
        {"year": 2019, "month": 1, "day": 1, "hour": 3},
        {"year": 2020, "month": 1, "day": 1, "hour": 1},
        {"year": 2020, "month": 1, "day": 1, "hour": 2},
        {"year": 2020, "month": 1, "day": 1, "hour": 3}
    ]

    # A DataFrame from my list of dictionaries; you can use pd.read_csv to read
    # your `topics_list.csv` into a DataFrame instead.
    input_paths_df = pd.DataFrame(list(intervals))
    print(input_paths_df)

    # In your case, reading your csv file (now a df) into a list of dictionaries.
    input_paths = input_paths_df.to_dict('records')

    # Since this is on my local machine, in PyCharm <3
    spark = SparkSession.builder.master("local[*]").appName('Reading topics in parallel').getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", 10)
    spark.conf.set("spark.executor.pyspark.memory", "2G")

    def get_counts(record):
        year_ = record['year']
        month_ = record['month']
        day_ = record['day']
        hour_ = record['hour']
        path = "s3a://kafka-bucket_name/topic_name/{}/{}/{}/{}".format(year_, month_, day_, hour_)
        df = spark.read.json(path)
        # No need for createOrReplaceTempView, just do a df.count().
        record['count'] = df.count()
        # time.sleep(2)
        record['filePath'] = path

        return record

    # Create a pool of 20 processes. Set this as per your intended parallelism
    # and your available resources.
    start = time.time()
    pool = multiprocessing.Pool(20)
    # This executes get_counts() in parallel, on each element inside input_paths.
    # result (a list of dictionaries) is built once all executions have completed.
    result = pool.map(get_counts, input_paths)
    end = time.time()

    result_df = pd.DataFrame(result)
    # You can use result_df.to_csv() to store the results in a csv.
    print(result_df)
    print('Time taken: {}'.format(end - start))
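
One variation worth noting: since all the workers do is submit Spark jobs
against the same SparkSession, a multiprocessing.pool.ThreadPool can be swapped
in for the process pool; threads avoid forking the driver process, and Spark
accepts concurrent jobs from multiple threads within one application. A minimal
sketch, reusing get_counts and input_paths from above:

from multiprocessing.pool import ThreadPool

# Threads share the single SparkSession instead of working in forked copies of the driver.
thread_pool = ThreadPool(20)
result = thread_pool.map(get_counts, input_paths)
thread_pool.close()
thread_pool.join()

result_df = pd.DataFrame(result)
print(result_df)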


Re: Record count query parallel processing in databricks spark delta lake

anbutech
Thank you so much for the help, Farhan.

Please help me with the design approach for this problem. What is the best way
to structure this code to get better results?

I have a clarification on the code.

I want to take a daily record count of the ingestion source vs. the Databricks
Delta Lake table vs. the Snowflake table. How do I combine the three different
count queries, run each of them in parallel, and take the daily count of each?

I have an S3 bucket with many ingestion topic folders containing JSON files,
partitioned by year, month, day and hour.

s3a://bucket_name/topic1/year=2019/Month=12/Day=25/hour=01
s3a://bucket_name/topic1/year=2019/Month=12/Day=25/hour=02
s3a://bucket_name/topic1/year=2019/Month=12/Day=25/hour=03
...
s3a://bucket_name/topic1/year=2019/Month=12/Day=25/hour=23

s3a://bucket_name/topic2/year=2019/Month=12/Day=25/hour=01
s3a://bucket_name/topic2/year=2019/Month=12/Day=25/hour=02
s3a://bucket_name/topic2/year=2019/Month=12/Day=25/hour=03
...
s3a://bucket_name/topic2/year=2019/Month=12/Day=25/hour=23

s3a://bucket_name/topic3/year=2019/Month=12/Day=25/hour=01
s3a://bucket_name/topic3/year=2019/Month=12/Day=25/hour=02
s3a://bucket_name/topic3/year=2019/Month=12/Day=25/hour=03
...
s3a://bucket_name/topic3/year=2019/Month=12/Day=25/hour=23
 
 
The other 100 topics follow the same layout in the same S3 bucket, each under
its own topic name.


Output:

Daily record-count table for all 100 topics:

topics,databricks_table_name,topic_count,databricks_table_count,snowflake_count,Y,M,D
topic1,logtable1,100,100,100,2019,12,25
topic2,logtable2,300,300,300,2019,12,25
topic3,logtable3,500,500,500,2019,12,25
topic4,logtable4,600,100,100,2019,12,25
topic5,logtable5,1000,1000,1000,2019,12,25
topic6,logtable6,200,200,200,2019,12,25
...
topic100,logtable100,2000,2000,2000,2019,12,25

Kindly help me with this problem.
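
To make the ask concrete, something like the sketch below is what I have in
mind per topic and day. Everything in it is hypothetical: the Snowflake
connection options, the topic-to-table mapping, and the assumption that the
Delta and Snowflake tables carry year/Month/Day columns matching the S3
partitions would all need to be replaced with real values.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical connection options for the Spark Snowflake connector.
sf_options = {
    "sfUrl": "<account>.snowflakecomputing.com",
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}


def daily_counts(topic, delta_table, sf_table, year, month, day):
    # 1. Ingestion source: every hourly folder for the day under the topic prefix in S3.
    s3_path = "s3a://bucket_name/{}/year={}/Month={}/Day={}/*".format(topic, year, month, day)
    topic_count = spark.read.json(s3_path).count()

    # 2. Databricks Delta Lake table, filtered to the same day
    #    (assumes the table has year/Month/Day columns).
    delta_count = (spark.table(delta_table)
                   .where("year = {} AND Month = {} AND Day = {}".format(year, month, day))
                   .count())

    # 3. Snowflake table, counted through the Snowflake connector
    #    (assumes matching year/month/day columns on the Snowflake side).
    snowflake_query = ("select count(*) from {} where year = {} and month = {} and day = {}"
                       .format(sf_table, year, month, day))
    snowflake_count = (spark.read.format("snowflake")
                       .options(**sf_options)
                       .option("query", snowflake_query)
                       .load()
                       .collect()[0][0])

    return {"topics": topic, "databricks_table_name": delta_table,
            "topic_count": topic_count, "databricks_table_count": delta_count,
            "snowflake_count": snowflake_count, "Y": year, "M": month, "D": day}

Each call is independent, so the same pool.map pattern from the previous reply
could run daily_counts over all 100 topics, and pd.DataFrame on the results
would give the table above. Does this look like a reasonable direction?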



