Equivalent of emptyDataFrame in StructuredStreaming

Equivalent of emptyDataFrame in StructuredStreaming

arunodhaya80
Hi, 

I would like to create a "zero" value for a Structured Streaming DataFrame and, unfortunately, I couldn't find any leads. With Spark batch, I can use emptyDataFrame or createDataFrame with an emptyRDD, but with Structured Streaming, I am lost.

If I use emptyDataFrame as the zero value, I wouldn't be able to join it with any other DataFrames in the program because Spark doesn't allow you to mix batch and streaming DataFrames (isStreaming is false for the batch ones).
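To make the mismatch concrete, here's a tiny sketch (the rate source and column names are purely for illustration, not part of my pipeline):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("zero-value-sketch").getOrCreate()

// An arbitrary streaming DataFrame (rate source used only for illustration)
val streamingValues = spark.readStream.format("rate").load().select("value")

// A batch "zero" with the same schema
val batchZero = spark.range(0).toDF("value")

streamingValues.isStreaming // true
batchZero.isStreaming       // false

// Mixing the two is rejected by the analyzer:
// streamingValues.union(batchZero) // throws AnalysisException because one side is a batch DataFrame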

Any clue is greatly appreciated. Here are the alternatives that I have at the moment. 

1. Reading from an empty file 
Disadvantages: polling is expensive because it involves I/O, and it's error-prone in the sense that someone might accidentally update the file.
val emptyErrorStream = (spark: SparkSession) => {
  spark
    .readStream
    .format("csv")
    .schema(DataErrorSchema)
    .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
    .as[DataError]
}

2. Use MemoryStream
Disadvantages: MemoryStream itself is not recommended for production use because it can be mutated, but I am converting it to a Dataset immediately. So, I am leaning towards this at the moment.

import org.apache.spark.sql.execution.streaming.MemoryStream

val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  // an implicit Encoder[DataError] also needs to be in scope
  MemoryStream[DataError].toDS()
}
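The upside is that the result is a streaming Dataset, so it can be unioned with the other streaming Datasets in the program; a quick sketch (otherStageErrors is just an illustrative name):

val zero = emptyErrorStream(spark)
zero.isStreaming // true, unlike spark.emptyDataFrame
// zero.union(otherStageErrors) // otherStageErrors: another streaming Dataset[DataError] (illustrative)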
Cheers,
Arun

Re: Equivalent of emptyDataFrame in StructuredStreaming

Jungtaek Lim
Could you explain what you're trying to do? There would be no batch when there's no data in the stream, so it would end up as a no-op even if it were possible.

- Jungtaek Lim (HeartSaVioR)


Re: Equivalent of emptyDataFrame in StructuredStreaming

arunodhaya80
Hi Jungtaek, 

Sorry about the delay in my response and thanks a ton for responding. 

I am just trying to build a data pipeline that has a bunch of stages. The goal is to use a Dataset to accumulate the transformation errors that may happen in each stage of the pipeline. As a benefit, I can pass only the filtered DataFrame to the next stage.

The stages look something like this: 
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
Each stage's implementation looks something like the following. Some may return errors and some are just side-effecting. Say, the following stage (AddRowKeyStage) just adds a UUID column to each row and therefore returns an empty Dataset[DataError]. A DataTypeValidatorStage, on the other hand, may return a populated Dataset[DataError] along with the filtered DataFrame value.


import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
  extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] = addRowKeys(dataRecords)

  def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
    val returnDf = withRowKeyDf.select(colOrder.map(col): _*)
    Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
  }
}
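DataFrameOps.emptyErrors is meant to produce the empty streaming Dataset[DataError]. If I go with the MemoryStream option from my first mail, it would look roughly like this (sketch only; the exact wiring may differ):

import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.{Dataset, Encoder, SQLContext, SparkSession}

object DataFrameOps {
  // Sketch only: an "empty" streaming Dataset[DataError] backed by a MemoryStream
  // that never has any rows added to it, so every micro-batch contributes zero errors.
  def emptyErrors(spark: SparkSession, encoder: Encoder[DataError]): Dataset[DataError] = {
    implicit val enc: Encoder[DataError] = encoder
    implicit val sqlContext: SQLContext = spark.sqlContext
    MemoryStream[DataError].toDS()
  }
}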


For accumulating the errors at each stage, I am using the Writer monad from the Cats library. I have arranged for the errors to be combined automatically by implementing a Semigroup for Spark's Dataset (there's a sketch of it after the fold below). This way, I can do the following and have two Datasets (one for errors and one for values) when I start the stream.

val validRecordsWithErrors = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
  for {
    df      <- dfWithErrors
    applied <- stage(df)
  } yield applied
}
validRecordsWithErrors is a combination of both the transformation errors (left side) and the DataFrame of records that have successfully passed through the stages (right side).
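The Semigroup I mentioned boils down to a union of the two error Datasets; a simplified sketch (the real instance may carry more, but this is the idea):

import cats.Semigroup
import org.apache.spark.sql.Dataset

// Simplified sketch: the Writer's "log" side is combined with this Semigroup,
// so the error Datasets produced by each stage are unioned together as the fold runs.
implicit def datasetSemigroup[A]: Semigroup[Dataset[A]] =
  new Semigroup[Dataset[A]] {
    override def combine(left: Dataset[A], right: Dataset[A]): Dataset[A] = left.union(right)
  }

// At the end, the accumulated errors and the final DataFrame can be pulled apart with run:
// val (errorDs, finalDf) = validRecordsWithErrors.run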

Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

The "zero" value of the fold and the error value for side-effecting stages must be an empty stream. With Spark batch, I can always use an "emptyDataFrame" but I have no clue on how to achieve this in Spark streaming.  Unfortunately, "emptyDataFrame"  is not "isStreaming" and therefore I won't be able to union the errors together. 

I am sorry if I haven't done a good job of explaining this.

Cheers,
Arun


