spark-user mailing list archives

From Sunita Arvind <sunitarv...@gmail.com>
Subject Re: Chaining Spark Streaming Jobs
Date Wed, 13 Sep 2017 16:33:45 GMT
Thanks for your suggestion, Vincent. I do not have much experience with Akka
as such, but I will explore this option.

On Tue, Sep 12, 2017 at 11:01 PM, vincent gromakowski <
vincent.gromakowski@gmail.com> wrote:

> What about chaining with Akka or Akka Streams and the fair scheduler?
>
> On 13 Sept 2017 at 01:51, "Sunita Arvind" <sunitarvind@gmail.com> wrote:
>
> Hi Michael,
>
> I am wondering what I am doing wrong. I get an error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>     at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
>     at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
>     at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>     at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
>     """select sum(col1), sum(col2), id, first(name)
>       from enrichedtb
>       group by id
>     """.stripMargin
>
>   def aggregator(conf:Config)={
>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is" + conf.getString("source.path"))
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // Added this as it was complaining about schema.
>     val df=spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>     res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
>     val mainconf = ConfigFactory.load()
>     val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>     print(conf.toString)
>     aggregator(conf)
>   }
>
> }
>
>
> I tried to extract the schema from a static read of the input path and provided it to the readStream API. With that, I get this error:
>
> 	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
> 	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3. On my laptop, they all point to the local filesystem.
>
> I am using Spark 2.2.0.
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <michael@databricks.com>
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarvind@gmail.com>
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question regarding Spark Streaming. I have a streaming job
>>> that consumes protocol-buffer-encoded real-time logs from an on-premises
>>> Kafka cluster. My Spark application runs on EMR (AWS) and persists data to
>>> S3. Before I persist, I need to strip the header and convert protobuf to
>>> Parquet (I use sparksql-scalapb to convert from protobuf to
>>> Spark.sql.Row). I need to persist the raw logs as is. I can continue the
>>> enrichment on the same DataFrame after persisting the raw data; however, in
>>> order to modularize, I am planning to have a separate job that picks up the
>>> raw data and performs enrichment on it. Also, I am trying to avoid an
>>> all-in-one job, as the enrichments could get project-specific while raw data
>>> persistence stays customer/project-agnostic. The enriched data is allowed to
>>> have some latency (a few minutes).
>>>
>>> My challenge is, after persisting the raw data, how do I chain the next
>>> streaming job? The only way I can think of is: job 1 (raw data)
>>> partitions on the current date (YYYYMMDD), and within the current date, job 2
>>> (enrichment job) filters for records within 60s of the current time and
>>> performs enrichment on them in 60s batches.
>>> Is this a good option? It seems error-prone. When either of the
>>> jobs gets delayed due to bursts or any error/exception, this could lead to
>>> huge data losses and non-deterministic behavior. What are the
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>>
>>
>>
>
>
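
For reference, here is a minimal sketch of the chained second job discussed above (a file source reading what an upstream file sink wrote), assuming Spark 2.2. Several things in it are assumptions rather than facts from the thread: the paths, the object name ChainedAggregates, and above all the event-time column eventTime, which the posted query does not have. The second stack trace is truncated, so the cause is also an assumption: Spark rejects a streaming aggregation in append mode unless a watermark is defined, and the file sink only supports append mode. The sketch therefore swaps the plain "group by id" for a watermarked, windowed aggregation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, first, sum, window}

// Hypothetical second job in the chain; all names and paths are illustrative.
object ChainedAggregates {
  def main(args: Array[String]): Unit = {
    // Assumed locations: the upstream job is assumed to write parquet
    // to sourcePath through the structured streaming file sink.
    val sourcePath     = "/data/enriched"
    val sinkPath       = "/data/aggregates"
    val checkpointPath = "/data/checkpoints/aggregates"

    val spark = SparkSession.builder().appName("ChainedAggregates").getOrCreate()

    // A streaming file source needs an explicit schema. Borrowing it from a
    // static read only works if some parquet files already exist in sourcePath.
    val schema = spark.read.parquet(sourcePath).schema

    val enriched = spark.readStream.schema(schema).parquet(sourcePath)

    // Assumption: the data carries a timestamp column named "eventTime".
    // The watermark lets Spark finalize a window and emit it in append mode.
    val aggregated = enriched
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window(col("eventTime"), "1 minute"), col("id"))
      .agg(
        sum("col1").as("sum_col1"),
        sum("col2").as("sum_col2"),
        first("name").as("name"))

    val query = aggregated.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", checkpointPath)
      .start(sinkPath)

    // start() returns immediately; block so the driver stays alive.
    query.awaitTermination()
  }
}
```

With a watermark, each window is written only after the watermark passes its end, so results lag by roughly the watermark delay. The 10-minute and 1-minute values are placeholders to tune against the "few minutes" of latency mentioned in the original question.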
