spark-user mailing list archives

From Silvio Fiorito <silvio.fior...@granturing.com>
Subject Re: [StructuredStreaming] multiple queries of the socket source: only one query works.
Date Mon, 14 Aug 2017 00:55:36 GMT
Hi Gerard,

Each query has its own distinct query plan and tracks offsets independently from other queries.
Also, each query will generate a dynamic group id to ensure it gets all events and appears
as a new consumer from Kafka's perspective; that's done internally by the Kafka source.
That's why there's no group id option as there was with DStreams.

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L75
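Roughly, the relevant part of that file does something like the following (a paraphrased sketch, so the exact names and details may differ from the real source):

import java.util.UUID

// Paraphrased sketch of the linked KafkaSourceProvider logic (details may differ):
// each streaming query generates its own consumer group id, so Kafka treats every
// query as an independent consumer and delivers the full stream to each of them.
def uniqueGroupId(metadataPath: String): String =
  s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"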

Thanks,
Silvio

From: Gerard Maas <gerard.maas@gmail.com>
Date: Sunday, August 13, 2017 at 9:50 AM
To: "Shixiong(Ryan) Zhu" <shixiong@databricks.com>
Cc: Rick Moritz <rahvin@gmail.com>, user <user@spark.apache.org>
Subject: Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

Hi Shixiong,

Thanks for the explanation.

In my view, this is different from the intuitive understanding of the Structured Streaming
model [1], where incoming data is appended to an 'unbounded table' and queries are run on
that. I had expected that all queries would run on that 'unbounded table view', while I understand
from your explanation that every query maintains its own 'unbounded table' view of the data
stream. Is that correct?

How does that work in the case of Kafka? We have only one declared consumer, so we should
observe similar behavior. Yet the Kafka source is able to serve multiple output queries.
What is the difference?
Where could I learn more about the internal structured streaming model?

kind regards, Gerard.



[1] https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts

On Sun, Aug 13, 2017 at 1:22 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com>
wrote:
Spark creates one connection for each query. The behavior you observed is because of how "nc
-lk" works. If you use `netstat` to check the TCP connections, you will see there are two
connections when starting two queries. However, "nc" forwards the input to only one connection.
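To illustrate (just a rough sketch of that single-connection behavior, written in Scala; it is an assumption about what "nc -lk" does, not how nc is actually implemented):

import java.net.ServerSocket
import scala.io.StdIn

// Rough sketch: every client may connect, but input lines are forwarded to the
// first connection only, so a second streaming query reading from the same port
// never sees any data -- which matches the observed behavior with "nc -lk".
object SingleForwardServer extends App {
  val server = new ServerSocket(9999)
  val first  = server.accept()                 // the first query's connection gets the data
  val acceptor = new Thread(new Runnable {
    def run(): Unit = while (true) server.accept()   // later queries can connect but receive nothing
  })
  acceptor.setDaemon(true)
  acceptor.start()
  val out = new java.io.PrintWriter(first.getOutputStream, true)
  Iterator.continually(StdIn.readLine())
    .takeWhile(_ != null)
    .foreach(line => out.println(line))        // forward stdin to the first connection only
}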

On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz <rahvin@gmail.com>
wrote:
Hi Gerard, hi List,

I think what this would entail is for Source.commit to change its functionality: you would
need to track all streams' offsets there. In the socket source in particular you already have
a cache (I haven't looked at Kafka's implementation too closely yet), so that shouldn't be the
issue, provided all streams subscribed to a source are known at start time.
What I worry about is that this may need an API change to pass a stream ID into commit.
Since different streams can use different Triggers, you can get any number of unforeseeable
results when multiple threads call commit.
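Purely as a hypothetical illustration of that API change (this is not the actual Spark Source API, just what such a change might look like):

// Hypothetical sketch only -- not the actual Spark Source API.
// Today the Source trait commits a single offset for the whole source, roughly:
//   def commit(end: Offset): Unit
// Per-stream tracking would need the query's identity to be passed in as well:

case class Offset(value: Long)  // stand-in for Spark's internal Offset type

trait MultiQueryAwareSource {
  // Commit offsets up to `end` on behalf of one specific query, so the source
  // only discards buffered data once every subscribed query has consumed it.
  def commit(queryId: String, end: Offset): Unit
}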

I'll look into that, since I am in the process of building a TwitterSource based on the socket
source's general functionality, and due to the API restrictions there, support for multiple
streams using one source is even more important.

What I did observe was that every query initializes a separate source. This won't work
so well with the socket source, since the socket is already in use once you try to set up a
second one. It also won't work so well with Twitter, since an API key is usually limited in
how many simultaneous connections it allows (likely 2).

An alternative to the socket source issue would be to open a new free socket, but then the
user has to figure out where the source is listening.

I second Gerard's request for additional information, and confirmation of my theories!

Thanks,

Rick

On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas <gerard.maas@gmail.com>
wrote:
Hi,

I've been investigating this SO question: https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

TL;DR: when using the socket source, creating multiple queries does not work properly;
only the first query in the start order receives data.

This minimal example reproduces the issue:

import org.apache.spark.sql.functions.lit  // already in scope in spark-shell

// one socket source shared by two queries
val lines = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", true)
    .load()

// first query: raw lines to the console
val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

// second query: same stream with an extra literal column
val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()

Sample output (spark shell):

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:37:59|
+-----+-------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:38:00|
+-----+-------------------+

q1.stop

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

This is certainly unexpected behavior. Even though the socket source is marked "not for production",
I wouldn't expect it to be so limited.

Am I right to think that the first running query consumes all the data in the source, and
therefore all the other queries do not work (until the previous ones are stopped)?

Is this a general behavior, i.e. does each query started on a structured streaming job fully
consume the source? And can the Kafka source be used with multiple queries because it can
be replayed?

As a workaround, would there be a way to cache the incoming data in order to multiplex it? We cannot
call `cache` on a streaming dataset, but is there maybe a way to do that?

Could I have more details on the execution model (I've consumed all I could find) and what
are the (near) future plans?

thanks!

-Gerard.



