How do readStream() and writeStream() work?


dddaaa
Hi,
I'm wondering how readStream() and writeStream() work internally.
Let's take a simple example:

from pyspark.sql.functions import from_json, col

df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_brokers) \
        .option("subscribe", kafka_topic) \
        .load() \
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

res = df.repartition('date1').writeStream \
        .partitionBy('date1') \
        .format(hdfs_format) \
        .option("checkpointLocation", hdfs_dir + "checkpoint") \
        .option("path", hdfs_dir + "data") \
        .trigger(processingTime='60 seconds') \
        .start()

This code reads a Kafka topic and writes it to HDFS every 60 seconds.

My questions are:
1. When readStream() runs, what happens? Does the Spark job do any work,
or is it just like a "transformation" in Spark's terminology, where nothing
actually happens until an action is called?
2. writeStream() is started and the writing happens every 60 seconds. Can I
intervene somehow in what happens when the actual writing occurs? For
example, can I write a log message "60 seconds passed, writing bulk to hdfs"
each time?
3. Is it possible to write to the same HDFS file each time the actual
writing occurs? For now it creates a new HDFS file each time.
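For question 2, what I have in mind is roughly the sketch below. I'm not sure
foreachBatch is the right mechanism for this, so treat it as a guess; the
hdfs_dir variable and the date1 column are just the placeholders from my
example above:

```python
# Sketch, assuming foreachBatch is the right hook for per-trigger logging.
# hdfs_dir and 'date1' are placeholders from the example above.

def batch_log_message(batch_id):
    # The log line I'd like to emit on every trigger.
    return f"60 seconds passed, writing batch {batch_id} to hdfs"

def write_batch(batch_df, batch_id):
    # Called once per micro-batch; this would let me log before the write.
    print(batch_log_message(batch_id))
    batch_df.write.mode("append").partitionBy("date1") \
        .format("parquet").save(hdfs_dir + "data")

# res = df.repartition('date1').writeStream \
#         .foreachBatch(write_batch) \
#         .option("checkpointLocation", hdfs_dir + "checkpoint") \
#         .trigger(processingTime='60 seconds') \
#         .start()
```

If something like this works, the print could of course be replaced with a
proper logger call.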

Thanks.



