spark-user mailing list archives

From Saulo Sobreiro
Subject Re: [Spark2.1] SparkStreaming to Cassandra performance problem
Date Mon, 30 Apr 2018 12:24:27 GMT
Hi Javier,

I will try to implement this in Scala then. As far as I can see in the documentation, there
is no saveToCassandra in the Python interface unless you are working with DataFrames, and the
kafkaStream instance does not provide methods to convert an RDD into a DataFrame.
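
One possible route for the RDD-to-DataFrame step is to do the conversion per batch inside foreachRDD. The following is only a minimal sketch, assuming the Spark Cassandra Connector data source ("org.apache.spark.sql.cassandra") is on the classpath and that each processed record is a 6-tuple in the column order of the table; to_row_dict and save_batch are hypothetical names, not part of any library.

```python
# Column order of test_hdpkns.measurement, taken from the CREATE TABLE statement.
COLUMNS = ("mid", "tt", "in_tt", "out_tt", "sensor_id", "measure")


def to_row_dict(values):
    """Pair one processed record (assumed to be a 6-tuple) with the column names."""
    if len(values) != len(COLUMNS):
        raise ValueError("expected %d fields, got %d" % (len(COLUMNS), len(values)))
    return dict(zip(COLUMNS, values))


def save_batch(time, rdd):
    """foreachRDD callback: convert the batch RDD to a DataFrame and append it."""
    # Imported here so the helper above stays usable without a Spark install.
    from pyspark.sql import Row, SparkSession

    if rdd.isEmpty():
        return  # nothing to write for this batch
    # Reuse (or lazily create) a session that shares the streaming context's conf.
    spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()
    df = spark.createDataFrame(rdd.map(lambda v: Row(**to_row_dict(v))))
    # Assumption: the connector's DataFrame source is available under this name.
    (df.write
       .format("org.apache.spark.sql.cassandra")
       .options(keyspace="test_hdpkns", table="measurement")
       .mode("append")
       .save())
```

It would be attached with something like processed_stream.foreachRDD(save_batch), where processed_stream is the DStream produced by the map step. Whether this is faster than the RDD-based save has not been measured here.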

Regarding my table, it is very simple (see below). Can I change something to make it write
faster?

CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage;

The system CPU while the demo is running is almost always at 100% for both cores.

Thank you.

Best Regards,

On 29/04/2018 20:46:30, Javier Pareja wrote:

Hi Saulo,

I meant using this to save:

But it might be slow in a different area.
Another point is that Cassandra and Spark running on the same machine might compete for resources,
which will slow down the inserts. You can check the CPU usage of the machine at the time. Also,
the design of the table schema can make a big difference.

On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro wrote:

Hi Javier,

I removed the count and used "map" directly instead of using transform, but the kafkaStream
is created with KafkaUtils, which does not have a method to save to Cassandra directly.

Do you know any workaround for this?

Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja wrote:

Hi Saulo,

I'm no expert but I will give it a try.
I would remove the rdd2.count(): I can't see the point of it, and you will gain performance right
away. Without the count, I would not use a transform, just the map directly.
I have not used python but in Scala the cassandra-spark connector can save directly to Cassandra
without a foreachRDD.
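
To illustrate why the count can be costly: an uncached RDD recomputes its lineage for every action, so count() followed by a save runs the per-record work twice. The sketch below uses a toy stand-in class (LazySeq, a hypothetical name, not a Spark API) purely to make that effect visible without a cluster.

```python
class LazySeq:
    """Toy stand-in for an uncached RDD: each action re-runs the whole lineage."""

    def __init__(self, source, fn=None):
        self._source = source   # list of input records
        self._fn = fn           # transformation applied lazily, or None

    def map(self, fn):
        prev = self._fn
        composed = fn if prev is None else (lambda x: fn(prev(x)))
        return LazySeq(self._source, composed)

    def _compute(self):
        fn = self._fn
        return [x if fn is None else fn(x) for x in self._source]

    def count(self):            # action
        return len(self._compute())

    def save(self, sink):       # action, stands in for saveToCassandra
        sink.extend(self._compute())


calls = {"n": 0}


def record_handler(value):
    calls["n"] += 1             # track how often the per-record work runs
    return value * 2


batch = [1, 2, 3, 4]

# Pattern from the original code: count() first, then save.
sink_a = []
rdd2 = LazySeq(batch).map(record_handler)
if rdd2.count() > 0:
    rdd2.save(sink_a)
calls_with_count = calls["n"]

# Suggested pattern: map and save only.
calls["n"] = 0
sink_b = []
LazySeq(batch).map(record_handler).save(sink_b)
calls_without_count = calls["n"]

print(calls_with_count, calls_without_count)
```

Both variants write the same rows, but the first one runs the handler twice per record. In real Spark the extra cost also includes reading the batch from Kafka a second time, unless the RDD is cached.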

Finally I would use the spark UI to find which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro wrote:

Hi all,

I am implementing a use case where I read some sensor data from Kafka with SparkStreaming
interface (KafkaUtils.createDirectStream) and, after some transformations, write the output
(RDD) to Cassandra.

Everything is working properly but I am having some trouble with the performance. My kafka
topic receives around 2000 messages per second. For a 4 min. test, the SparkStreaming app
takes 6~7 min. to process and write to Cassandra, which is not acceptable for longer runs.
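
As a rough back-of-the-envelope check of these numbers (a sketch only, assuming the 6~7 min. is total wall-clock time, that batches are processed one after another, and that the time is spread evenly across batches):

```python
MSGS_PER_SECOND = 2000        # Kafka input rate quoted in the message
TEST_SECONDS = 4 * 60         # 4 min. test
BATCH_INTERVAL_SECONDS = 2    # batchIntervalSeconds in the posted code


def summarize(elapsed_minutes):
    """Return (messages/s actually achieved, average seconds spent per batch)."""
    total_messages = MSGS_PER_SECOND * TEST_SECONDS
    n_batches = TEST_SECONDS // BATCH_INTERVAL_SECONDS
    elapsed_seconds = elapsed_minutes * 60
    return total_messages / float(elapsed_seconds), elapsed_seconds / float(n_batches)


for minutes in (6, 7):
    throughput, per_batch = summarize(minutes)
    print("%d min: %.0f msg/s, %.1f s per %d s batch"
          % (minutes, throughput, per_batch, BATCH_INTERVAL_SECONDS))
```

Under those assumptions the app sustains roughly 1,100 to 1,300 messages per second against an input of 2,000, and each 2 s batch takes about 3 to 3.5 s on average, so the backlog keeps growing for as long as the input continues.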

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other than getting more
resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look. I tried some
different cassandra configurations following this link:
(recommended in stackoverflow for similar questions).

Thank you in advance,

Best Regards,

=============== # CODE # =================================
# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
#  --conf'localhost' --num-executors 2 --executor-cores 2
# localhost:6667 test_topic2

import sys  # needed for sys.argv below

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    # w is a (key, value) pair from Kafka; only the value is processed
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
        .setAppName("SensorDataStreamHandler") \
        .setMaster("local[*]") \
        .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"": brokers})

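# The original listing is cut off after .transform(process); the lines below
# are an assumed continuation based on the casssave function defined above.
kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()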
