spark-user mailing list archives

From Otávio Carvalho <otav...@gmail.com>
Subject Spark-shell doesn't see changes coming from Kafka topic
Date Thu, 01 Dec 2016 14:31:12 GMT
Hello hivemind,

I am trying to connect my Spark 2.0.2 cluster to an Apache Kafka 0.10
cluster via spark-shell.

The connection works fine, but the query does not receive any of the messages
published to the topic.

It doesn't throw any error, but it doesn't retrieve any messages either. (I
am sure that messages are being published, because I am able to read them
from the topic on the same machine.)
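Reading the topic with a console consumer shows that the broker is up. It does not by itself show that the JVM running spark-shell (the driver, and later the executors) can open a connection to the bootstrap address configured below. A quick first check from inside spark-shell is a plain TCP connect with a timeout. This is a minimal sketch using only the JDK. The object `BrokerReachability` and its method are hypothetical helpers, not part of Spark or Kafka, and they only test the TCP handshake, not the Kafka protocol:

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper: reports whether a TCP connection to host:port can be
// opened within the timeout. It does not speak the Kafka protocol.
object BrokerReachability {
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, connect timeouts and unresolvable host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

Pasted into spark-shell with `:paste`, it can be called as `BrokerReachability.canConnect("ec2-54-208-12-171.compute-1.amazonaws.com", 9092)`. A `true` result covers only the bootstrap step. After that, the Kafka client reconnects to the host names the broker advertises, and those must be reachable from the same JVMs as well.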

Here is the spark-shell code and output:

val ds1 = spark.readStream
  .format("kafka")
  .option("subscribe", "clickstream")
  .option("kafka.bootstrap.servers", "ec2-54-208-12-171.compute-1.amazonaws.com:9092")
  .option("startingOffsets", "latest")
  .load

// Exiting paste mode, now interpreting.

ds1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> val counter = ds1.groupBy("value").count
counter: org.apache.spark.sql.DataFrame = [value: binary, count: bigint]

scala> import org.apache.spark.sql.streaming.OutputMode.Complete
import org.apache.spark.sql.streaming.OutputMode.Complete

val query = counter.writeStream
  .outputMode(Complete)
  .format("console")
  .start

// Exiting paste mode, now interpreting.

query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-1 [state = ACTIVE]

scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
Status of query 'query-1'
    Query id: 1
    Status timestamp: 1480602056895
    Input rate: 0.0 rows/sec
    Processing rate 0.0 rows/sec
    Latency: - ms
    Trigger details:
        isTriggerActive: true
        statusMessage: Finding new data from sources
        timestamp.triggerStart: 1480602056894
        triggerId: -1
    Source statuses [1 source]:
        Source 1 - KafkaSource[Subscribe[clickstream]]
            Available offset: -
            Input rate: 0.0 rows/sec
            Processing rate: 0.0 rows/sec
            Trigger details:
                triggerId: -1
    Sink status - org.apache.spark.sql.execution.streaming.ConsoleSink@54d5b6cb
        Committed offsets: [-]

I am starting spark-shell as follows:
/root/spark/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2
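The `_2.10` suffix in that package coordinate is the Scala binary version the connector was compiled for, and it has to match the Scala version of the Spark build. Scala 2.10 and 2.11 are not binary compatible. The default Spark 2.0.x distributions are built for Scala 2.11, but which version this particular cluster uses is not stated here, so it is one thing worth ruling out. This sketch uses only the Scala standard library to compare the suffix with the Scala version of the running REPL. `ScalaSuffixCheck` and its methods are hypothetical names:

```scala
import scala.util.Properties

// Hypothetical helper: compares the Scala binary suffix of a Maven
// coordinate (groupId:artifactId:version) with the running Scala version.
object ScalaSuffixCheck {
  // Full-match regex for an artifactId ending in a suffix such as "_2.10".
  private val SuffixPattern = """.*_(\d+\.\d+)""".r

  // "org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2" -> Some("2.10")
  def artifactScalaVersion(coordinate: String): Option[String] =
    coordinate.split(':') match {
      case Array(_, SuffixPattern(version), _) => Some(version)
      case _                                   => None
    }

  // e.g. "2.11.8" -> "2.11": the binary version the REPL itself runs on.
  def runtimeBinaryVersion: String =
    Properties.versionNumberString.split('.').take(2).mkString(".")

  // Uses exists rather than Option.contains so it also compiles on Scala 2.10.
  def matchesRuntime(coordinate: String): Boolean =
    artifactScalaVersion(coordinate).exists(_ == runtimeBinaryVersion)
}
```

In spark-shell, `ScalaSuffixCheck.matchesRuntime("org.apache.spark:spark-sql-kafka-0-10_2.10:2.0.2")` returning `false` would suggest trying the coordinate whose suffix matches the REPL. For a Scala 2.11 build, that is `org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.2`.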

Thanks,
Otávio Carvalho.

-- 
Otávio Carvalho
Consultant Developer
Email ocarvalh@thoughtworks.com
Telephone +55 53 91565742
