spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jungtaek Lim <kabh...@gmail.com>
Subject Re: Spark Streaming
Date Tue, 27 Nov 2018 06:38:41 GMT
You may need to put efforts on triage how much time is spent on each part.
Without such information you are only able to get general tips and tricks.
Please check SQL tab and see DAG graph as well as details (logical plan,
physical plan) to see whether you're happy about these plans.

General tip on quick look of query: avoid using withColumn repeatedly and
try to put them in one select statement. If I'm not mistaken, it is known
as a bit costly since each call would produce a new Dataset. Defining
schema and using "from_json" will eliminate all the call of withColumn"s"
and extra calls of "get_json_object".

- Jungtaek Lim (HeartSaVioR)

2018년 11월 27일 (화) 오후 2:44, Siva Samraj <samraj.mit55@gmail.com>님이 작성:

> Hello All,
>
> I am using Spark 2.3 version and i am trying to write Spark Streaming
> Join. It is a basic join and it is taking more time to join the stream
> data. I am not sure any configuration we need to set on Spark.
>
> Code:
> *************************
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
>
> object OrderSalesJoin {
>   def main(args: Array[String]): Unit = {
>
>     setEnvironmentVariables(args(0))
>
>     val order_topic = args(1)
>     val invoice_topic = args(2)
>     val dest_topic_name = args(3)
>
>     val spark =
> SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>
>     val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>
>     import spark.implicits._
>
>
>     val order_df = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>       .option("subscribe", order_topic)
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafka.replica.fetch.max.bytes", "15728640")
>       .load()
>
>
>     val invoice_df = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>       .option("subscribe", invoice_topic)
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafka.replica.fetch.max.bytes", "15728640")
>       .load()
>
>
>     val order_details = order_df
>       .withColumn("s_order_id", get_json_object($"value".cast("String"),
> "$.order_id"))
>       .withColumn("s_customer_id",
> get_json_object($"value".cast("String"), "$.customer_id"))
>       .withColumn("s_promotion_id",
> get_json_object($"value".cast("String"), "$.promotion_id"))
>       .withColumn("s_store_id", get_json_object($"value".cast("String"),
> "$.store_id"))
>       .withColumn("s_product_id", get_json_object($"value".cast("String"),
> "$.product_id"))
>       .withColumn("s_warehouse_id",
> get_json_object($"value".cast("String"), "$.warehouse_id"))
>       .withColumn("unit_cost", get_json_object($"value".cast("String"),
> "$.unit_cost"))
>       .withColumn("total_cost", get_json_object($"value".cast("String"),
> "$.total_cost"))
>       .withColumn("units_sold", get_json_object($"value".cast("String"),
> "$.units_sold"))
>       .withColumn("promotion_cost",
> get_json_object($"value".cast("String"), "$.promotion_cost"))
>       .withColumn("date_of_order",
> get_json_object($"value".cast("String"), "$.date_of_order"))
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
> "yyyyMMddHHmmss").cast(TimestampType))
>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
> $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>         $"total_cost".cast("integer") as "total_cost",
> $"promotion_cost".cast("integer") as "promotion_cost",
>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
> $"units_sold".cast("integer") as "units_sold")
>
>
>     val invoice_details = invoice_df
>       .withColumn("order_id", get_json_object($"value".cast("String"),
> "$.order_id"))
>       .withColumn("invoice_status",
> get_json_object($"value".cast("String"), "$.invoice_status"))
>       .where($"invoice_status" === "Success")
>
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
> "yyyyMMddHHmmss").cast(TimestampType))
>
>
>
>     val order_wm = order_details.withWatermark("tstamp_trans", args(4))
>     val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))
>
>     val join_df = order_wm
>       .join(invoice_wm, order_wm.col("s_order_id") ===
> invoice_wm.col("order_id"))
>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id",
> $"s_product_id",
>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>         $"promotion_cost",
>         $"date_of_order",
>         $"units_sold" as "units_sold", $"order_id")
>
>     val final_ids = join_df
>       .withColumn("value", to_json(struct($"s_customer_id",
> $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost",
> $"total_cost".cast("Int") as "total_cost",
>         $"promotion_cost".cast("Int") as "promotion_cost",
>         $"date_of_order",
>         $"units_sold".cast("Int") as "units_sold", $"order_id")))
>       .dropDuplicates("order_id")
>       .select("value")
>
>
>     val write_df = final_ids
>       .writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>       .option("topic", dest_topic_name)
>       .option("checkpointLocation", checkpoint_path)
>       .trigger(Trigger.ProcessingTime("1 second"))
>       .start()
>
>     write_df.awaitTermination()
>
>   }
>
> }
> ****************************
>
> Let me know, it is taking more than a minute for every run. The waiting
> time is keep on increasing as the data grows.
>
> Please let me know, any thing we need to configure to make it fast. I
> tried increase the parallesim.
>
> Executor: tried from <1 to 4> and memory i gave is 3GB. The data flow is
> very less. Even for the single data it is taking time.
>
>
>

Mime
View raw message