Re: Chaining Spark Streaming Jobs

Sunita
Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
    at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
    at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
    at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
      |from enrichedtb
      |group by id
      |""".stripMargin

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    println("Source path is " + conf.getString("source.path"))

    // Static read of the same directory. Added this as it was complaining about schema.
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))

    // Streaming read of the raw data; inferSchema is redundant once an explicit schema is passed.
    val df = spark.readStream
      .format("parquet")
      .option("inferSchema", true)
      .schema(schemadf.schema)
      .load(conf.getString("source.path"))

    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)

    // Streaming aggregation written with append output mode to the Parquet sink.
    res.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }
}


I tried to extract the schema from a static read of the input path and provided it to the readStream API. With that, I get this error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

When running on the EMR cluster, all paths point to S3; on my laptop, they all point to the local filesystem.

I am using Spark 2.2.0.
Appreciate your help.

regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <[hidden email]> wrote:
If you use structured streaming and the file sink, you can have a subsequent stream read using the file source. This will maintain exactly-once processing even if there are hiccups or failures.
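
A minimal sketch of what this file-sink-to-file-source chaining could look like (the paths, topic name, and schema below are placeholders rather than anything from this thread, and the two queries would normally live in two separate applications):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("chaining-sketch").getOrCreate()

    // Job 1: persist the raw records through the Parquet file sink.
    // The checkpoint plus the file sink's metadata log is what gives exactly-once output.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
      .option("subscribe", "rawlogs")                     // placeholder topic
      .load()
    raw.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "s3://bucket/raw/")                 // placeholder path
      .option("checkpointLocation", "s3://bucket/checkpoints/raw/")
      .start()

    // Job 2 (normally a separate application): read the same directory back
    // as a file source. File sources need an explicit schema.
    val rawSchema = new StructType()
      .add("key", StringType)
      .add("value", StringType)
    spark.readStream
      .schema(rawSchema)
      .parquet("s3://bucket/raw/")                        // same path as the sink above
      .writeStream                                        // enrichment transformations would go here
      .format("parquet")
      .option("path", "s3://bucket/enriched/")
      .option("checkpointLocation", "s3://bucket/checkpoints/enriched/")
      .start()

    spark.streams.awaitAnyTermination()
  }
}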

On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <[hidden email]> wrote:
Hello Spark Experts,

I have a design question w.r.t. Spark Streaming. I have a streaming job that consumes protocol-buffer-encoded real-time logs from an on-premise Kafka cluster. My Spark application runs on EMR (AWS) and persists data to S3. Before persisting, I need to strip the header and convert the protobuf records to Parquet (I use sparksql-scalapb to convert from protobuf to spark.sql.Row). I need to persist the raw logs as-is. I could continue the enrichment on the same dataframe after persisting the raw data; however, in order to modularize, I am planning to have a separate job which picks up the raw data and performs the enrichment. I am also trying to avoid an all-in-one job, as the enrichments could get project-specific while the raw-data persistence stays customer/project agnostic. The enriched data is allowed to have some latency (a few minutes).

My challenge is: after persisting the raw data, how do I chain the next streaming job? The only way I can think of is for job 1 (raw data) to partition on the current date (YYYYMMDD), and within the current date, for job 2 (the enrichment job) to filter for records within 60s of the current time and perform the enrichment in 60s batches.
Is this a good option? It seems error prone. If either job gets delayed due to bursts or any error/exception, this could lead to huge data losses and non-deterministic behavior. What are other alternatives to this?

Appreciate any guidance in this regard.

regards
Sunita Koppar



Re: Chaining Spark Streaming Jobs

vincent gromakowski
What about chaining with Akka or Akka Streams and the fair scheduler?
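
On the fair-scheduler part of that suggestion, a hypothetical sketch of running two streaming queries in one application under the FAIR scheduler (the Akka piece is left out, and the pool names and the rate/console sources and sinks are made up for illustration):

import org.apache.spark.sql.SparkSession

object FairSchedulerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fair-scheduler-sketch")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

    // Each query is started from its own thread-local scheduler pool so that
    // one query's micro-batches do not starve the other's.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "raw")
    spark.readStream.format("rate").load()      // stand-in for the raw-persistence query
      .writeStream.format("console")
      .start()

    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "enrich")
    spark.readStream.format("rate").load()      // stand-in for the enrichment query
      .writeStream.format("console")
      .start()

    spark.streams.awaitAnyTermination()
  }
}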


Re: Chaining Spark Streaming Jobs

Sunita
Thanks for your suggestion, Vincent. I do not have much experience with Akka as such; I will explore this option.


Re: Chaining Spark Streaming Jobs

Michael Armbrust
In reply to this post by Sunita
You specify the schema when loading a dataframe by calling spark.read.schema(...)...
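
A short illustration of that call on both the static and the streaming reader (the path is a placeholder, and the column types are only guesses at the real schema):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SchemaOnLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("schema-on-load").getOrCreate()

    // Column names taken from the query in the thread; types are assumptions.
    val schema = new StructType()
      .add("id", StringType)
      .add("col1", LongType)
      .add("col2", LongType)
      .add("name", StringType)

    // Static read with an explicit schema.
    val staticDf = spark.read.schema(schema).parquet("s3://bucket/raw/")

    // The same call on the streaming reader, which is what the file source
    // requires ("Schema must be specified when creating a streaming source DataFrame").
    val streamingDf = spark.readStream.schema(schema).parquet("s3://bucket/raw/")

    staticDf.printSchema()
    streamingDf.printSchema()
  }
}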


Re: Chaining Spark Streaming Jobs

Sunita
Sorry Michael, I ended up using Kafka and missed your message. Yes, I did specify the schema with read.schema, and that is when I got:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
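
In Spark 2.2 this check usually fires because the query combines a streaming aggregation with outputMode("append") but has no watermark; append mode needs a watermarked event-time column that also appears in the grouping. A hypothetical variant of the aggregation (the "eventtime" column, the 10-minute figures, and the paths are assumptions, not taken from the original code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WatermarkedAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("watermarked-aggregate").getOrCreate()

    // Borrow the schema from a static read of the same directory, as in the thread.
    val schema = spark.read.parquet("s3://bucket/raw/").schema   // placeholder path
    val df = spark.readStream.schema(schema).parquet("s3://bucket/raw/")

    val res = df
      .withWatermark("eventtime", "10 minutes")                  // assumed timestamp column
      .groupBy(window(col("eventtime"), "10 minutes"), col("id"))
      .agg(sum("col1"), sum("col2"), first("name"))

    // With the watermark in the grouping, append mode is accepted for the aggregation.
    res.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "s3://bucket/checkpoints/agg/")
      .start("s3://bucket/enriched/")
  }
}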

regards
Sunita
