Structured Streaming: emitted record count

aravias
In Structured Streaming, the QueryProgressEvent does not seem to include the
final count of records emitted to the destination; I only see the number of
input rows. I tried to get the count with an additional action (count after
persisting the dataset), but I get the exception below when calling persist
or count on the dataset before the query is started. Sample code is below;
please suggest how to get the query running and how to get the final count.

"Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming
sources must be executed with writeStream.start();;"
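The error comes from running batch actions such as count() or persist() directly on a streaming Dataset. One common workaround, assuming Spark 2.4 or later, is DataStreamWriter.foreachBatch, which hands each micro-batch to a callback on the driver as an ordinary Dataset together with its batchId. Inside that callback the batch can be persisted, written, counted and unpersisted. Because foreachBatch only guarantees at-least-once execution, a batch may be replayed after a failure, so a running total is safer when keyed by batchId. The sketch below shows only that bookkeeping in plain Java; BatchCountTracker is a hypothetical helper, not a Spark API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spark): remembers how many rows each
// micro-batch wrote, keyed by batchId, so a replayed batch overwrites its
// earlier entry instead of being counted twice.
class BatchCountTracker {
    // ConcurrentHashMap so totals can be read safely from another thread,
    // e.g. a separate metrics reporter.
    private final Map<Long, Long> countsByBatch = new ConcurrentHashMap<>();

    /** Records the row count for a batch; returns true if this batchId was not seen before. */
    public boolean record(long batchId, long rowCount) {
        return countsByBatch.put(batchId, rowCount) == null;
    }

    /** Sum of row counts over all distinct batchIds recorded so far. */
    public long totalEmitted() {
        long total = 0;
        for (long count : countsByBatch.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        BatchCountTracker tracker = new BatchCountTracker();
        tracker.record(0L, 120L);
        tracker.record(1L, 80L);
        tracker.record(1L, 80L); // simulated replay of batch 1 after a failure
        System.out.println(tracker.totalEmitted()); // prints 200
    }
}
```

In a Spark 2.4+ job, the foreachBatch callback could persist the batch Dataset, write it with batchDf.write(), take batchDf.count(), pass the batchId and count to tracker.record(...), send the numOutputRows metric, and finally unpersist. From Java, the lambda usually needs a cast to VoidFunction2<Dataset<PropertyPageView>, Long> to avoid ambiguity with the Scala overload of foreachBatch. The map lives in driver memory, so its totals do not survive a driver restart.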


        Dataset<PropertyPageView> data = transform(kafkaTopic, SPECIFIC_AVRO,
                kafkaStreamSet, UserSessionEventJoin.class, PropertyPageView.class,
                (Function<UserSessionEventJoin, Boolean> & Serializable)
                        (UserSessionEventJoin userSessionEventJoin) -> {
                    UserEvent userEvent = userSessionEventJoin.getUserEvent();
                    if (userEvent != null
                            && TYPE_PAGE_VIEW.equalsIgnoreCase(userEvent.getType())) {
                        if (userEvent.getPayloadMap() != null) {
                            return PAGE_TYPE_PROPERTY.equalsIgnoreCase(
                                    userEvent.getPayloadMap().get(PAGE_TYPE));
                        }
                    }
                    return false;
                });

        // persist() and count() are batch actions; on this streaming Dataset
        // they fail with the AnalysisException quoted above.
        data.persist(StorageLevel.MEMORY_AND_DISK());
        log.info("dataset persisted");

        long emittedCount = data.count();

        Map<String, String> metricTags = new HashMap<>();
        metricTags.put("source", kafkaTopic);
        metricTags.put("destination", sinkPath);
        DataMonitorMetric recordsWrittenMetric = dataMonitorUtils
                .buildDataMonitorMetricWithValue(null, System.currentTimeMillis(),
                        "numOutputRows", metricTags, Aspect.EMITTED, emittedCount);
        dataMonitorUtils.sendMetric(recordsWrittenMetric);

        StreamingQuery streamingQuery = data.writeStream()
                .outputMode("append")
                .format("parquet")
                .option("checkpointLocation",
                        "file:///Users/asethurathnam/Downloads/parquet/checkpoint")
                .trigger(Trigger.ProcessingTime(1000, TimeUnit.MILLISECONDS))
                .partitionBy("eventDate")
                .start("file:///Users/asethurathnam/Downloads/parquet/output-parquet");

        data.unpersist();
        log.info("dataset unpersisted");

        streamingQuery.awaitTermination();
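Separately from the counting problem, the filter lambda passed to transform can be pulled out into a named, null-safe method, which is easier to read and can be unit-tested without Spark. In this sketch UserEvent is a hypothetical stand-in reduced to the two accessors the filter uses, and the three constant values are placeholders, since the post does not show the real ones.

```java
import java.util.HashMap;
import java.util.Map;

class PageViewFilter {
    // Placeholder values: the real constants are not shown in the post.
    static final String TYPE_PAGE_VIEW = "page_view";
    static final String PAGE_TYPE = "pageType";
    static final String PAGE_TYPE_PROPERTY = "property";

    // Hypothetical stand-in for the poster's UserEvent class.
    static final class UserEvent {
        private final String type;
        private final Map<String, String> payloadMap;

        UserEvent(String type, Map<String, String> payloadMap) {
            this.type = type;
            this.payloadMap = payloadMap;
        }

        String getType() { return type; }
        Map<String, String> getPayloadMap() { return payloadMap; }
    }

    /** True when the event is a page view whose payload marks a property page. */
    static boolean isPropertyPageView(UserEvent event) {
        if (event == null || !TYPE_PAGE_VIEW.equalsIgnoreCase(event.getType())) {
            return false;
        }
        Map<String, String> payload = event.getPayloadMap();
        // equalsIgnoreCase(null) returns false, so a missing PAGE_TYPE key is also rejected.
        return payload != null && PAGE_TYPE_PROPERTY.equalsIgnoreCase(payload.get(PAGE_TYPE));
    }

    public static void main(String[] args) {
        Map<String, String> payload = new HashMap<>();
        payload.put(PAGE_TYPE, "Property");
        System.out.println(isPropertyPageView(new UserEvent("PAGE_VIEW", payload))); // prints true
        System.out.println(isPropertyPageView(new UserEvent("click", payload)));     // prints false
        System.out.println(isPropertyPageView(new UserEvent("page_view", null)));    // prints false
    }
}
```

With the real types, the body of the original lambda would reduce to return isPropertyPageView(userSessionEventJoin.getUserEvent()); with no change in behavior.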

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/