spark-user mailing list archives

From Peter Liu <>
Subject re: spark streaming / AnalysisException on collect()
Date Mon, 30 Apr 2018 21:10:37 GMT
 Hello there,

I have a quick question regarding how to share data (a small data
collection) between a kafka producer and consumer using spark streaming
(spark 2.2):

the data published by a kafka producer is received in order on the kafka
consumer side (see (a) copied below).

However, collect() or cache() on a streaming dataframe does not seem to be
supported (see links in (b) below); I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;

My question would be:

--- How can I use the collection data (in a streaming dataframe) that arrived
on the consumer side, e.g. convert it to an array of objects?
--- Is there maybe another quick way to use kafka for sharing static data
(instead of streaming) between two spark application services (without any
common spark context, session, etc.)?
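For the second option, a plain batch read from Kafka (spark.read instead of spark.readStream, supported since Spark 2.2) might avoid streaming semantics altogether, since the result is a static DataFrame on which collect() is allowed. A minimal sketch of what I have in mind; the broker address and topic name below are placeholders, not my actual config:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object BatchKafkaReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-kafka-read").getOrCreate()
    import spark.implicits._

    val mySchema = StructType(Array(
      StructField("ad_id", StringType),
      StructField("campaign_id", StringType)))

    // Batch (non-streaming) read: fetches everything currently in the topic
    // between the given offsets; no writeStream.start() is involved.
    val staticDf ="kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "campaigns")                    // placeholder topic
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")

    // collect() works here because staticDf is a plain DataFrame.
    val rows ="value").cast("string"), mySchema).as("data"))
      .select("data.*")
  }
}
```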

I have copied some code snippet in (c).

It seems to be a very simple use case: sharing a global collection between a
spark producer and consumer. But I spent an entire day trying various options
and going through online resources such as the ones in (b).

Any help would be very much appreciated!



(a) streaming data (df) received on the consumer side (console sink):

 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Batch: 0
|ad_id                               |campaign_id             |timestamp              |

(b) online discussions on unsupported operations on streaming dataframe:

(c) code snippet:

    val rawDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
      .option("startingOffsets", "earliest")
      .option("subscribe", Variables.CAMPAIGNS_TOPIC)
      .load()

    val mySchema = StructType(Array(
      StructField("ad_id", StringType),
      StructField("campaign_id", StringType)))

    val campaignsDf2 ="value").cast("string"), mySchema).as("data"), $"timestamp")
      .select("data.*", "timestamp")

    val query = campaignsDf2.writeStream
      .format("console")
      .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once, since this is one-time static data
      .start()

    val campaignsArrayRows = campaignsDf2.collect()  // <==== not supported ====> AnalysisException!
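One workaround I have been looking at (a sketch only, not verified against my exact setup): write the stream to the in-memory sink with Trigger.Once(), wait for the query to finish, then collect from the resulting temp table, which is a static DataFrame. The table name "campaigns_snapshot" below is my own choice, not anything mandated by Spark:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object MemorySinkCollectSketch {
  // campaignsDf2 is assumed to be the streaming DataFrame built in (c).
  def collectOnce(spark: SparkSession, campaignsDf2: DataFrame): Array[Row] = {
    // The "memory" sink materializes each batch into an in-memory temp table.
    val query = campaignsDf2.writeStream
      .format("memory")
      .queryName("campaigns_snapshot") // name of the in-memory temp table
      .trigger(Trigger.Once())         // process what is in the topic once, then stop
      .start()
    query.awaitTermination()

    // The temp table is a static DataFrame, so collect() is allowed here.
    spark.sql("select * from campaigns_snapshot").collect()
  }
}
```

This keeps the streaming read from (c) unchanged and only moves the collect() to after the query has terminated.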
