spark-user mailing list archives

From Mustafa Elbehery <elbeherymust...@gmail.com>
Subject Persist DStream into a single file on HDFS
Date Thu, 28 Sep 2017 14:58:07 GMT
Hi Folks,

I am writing a pipeline which reads from Kafka, applies some
transformations, then persists to HDFS.

Obviously such an operation is not supported by DStream, since the
DStream.save(path) method treats the path as a directory, not a file.
Using repartition(1).mode(SaveMode.Append) before persisting did not
work out either.

Any thoughts on how to solve this issue? Below is a code snippet.


val inputStream = kafkaStreamingUtil.streamConsume(streamingContext,
    Set(srcTopic), consumerGroupId)
  .filter(_.value().matches(youtubeRegex))
  .map(_.value())

inputStream.foreachRDD { rdd =>
  // One DataFrame per micro-batch, appended to dstPath as CSV
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  val rddAsDataFrame = rdd.toDF()

  rddAsDataFrame.coalesce(1).write.mode(SaveMode.Append).csv(dstPath)
}
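For context, Spark's writers always produce a directory of part files, so one common workaround is to write to a temporary directory and then merge the per-partition files into a single output file afterwards. Below is a minimal sketch of that merge step in plain Scala (the mergePartFiles helper is hypothetical, not a Spark or Hadoop API; on a real cluster the equivalent would go through the HDFS FileSystem API rather than java.io):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files

// Merge Spark-style part files (part-00000, part-00001, ...) found in
// `dir` into one file at `dest`, in lexicographic (partition) order.
// Skips _SUCCESS markers and anything else not named part-*.
def mergePartFiles(dir: String, dest: String): Unit = {
  val parts = new File(dir).listFiles()
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName)
  val out = new FileOutputStream(dest)
  try parts.foreach(p => out.write(Files.readAllBytes(p.toPath)))
  finally out.close()
}
```

The same idea applied per micro-batch (merge right after each foreachRDD write) keeps a single growing file at the cost of an extra copy per batch.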
