Spark structured streaming - efficient way to do lots of aggregations on the same input files

3 messages
Spark structured streaming - efficient way to do lots of aggregations on the same input files

Filip-2
Hi,

I'm considering using Apache Spark for the development of an application.
It would replace a legacy program that reads CSV files and runs lots
(tens to hundreds) of aggregations on them. The aggregations are fairly
simple: counts, sums, etc., with filtering conditions applied to some of
the columns.

I'd prefer to use Structured Streaming for its simplicity and low latency.
I'd also like to use full SQL queries (via createOrReplaceTempView).
However, running multiple queries means Spark re-reads the input files for
each one of them, which seems very inefficient for my use case.

Does anyone have any suggestions? The only thing I've found so far involves
using foreachBatch and manually updating my aggregates, but I think there
should be a simpler solution for this use case.
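A minimal sketch of what such a foreachBatch approach could look like (I have no real code for this yet; the path, columns, and the in-memory aggregate store below are all illustrative assumptions):

```scala
// Hypothetical sketch of "foreachBatch and manually updating my aggregates".
// Everything here (path, schema, aggregate store) is an illustrative assumption.
import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder.appName("csv-aggregations").master("local[*]").getOrCreate()

val schema = StructType(Array(
  StructField("senderId1", LongType),
  StructField("cost", LongType)
))

val input = spark.readStream
  .schema(schema)
  .option("delimiter", "\t")
  .csv("/path/to/csv") // hypothetical input directory

// Running totals kept outside the streaming query, updated once per micro-batch.
val eventCounts = mutable.Map.empty[Long, Long]

input.writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    // One pass over the micro-batch; merge the per-batch counts into the totals.
    batch.groupBy("senderId1").count().collect().foreach { row =>
      val id = row.getLong(0)
      eventCounts(id) = eventCounts.getOrElse(id, 0L) + row.getLong(1)
    }
  }
  .start()
```

The drawback, as noted above, is that every aggregate has to be maintained by hand this way instead of being expressed as a plain SQL query.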



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

Jacek Laskowski
Hi Filip,

Care to share the code behind "The only thing I found so far involves using foreachBatch and manually updating my aggregates"?

I'm not completely sure I understand your use case and hope the code could shed more light on it. Thank you.

On Thu, Jan 21, 2021 at 5:05 PM Filip <[hidden email]> wrote:


Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

Filip-2
Hi,

I don't have any code for the foreachBatch approach; I mentioned it because
of this answer to my question on Stack Overflow:
https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying
to do:
val schema = StructType(
    Array(
        StructField("senderId1", LongType),
        StructField("senderId2", LongType),
        StructField("destId1", LongType),
        StructField("eventType", IntegerType),
        StructField("cost", LongType)
    )
)

val fileStreamDf = spark.readStream
    .schema(schema)
    .option("delimiter", "\t")
    .csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY senderId1 HAVING count(*) > 10000")
    .writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500")
    .writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT destId1, count(*) AS num_events FROM myTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000")
    .writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are many more columns and the queries
should also group by time period, but I didn't want to complicate the example.
Even with this example, I have 3 queries running on the same input files, and
Spark would need to read the files from disk 3 times. These extra reads are
what I'm trying to avoid.
In the real application, the number of queries would be a lot higher and
dynamic (they are generated in response to configurations made by the end
users).
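One way this might be restructured so each micro-batch is read from disk only once (a sketch building on the foreachBatch idea, where the hard-coded query list stands in for the dynamically generated queries):

```scala
import org.apache.spark.sql.DataFrame

// Stand-in for the dynamically generated user queries; note they now run
// against a per-batch view ("batchTable") rather than the streaming view.
val queries = Seq(
  "SELECT senderId1, count(*) AS num_events FROM batchTable GROUP BY senderId1 HAVING count(*) > 10000",
  "SELECT senderId2, sum(cost) AS total_cost FROM batchTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500"
)

fileStreamDf.writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    batch.persist()                             // the CSV files are read once here
    batch.createOrReplaceTempView("batchTable") // all queries reuse the cached batch
    queries.foreach(q => batch.sparkSession.sql(q).show())
    batch.unpersist()
  }
  .start()
```

The trade-off is that each query then sees only the current micro-batch, so cross-batch totals (what outputMode "complete" gave the original queries) would still have to be accumulated separately, e.g. by merging each batch's results into a table.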


