spark-user mailing list archives

From Abhijeet Kumar <abhijeet.ku...@sentienz.com>
Subject Spark Streaming join taking long to process
Date Tue, 27 Nov 2018 08:15:31 GMT
Hi All,

I'm practicing Spark Streaming by joining two different streams, and I noticed that it's
taking around 15 seconds to process each record. Let me share the details and the code:



As you can see, stage id 2 is taking 15 s. Isn't that strange?

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType
import org.apache.log4j.{Level, Logger}

object StreamJoin{

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  def main(args: Array[String]): Unit = {
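    // Checkpoint directory on HDFS (defined here, but not passed to the streaming writer below)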
    val checkPointDir = "hdfs://localhost:8020/checkpo"

    val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .master("local")
      .getOrCreate()

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    // Stream 1: order events from Kafka topic "demo2"
    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic1)
      .option("failOnDataLoss", "false")
      .load()

    // Stream 2: invoice events from Kafka topic "demo3"
    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic2)
      .option("failOnDataLoss", "false")
      .load()

    // Extract the order fields from the JSON in the Kafka `value` column
    val order_details = df1
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as
"promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as
"units_sold")

    // Extract the invoice fields from the JSON and keep only successful invoices
    val invoice_details = df2
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))

    // Stream-stream inner join on the order id
    val join_df = order_details
      .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    // Write the joined stream to the console sink
    join_df.writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }
}
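
In case it's relevant: the job runs with .master("local") and I haven't changed
spark.sql.shuffle.partitions, so each micro-batch of the stream-stream join is shuffled into
the default 200 partitions on a single core. This is just an assumption on my part, not
something I have verified as the cause, but this is the kind of tuning I could try:

    // Hypothetical tweak (not verified): fewer shuffle partitions for a small local run; the default is 200
    val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .master("local")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()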

Thanks,
Abhijeet Kumar

