spark-user mailing list archives

From Arun Manivannan <a...@arunma.com>
Subject Re: Equivalent of emptyDataFrame in StructuredStreaming
Date Sat, 17 Nov 2018 09:33:48 GMT
Hi Jungtaek,

Sorry about the delay in my response and thanks a ton for responding.

I am trying to build a data pipeline that has a bunch of stages. The goal is
to use a Dataset to accumulate the transformation errors that may happen at
each stage of the pipeline. As a benefit, I can pass only the filtered
DataFrame on to the next stage.

The stages look something like this:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)

Each stage's implementation looks something like the following. Some stages
may return errors, while others are purely side-effecting. For example, the
following stage (AddRowKeyStage) just adds a UUID column to each row and
therefore returns an empty Dataset[DataError]. A DataTypeValidatorStage, on
the other hand, may return a populated Dataset[DataError] along with the
filtered DataFrame value.


import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
    extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] =
    addRowKeys(dataRecords)

  def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
    val returnDf = withRowKeyDf.select(colOrder.map(col): _*)
    Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
  }
}
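For context, DataStage and DataSetWithErrors aren't shown above. A minimal
plain-Scala sketch of the shape I have in mind, with stand-ins I'm assuming
for illustration (a (errors, value) pair in place of cats' Writer, and a
List[String] in place of Dataset[DataError]):

```scala
// Simplified stand-ins (assumptions, not the real definitions): errors are a
// List[String] instead of a Dataset[DataError], and the Writer is modelled as
// a plain (errors, value) pair.
object StageSketch {
  type DataSetWithErrors[A] = (List[String], A)

  trait DataStage[T] {
    val stage: String
    def apply(data: T): DataSetWithErrors[T]
  }

  // A stage in the spirit of AddRowKeyStage: adds a row key, reports no errors.
  class AddKeyStage extends DataStage[Map[String, String]] {
    override val stage: String = "AddKeyStage"
    def apply(row: Map[String, String]): DataSetWithErrors[Map[String, String]] =
      (Nil, row + ("rowKey" -> java.util.UUID.randomUUID().toString))
  }
}
```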


For accumulating the errors at each stage, I am using the Writer monad from
the Cats library. I have arranged for the errors to be combined automatically
by implementing a Semigroup for Spark's Dataset. This way, I can do the
following and have two Datasets (one for errors and one for values) when I
start the stream.

val validRecordsWithErrors = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df      <- dfWithErrors
      applied <- stage(df)
    } yield applied
}

validRecordsWithErrors combines the transformation errors (the left side of
the Writer) with the DataFrame of records that have successfully passed
through the stages (the right side).
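To make the fold concrete outside of Spark, here is a minimal plain-Scala
sketch of the same accumulation. The List[String] errors and Int value are
stand-ins (my assumption for illustration) for Dataset[DataError] and the
DataFrame, and `++` plays the role the Dataset-union Semigroup plays above:

```scala
// Each stage returns (errors, value); the fold threads the value through and
// combines the errors, just like the Writer-based pipeline does with Datasets.
object FoldSketch {
  type WithErrors[A] = (List[String], A)

  def stage1(n: Int): WithErrors[Int] = (Nil, n + 1) // side-effecting style: no errors
  def stage2(n: Int): WithErrors[Int] =              // validator style: reports an error
    (List(s"dropped remainder ${n % 2}"), n / 2)

  val stages: List[Int => WithErrors[Int]] = List(stage1, stage2)

  val init: WithErrors[Int] = (Nil, 10) // the "zero" errors value the post is after

  val result: WithErrors[Int] = stages.foldLeft(init) { case ((errs, value), stage) =>
    val (newErrs, newValue) = stage(value)
    (errs ++ newErrs, newValue) // for Datasets this combine would be a union
  }
  // result == (List("dropped remainder 1"), 5)
}
```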

Now, the tricky bit is this :

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

The "zero" value of the fold, and the error value for the side-effecting
stages, must be an empty stream. With Spark batch, I can always use
"emptyDataFrame", but I have no clue how to achieve this in Spark
streaming. Unfortunately, "emptyDataFrame" is not "isStreaming", and
therefore I won't be able to union the errors together.
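Stated precisely, the requirement on the zero is the usual identity law: the
initial error value must contribute nothing when combined with any stage's
errors. A plain-Scala sketch of that law (again with List[String] standing in
for the streaming Dataset[DataError], an assumption on my part):

```scala
// The fold's zero must be an identity for the error combine: merging it with
// any stage's errors leaves them unchanged. For Lists that zero is Nil; for a
// streaming pipeline it has to be an *empty streaming* Dataset, because Spark
// refuses to union a batch DataFrame with a streaming one.
object ZeroSketch {
  val zero: List[String] = Nil

  val stageErrors: List[List[String]] = List(
    List.empty[String],               // a side-effecting stage: no errors
    List("bad cast: column 'price'")  // a validator stage: one error
  )

  val accumulated: List[String] = stageErrors.foldLeft(zero)(_ ++ _)
  // accumulated == List("bad cast: column 'price'"): the zero contributed nothing
}
```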

I am sorry if I haven't explained this well.

Cheers,
Arun



On Tue, Nov 6, 2018 at 7:34 AM Jungtaek Lim <kabhwan@gmail.com> wrote:

> Could you explain what you're trying to do? With no data in the stream there
> would be no batch, so it would end up a no-op even if it were possible.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 6, 2018 at 8:29 AM, Arun Manivannan <arun@arunma.com> wrote:
>
>> Hi,
>>
>> I would like to create a "zero" value for a Structured Streaming
>> DataFrame, and unfortunately I couldn't find any leads. With Spark batch,
>> I can use "emptyDataFrame" or "createDataFrame" with an "emptyRDD", but with
>> Structured Streaming, I am lost.
>>
>> If I use the "emptyDataFrame" as the zero value, I wouldn't be able to
>> join them with any other DataFrames in the program because Spark doesn't
>> allow you to mix batch and stream data frames. (isStreaming=false for the
>> Batch ones).
>>
>> Any clue is greatly appreciated. Here are the alternatives that I have at
>> the moment.
>>
>> 1. Reading from an empty file
>> Disadvantages: polling is expensive because it involves IO, and it is error
>> prone in the sense that someone might accidentally update the file.
>>
>> val emptyErrorStream = (spark: SparkSession) => {
>>   spark
>>     .readStream
>>     .format("csv")
>>     .schema(DataErrorSchema)
>>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>>     .as[DataError]
>> }
>>
>> 2. Use MemoryStream
>>
>> Disadvantages: MemoryStream itself is not recommended for production use
>> because it can be mutated, but I am converting it to a DS immediately, so I
>> am leaning towards this at the moment.
>>
>>
>> val emptyErrorStream = (spark: SparkSession) => {
>>   implicit val sqlC = spark.sqlContext
>>   MemoryStream[DataError].toDS()
>> }
>>
>> Cheers,
>> Arun
>>
>
