spark-user mailing list archives

From vincent gromakowski <>
Subject Re: Chaining Spark Streaming Jobs
Date Wed, 13 Sep 2017 06:01:44 GMT
What about chaining with Akka or Akka Streams and the fair scheduler?
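
A minimal sketch of the fair-scheduler half of this suggestion (the Akka part
is omitted), assuming both stages run as concurrent streaming queries inside
one application; the pool names, broker, topic, and paths are invented for
illustration:

    import org.apache.spark.sql.SparkSession

    object ChainedStreams {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("chained-streams")
          .config("spark.scheduler.mode", "FAIR") // concurrent queries share executors fairly
          .getOrCreate()

        // Raw-persist query runs in its own scheduler pool.
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "raw")
        spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
          .option("subscribe", "logs")                       // hypothetical topic
          .load()
          .writeStream.format("parquet")
          .option("path", "s3://bucket/raw")                 // hypothetical output dir
          .option("checkpointLocation", "s3://bucket/chk/raw")
          .start()

        // The enrichment query would be started here in a second pool, e.g. "enrich".
        spark.streams.awaitAnyTermination()
      }
    }

The intent is that each query started after its setLocalProperty call runs its
micro-batch jobs in that pool, so the two stages share the cluster under the
fair scheduler instead of queuing behind each other.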

On 13 Sep 2017 at 01:51, "Sunita Arvind" <> wrote:

Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'' and infer schema from it.
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(
    at org.apache.spark.sql.streaming.DataStreamReader.
    at org.apache.spark.sql.streaming.DataStreamReader.
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at com.intellij.rt.execution.application.AppMain.main(
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook

I tried specifying the schema as well.
Here is my code:

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
      from enrichedtb
      group by id"""

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is " + conf.getString("source.path"))
    // Static read of the same path, added because the stream source complained
    // about a missing schema; its schema is passed to readStream below.
    val schemadf = spark.read.parquet(conf.getString("source.path"))
    val df = spark.readStream.format("parquet")
      .schema(schemadf.schema)
      .load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    aggregator(conf)
  }
}


I tried extracting the schema from a static read of the input path and
providing it to the readStream API. With that, I get this error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
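
Side note: the UnsupportedOperationChecker message itself is not shown above,
but for a group-by aggregation on a streaming DataFrame the usual trigger is
starting the query in the default append output mode without a watermark. A
minimal sketch of starting it in a mode that streaming aggregations accept,
reusing res from the code above and a hypothetical checkpoint path:

    val query = res.writeStream
      .outputMode("complete")                   // or "update"; "append" would need a watermark
      .format("console")                        // console sink just to inspect the aggregate
      .option("checkpointLocation", "/tmp/chk") // hypothetical checkpoint location
      .start()
    query.awaitTermination()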

When running on the EMR cluster, all paths point to S3; on my laptop,
they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.



On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <> wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
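
A minimal sketch of this pattern, with invented topic, bucket, and checkpoint
names; the streaming file source requires an explicit schema, which can be
taken from a static read of the same directory or built by hand:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Job 1: persist raw records with the file sink
    // (checkpoint + file sink give exactly-once output).
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
      .option("subscribe", "logs")                        // hypothetical topic
      .load()
      .writeStream.format("parquet")
      .option("path", "s3://bucket/raw")                  // hypothetical output dir
      .option("checkpointLocation", "s3://bucket/chk/raw")
      .start()

    // Job 2 (possibly a separate application): stream from that directory.
    val rawSchema = spark.read.parquet("s3://bucket/raw").schema // or a hand-built StructType
    val enriched = spark.readStream
      .schema(rawSchema)
      .parquet("s3://bucket/raw")
      // ... enrichment transformations would go here ...
    enriched.writeStream.format("parquet")
      .option("path", "s3://bucket/enriched")
      .option("checkpointLocation", "s3://bucket/chk/enriched")
      .start()

Job 2 only needs the raw directory path and the schema, so it can live in its
own project while job 1 stays customer/project agnostic.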
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <>
> wrote:
>> Hello Spark Experts,
>> I have a design question w.r.t. Spark Streaming. I have a streaming job
>> that consumes protocol-buffer-encoded real-time logs from an on-premises
>> Kafka cluster. My Spark application runs on EMR (AWS) and persists data onto
>> S3. Before I persist, I need to strip the header and convert the protobuf to
>> parquet (I use sparksql-scalapb to convert from protobuf to
>> spark.sql.Row). I need to persist the raw logs as-is. I could continue the
>> enrichment on the same dataframe after persisting the raw data; however, in
>> order to modularize, I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also, I am trying to avoid doing it
>> all in one job, as the enrichments could get project-specific while raw data
>> persistence stays customer/project agnostic. The enriched data is allowed to
>> have some latency (a few minutes).
>> My challenge is: after persisting the raw data, how do I chain the next
>> streaming job? The only way I can think of is that job 1 (raw data)
>> partitions on the current date (YYYYMMDD), and within the current date, job 2
>> (the enrichment job) filters for records within 60s of the current time and
>> performs enrichment on them in 60s batches.
>> Is this a good option? It seems error-prone. If either of the
>> jobs gets delayed due to bursts or any error/exception, this could lead to
>> huge data losses and non-deterministic behavior. What are other
>> alternatives to this?
>> Appreciate any guidance in this regard.
>> regards
>> Sunita Koppar
