Equivalent of emptyDataFrame in StructuredStreaming


Equivalent of emptyDataFrame in StructuredStreaming

arunodhaya80
Hi, 

This is going to come off as a silly question, given that a stream is unbounded, but it is a problem that I have (created for myself).

I am trying to build an ETL pipeline, and I have a bunch of stages:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)
I would like to collect the errors from each of these stages into a separate stream, and I am using a Writer monad for this. I have made provisions so that the "collection" part of the monad is also a DataFrame (the supporting plumbing is sketched after the fold below). Now, I would like to do:

val validRecords = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
  for {
    df      <- dfWithErrors
    applied <- stage.apply(df)
  } yield applied
}
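
For context, the supporting plumbing looks roughly like this (a simplified sketch of my actual code; the real stages carry a bit more context):

import cats.Semigroup
import cats.data.Writer
import org.apache.spark.sql.DataFrame

// The Writer's "log" is itself a DataFrame of error rows, so combining two
// logs (which flatMap needs a Semigroup for) is just a union.
implicit val errorDfSemigroup: Semigroup[DataFrame] =
  Semigroup.instance((left, right) => left.union(right))

// Each stage yields its output DataFrame plus a DataFrame of rejected rows.
trait DataStage {
  def apply(df: DataFrame): Writer[DataFrame, DataFrame]
}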
Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

The "empty" of the fold must be an empty stream. With Spark batch, I can always use an "emptyDataFrame" but I have no clue on how to achieve this in Spark streaming.  Unfortunately, "emptyDataFrame"  is not "isStreaming" and therefore I won't be able to union the errors together. 

I'd appreciate it if you could give me some pointers.

Cheers,
Arun

Re: Equivalent of emptyDataFrame in StructuredStreaming

arunodhaya80
I also looked into MemoryStream as a workaround, though I see that it is not recommended for production use. But then, since I am converting it to a Dataset immediately and never adding any data to it, I couldn't see a problem with the mutability.

Could you let me know if this is a better idea than dropping in an empty file (which is error-prone)? Even better, is there a cleaner way to create an empty stream?

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream

val emptyErrorStream = (spark: SparkSession) => {
  import spark.implicits._                         // provides Encoder[DataError]
  implicit val sqlC: SQLContext = spark.sqlContext // required by MemoryStream.apply
  MemoryStream[DataError].toDS()
}
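
To illustrate (DataError is a simple case class; the fields below are placeholders), the resulting Dataset does report itself as streaming, which is exactly what the fold needs:

case class DataError(stage: String, message: String)

val errorStream = emptyErrorStream(spark)
errorStream.isStreaming   // true, unlike spark.emptyDataFrame

// so unioning each stage's errors onto it should now be legal:
// errorStream.union(stageErrors)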

Cheers,
Arun
