spark-user mailing list archives

From Arun Manivannan <>
Subject Re: Equivalent of emptyDataFrame in StructuredStreaming
Date Sat, 17 Nov 2018 09:33:48 GMT
Hi Jungtaek,

Sorry about the delay in my response and thanks a ton for responding.

I am trying to build a data pipeline made up of several stages. The goal is
to use a Dataset to accumulate the transformation errors that may occur in
any stage of the pipeline. As a side benefit, only the filtered DataFrame is
passed on to the next stage.

The stages look something like this:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)

Each stage's implementation looks something like the following. Some stages
return errors; others are purely side-effecting. For example, the following
stage (AddRowKeyStage) just adds a UUID column to each row and therefore
returns an empty Dataset[DataError]. A DataTypeValidatorStage, on the other
hand, may return a populated Dataset[DataError] along with the filtered
DataFrame.

import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError // rest of this import list was cut off in the archive
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
  extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] =
    addRowKeys(dataRecords) // assumed: the body was lost in the archive

  def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
    // Assumed: output column order taken from the target schema (right-hand side lost in the archive)
    val colOrder = schemaWithRowKey.fieldNames.map(col)
    // Add a UUID row key to every record
    val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
    val returnDf = withRowKeyDf.select(colOrder: _*)
    // This stage produces no errors: pair an empty error Dataset with the result
    Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
  }
}
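The snippet above relies on DataStage and DataSetWithErrors, whose definitions are not shown in the thread. The following is a minimal sketch of what they might look like, inferred only from how AddRowKeyStage uses them; the object name PipelineModel and the fields of the stand-in DataError are hypothetical.

```scala
import cats.data.Writer
import org.apache.spark.sql.Dataset

object PipelineModel {
  // Hypothetical stand-in for ErrorModels.DataError; the real fields are not shown in the thread
  final case class DataError(rowKey: String, stage: String, message: String)

  // Assumed alias: a stage result of type A paired with the errors logged so far
  type DataSetWithErrors[A] = Writer[Dataset[DataError], A]

  // Assumed stage contract, inferred from how AddRowKeyStage is written
  trait DataStage[T] {
    def stage: String
    def apply(data: T): DataSetWithErrors[T]
  }
}
```

With this shape, every stage returns a Writer whose log is a Dataset of errors, so stages can be chained with flatMap (or a for-comprehension) as long as a Semigroup for that Dataset is in scope.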

To accumulate the errors across stages, I am using the Writer monad from the
Cats library. I implemented a Semigroup for Spark Datasets so that the errors
from successive stages are combined automatically. This way, I can run the
following fold and end up with two Datasets (one for the errors and one for
the values) when I start the stream.
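A Semigroup for Dataset could look like the following minimal sketch, assuming errors are combined with Dataset.union (the object name DatasetInstances is hypothetical). Note that union matches columns by position, and Spark refuses to run a streaming query that unions a streaming Dataset with a batch one, which is exactly why the zero value of the fold has to be a streaming Dataset too.

```scala
import cats.Semigroup
import org.apache.spark.sql.Dataset

object DatasetInstances {
  // Errors from consecutive stages are appended with union;
  // nothing is merged or de-duplicated.
  implicit def datasetSemigroup[T]: Semigroup[Dataset[T]] =
    new Semigroup[Dataset[T]] {
      def combine(x: Dataset[T], y: Dataset[T]): Dataset[T] = x.union(y)
    }
}
```

With `import DatasetInstances._` in scope, cats can find the Semigroup[Dataset[DataError]] that Writer's flatMap requires.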

val validRecordsWithErrors = pipelineStages.foldLeft(initDf) { case (dfWithErrors, stage) =>
  for {
    df      <- dfWithErrors
    applied <- stage(df)
  } yield applied
}

validRecordsWithErrors combines both the transformation errors (the Writer's
log, on the left) and the DataFrame of records that successfully passed
through all the stages (the value, on the right).
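The way the fold accumulates errors can be seen without Spark. The following self-contained sketch uses a List[String] in place of Dataset[DataError] and a Seq[Int] in place of the DataFrame; the stages addOne and dropNegatives are hypothetical, and the initial Writer with an empty log plays the role of initDf.

```scala
import cats.data.Writer
import cats.instances.list._ // Semigroup for List (needed explicitly on older cats versions)

object WriterFoldDemo {
  // Stand-in for DataSetWithErrors: List[String] plays the role of Dataset[DataError]
  type WithErrors[A] = Writer[List[String], A]

  // Side-effect-only stage: transforms the data and logs no errors
  val addOne: Seq[Int] => WithErrors[Seq[Int]] =
    xs => Writer(List.empty[String], xs.map(_ + 1))

  // Validating stage: logs the rejected values and passes on only the valid ones
  val dropNegatives: Seq[Int] => WithErrors[Seq[Int]] = { xs =>
    val (bad, good) = xs.partition(_ < 0)
    Writer(bad.map(b => s"negative value: $b").toList, good)
  }

  val stages: List[Seq[Int] => WithErrors[Seq[Int]]] = List(addOne, dropNegatives)

  def run(input: Seq[Int]): (List[String], Seq[Int]) = {
    val init: WithErrors[Seq[Int]] = Writer(List.empty[String], input) // the "zero"
    stages.foldLeft(init) { case (acc, stage) =>
      for {
        data    <- acc
        applied <- stage(data)
      } yield applied
    }.run // splits into (accumulated errors, surviving values)
  }
}
```

For example, `WriterFoldDemo.run(Seq(-5, 0, 3))` yields the errors `List("negative value: -4")` and the values `Seq(1, 4)`. In the Spark version, calling `.run` on validRecordsWithErrors would likewise split it into the error Dataset and the valid DataFrame.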

Now, the tricky bit is this:

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

The "zero" value of the fold, and the error value for side-effecting stages,
must be an empty stream. With Spark batch I can always use "emptyDataFrame",
but I have no idea how to achieve this in Structured Streaming.
Unfortunately, "emptyDataFrame" is not streaming (its isStreaming is false),
so I can't union it with the streaming error Datasets.
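One further option, not discussed in the thread, is to derive the empty error stream from the source stream itself: filter out every row and project typed null columns matching the error schema. The result keeps the streaming lineage (so isStreaming is true) without a second source or any file IO. This is a sketch under assumptions; the names EmptyStreams and emptyStreamLike are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoder}
import org.apache.spark.sql.functions.lit

object EmptyStreams {
  // A streaming Dataset[T] that never emits rows, derived from an existing stream.
  def emptyStreamLike[T](source: DataFrame)(implicit enc: Encoder[T]): Dataset[T] = {
    // One typed null column per field of T, so the result has T's schema
    val nullCols = enc.schema.fields.map(f => lit(null).cast(f.dataType).as(f.name))
    source
      .where(lit(false))   // drop every row but keep the streaming lineage
      .select(nullCols: _*)
      .as[T]
  }
}
```

Used in the pipeline, this would read `Writer(EmptyStreams.emptyStreamLike[DataError](sourceRawDf), sourceRawDf)`, assuming an implicit Encoder[DataError] is in scope.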

Sorry if I haven't explained this well.


On Tue, Nov 6, 2018 at 7:34 AM Jungtaek Lim <> wrote:

> Could you explain what you're trying to do? A stream with no data produces
> no batch, so even if this were possible it would end up as a no-op.
> - Jungtaek Lim (HeartSaVioR)
> On Tue, Nov 6, 2018 at 8:29 AM Arun Manivannan <> wrote:
>> Hi,
>> I would like to create a "zero" value for a Structured Streaming
>> DataFrame, but unfortunately I couldn't find any leads. With Spark batch,
>> I can use "emptyDataFrame" or "createDataFrame" with "emptyRDD", but with
>> Structured Streaming I am lost.
>> If I use "emptyDataFrame" as the zero value, I won't be able to
>> union it with any other DataFrames in the program, because Spark doesn't
>> allow you to mix batch and streaming DataFrames (isStreaming=false for the
>> batch ones).
>> Any clue is greatly appreciated. Here are the alternatives that I have at
>> the moment.
>> *1. Reading from an empty file*
>> *Disadvantages: polling is expensive because it involves IO, and it's error
>> prone in the sense that someone might accidentally update the file.*
>> val emptyErrorStream = (spark: SparkSession) => {
>>   import spark.implicits._ // provides the Encoder[DataError] needed by .as[DataError]
>>   spark
>>     .readStream
>>     .format("csv")
>>     .schema(DataErrorSchema)
>>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>>     .as[DataError]
>> }
>> *2. Use MemoryStream*
>> *Disadvantages: MemoryStream itself is not recommended for production use
>> because it can be mutated, but I am converting it to a Dataset immediately.
>> So, I am leaning towards this at the moment.*
>> import org.apache.spark.sql.execution.streaming.MemoryStream
>> val emptyErrorStream = (spark: SparkSession) => {
>>   import spark.implicits._ // provides the Encoder[DataError] MemoryStream needs
>>   implicit val sqlC = spark.sqlContext
>>   MemoryStream[DataError].toDS()
>> }
>> Cheers,
>> Arun
