spark-user mailing list archives

From Ankit Khettry <justankit2...@gmail.com>
Subject Re: An alternative logic to collaborative filtering works fine but we are facing run time issues in executing the job
Date Wed, 17 Apr 2019 05:05:50 GMT
Hi Balakumar

Two things.

One - It seems like your cluster is running out of memory and then
eventually out of disk, most likely while materializing the joined
DataFrame for the write (what is the volume of data produced by the join?)

Two - Your job is running in local mode (.master("local[*]") in your code),
so it can use only the resources of the master node.
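
On the first point, one rough way to answer the volume question before
running the join is to count rows per join key: a key that appears n times
in _c0 produces n * n pairs in the self-join, before the left_data /
right_data filter removes some of them. The sketch below assumes the same
DataFrame layout as your code; the object and function names are mine and
are not part of your job.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit, sum}

// Hypothetical helper, not part of the original job.
object JoinSizeEstimate {
  // Number of rows an equi self-join on keyCol would produce, before any filter.
  // A key that occurs n times contributes n * n joined rows.
  def selfJoinRowCount(df: DataFrame, keyCol: String): Long = {
    val perKey = df
      .filter(col(keyCol).isNotNull)   // null keys never match in an equi-join
      .groupBy(col(keyCol))
      .count()                         // adds a Long column named "count"

    perKey
      .select(coalesce(sum(col("count") * col("count")), lit(0L)))
      .head()
      .getLong(0)                      // 0 when the input has no rows
  }
}
```

Calling JoinSizeEstimate.selfJoinRowCount(dframe, "_c0") gives the pre-filter
row count. If a few keys are very frequent, this number grows quadratically,
which would be consistent with the disk filling up during the write.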

Try running the job in YARN mode; if the issue persists, try increasing
the disk volumes.
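
For the YARN suggestion, the usual change is to stop hard-coding the master
in the code and pass it to spark-submit instead. A minimal sketch, assuming
the job is packaged as a jar; the object name and the jar name in the
comment are placeholders of mine.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical entry point; only the session setup is shown.
object JoinAssociationApp {
  // No .master(...) call: the master comes from the launcher, for example
  //   spark-submit --master yarn --deploy-mode cluster \
  //     --class JoinAssociationApp join-association.jar
  def buildSession(appName: String): SparkSession =
    SparkSession.builder.appName(appName).getOrCreate()

  def main(args: Array[String]): Unit = {
    val ss = buildSession("join_association")
    // Prints "yarn" when launched as above, "local[*]" when run in local mode.
    println(s"master = ${ss.sparkContext.master}")
    ss.stop()
  }
}
```

Note that in YARN mode, paths without a scheme are resolved against the
cluster's default filesystem (typically HDFS), so in/matrimony.txt and
/output would need to live there rather than on one node's local disk.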

Best Regards
Ankit Khettry

On Wed, 17 Apr, 2019, 9:44 AM Balakumar iyer S, <bala93kumar@gmail.com>
wrote:

> Hi ,
>
>
> While running the following Spark code on a cluster with the following
> configuration, the work is split into 3 job IDs.
>
> CLUSTER CONFIGURATION
>
> 3 NODE CLUSTER
>
> NODE 1 - 64GB 16CORES
>
> NODE 2 - 64GB 16CORES
>
> NODE 3 - 64GB 16CORES
>
>
> At job ID 2, the job gets stuck at stage 51 of 254 and then starts using
> up disk space. I am not sure why this is happening, and my work is
> completely ruined. Could someone help me with this?
>
> I have attached a screenshot of the stuck Spark stages for reference.
>
> Please let me know if you have more questions about the setup and code.
> Thanks
>
>
>
> code:
>
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.col
>
> def main(args: Array[String]): Unit = {
>   Logger.getLogger("org").setLevel(Level.ERROR)
>
>   val ss = SparkSession
>     .builder
>     .appName("join_association")
>     .master("local[*]")
>     .getOrCreate()
>
>   import ss.implicits._
>
>   // Read the headerless CSV; columns come out as _c0, _c1, ...
>   val dframe = ss.read
>     .option("inferSchema", value = true)
>     .option("delimiter", ",")
>     .csv("in/matrimony.txt")
>
>   dframe.show()
>   dframe.printSchema()
>
>   // Two copies of the same frame, with _c1 renamed on each side
>   val dfLeft = dframe.withColumnRenamed("_c1", "left_data")
>   val dfRight = dframe.withColumnRenamed("_c1", "right_data")
>
>   // Self-join on _c0, dropping pairs where both sides hold the same value
>   // (=!= replaces the deprecated !== operator)
>   val joined = dfLeft
>     .join(dfRight, dfLeft.col("_c0") === dfRight.col("_c0"))
>     .filter(col("left_data") =!= col("right_data"))
>
>   joined.show()
>
>   val result = joined.select(col("left_data"), col("right_data") as "similar_ids")
>
>   result.write.csv("/output")
>
>   ss.stop()
> }
>
>
>
> --
> REGARDS
> BALAKUMAR SEETHARAMAN
>
>
