spark-user mailing list archives

From Abhijeet Kumar <abhijeet.ku...@sentienz.com>
Subject Re: Spark Streaming join taking long to process
Date Tue, 27 Nov 2018 14:16:40 GMT
Yes, it did.
Thanks for the solution. I solved it locally, but I'm worried about how I can do this when I'm
using YARN, because the same 15-second delay shows up on YARN too :)
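
For reference, a minimal sketch of how the same change would typically carry over to YARN (the jar name and executor resource numbers below are placeholder assumptions, not tested values): leave the master out of the code and supply it, along with the parallelism, at submit time.

    // Build the session without hardcoding a master; spark-submit supplies it.
    val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .getOrCreate()

    // Then submit with the master and executor resources on the command line, e.g.:
    // spark-submit --master yarn --deploy-mode client \
    //   --num-executors 2 --executor-cores 4 --executor-memory 4g \
    //   --class StreamJoin app.jar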

> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <sriramsrikanth1985@gmail.com> wrote:
> 
> Hello Abhijeet,
> I am not sure my answer will solve your problem, but consider the master configuration set for the Spark application.
> val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> I see you have set it as "local", not as "local[*]".
> 
> From another blog, I got this information; sharing it in full:
> "We are going to work locally, running the application straight from our IDE, so we are
setting the master to local[*], meaning we are creating as many threads as there are cores
on the machine."
> 
> Just check whether this reduces the processing time, since with local[*] we use all available cores instead of just one.
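> 
> For instance, a one-line change along these lines (just a sketch of the suggestion above; nothing else in the builder needs to change):
> 
>     val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local[*]")  // as many worker threads as there are cores, instead of a single thread
>       .getOrCreate()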
> 
> Regards,
> Sriram Srikanth
> 
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <abhijeet.kumar@sentienz.com> wrote:
> Hi All,
> 
> I'm just practicing Spark Streaming by joining two different streams. I noticed that it's taking around 15 seconds for each record. Let me share the details and the code:
> 
> [Spark UI screenshots attached showing the stage durations]
> As you can see, stage id 2 is taking 15 s. Isn't that strange?
> 
> Code:
> 
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
> import org.apache.log4j.{Level, Logger}
> 
> object StreamJoin{
> 
>   val kafkaTopic1 = "demo2"
>   val kafkaTopic2 = "demo3"
>   val bootstrapServer = "localhost:9092"
> 
>   def main(args: Array[String]): Unit = {
>     val checkPointDir = "hdfs://localhost:8020/checkpo"
> 
>     val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> 
>     val rootLogger = Logger.getRootLogger()
>     rootLogger.setLevel(Level.ERROR)
> 
>     import spark.implicits._
> 
>     val df1 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic1)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val df2 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic2)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val order_details = df1
>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>         $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer")
as "promotion_cost",
>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer")
as "units_sold")
> 
>     val invoice_details = df2
>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>       .where($"invoice_status" === "Success")
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
> 
>     val join_df = order_details
>       .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>         $"promotion_cost",
>         $"date_of_order",
>         $"units_sold" as "units_sold", $"order_id")
> 
>     join_df.writeStream
>       .format("console")
>       .option("truncate", false)
>       .start()
>       .awaitTermination()
> 
>   }
> }
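> 
> (A minimal sketch, assuming checkpointing is intended: checkPointDir is declared above but never used, so if the join state and offsets should survive restarts, the sink would typically be pointed at it like this.)
> 
>     join_df.writeStream
>       .format("console")
>       .option("truncate", false)
>       .option("checkpointLocation", checkPointDir)  // persist offsets and join state across restarts
>       .start()
>       .awaitTermination()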
> 
> Thanks,
> Abhijeet Kumar
> 
> 
> 
> -- 
> Regards, 
> Srikanth Sriram
> <Screenshot 2018-11-27 at 1.24.47 PM.png> <Screenshot 2018-11-27 at 1.24.39 PM.png>

