spark-user mailing list archives

From Carlo.Allocca <carlo.allo...@open.ac.uk>
Subject SPARK UDF related issue
Date Mon, 25 Jul 2016 21:49:59 GMT
Hi All, 

I am using Spark 2.0 and have run into the following issue:

I can run steps 1-5 (see below) but not step 6, which uses a UDF. Steps 1-5 take a few
seconds, while step 6 never seems to finish.

Is there anything wrong? How should I address it?

Any suggestions or feedback would be much appreciated.

Many thanks in advance for your help.

Best regards, 
Carlo


 

====== Code 


=== STEP 1
        SparkSession spark = SparkSession
                .builder()
                .master("local[2]")
                .appName("DatasetForCaseNew")
                .config("spark.executor.memory", "3g")
                .getOrCreate();


=== STEP 2
        spark.udf().register("computeBindingValue", new UDF1<String, Integer>() {
            @Override
            public Integer call(String newBindingValue) throws Exception {
                // Null-safe: a null binding yields null instead of a NullPointerException
                if (newBindingValue == null) return null;
                if (newBindingValue.contains("Paperback")) return 1;
                return 2;
            }
        }, DataTypes.IntegerType);
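The UDF registered in step 2 is a plain string test, so its logic can be checked without starting Spark at all. A minimal sketch, assuming the same "Paperback" rule; the class name `BindingValue` is hypothetical, and returning null for a null input is an assumption (without a null check, `contains` would throw a `NullPointerException` on a null `binding` value).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone copy of the computeBindingValue logic from step 2,
// so the mapping can be tested without a SparkSession.
class BindingValue {

    // Returns 1 for bindings containing "Paperback", 2 for any other binding.
    // Assumption: a null binding yields null rather than throwing.
    static Integer compute(String binding) {
        if (binding == null) {
            return null;
        }
        return binding.contains("Paperback") ? 1 : 2;
    }

    public static void main(String[] args) {
        // Made-up sample bindings, for illustration only
        List<String> samples = Arrays.asList("Paperback", "Mass Market Paperback", "Hardcover", null);
        for (String s : samples) {
            System.out.println(s + " -> " + compute(s));
        }
    }
}
```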

=== STEP 3
        Dataset<Row> cmptDS_TMP = cmptDS
                .select(window(cmptDS.col("created"), "1 hour").as("PRD_TimeWindow#1"),
                        cmptDS.col("asin").as("PRD_asin#1"),
                        cmptDS.col("sale_rank").as("PRD_global_sale_rank")
                );
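`window(cmptDS.col("created"), "1 hour")` places each timestamp in a tumbling one-hour bucket. As a rough sketch of the arithmetic only (not Spark's implementation), assuming the default zero start offset so that windows align to the Unix epoch, the bucket boundaries follow from floor division on epoch milliseconds; `HourWindow` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of tumbling-window bucketing, similar in spirit
// to window(col, "1 hour") with no slide duration and no start offset.
class HourWindow {

    static final long WINDOW_MS = Duration.ofHours(1).toMillis();

    // Inclusive start of the window containing ts: floor(ts / width) * width.
    static Instant windowStart(Instant ts) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(Math.floorDiv(ms, WINDOW_MS) * WINDOW_MS);
    }

    // Exclusive end of the same window.
    static Instant windowEnd(Instant ts) {
        return windowStart(ts).plusMillis(WINDOW_MS);
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2016-07-25T21:49:59Z");
        System.out.println("[" + windowStart(created) + ", " + windowEnd(created) + ")");
    }
}
```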

=== STEP 4
        Dataset<Row> resultProd = prdDS
                .select(
                        prdDS.col("asin").alias("PRD_asin#300"),
                        prdDS.col("rppprice").alias("PRD_rppprice"),
                        prdDS.col("binding").alias("PRD_binding")
                ).distinct().sort("PRD_asin#300");
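Step 4 projects three columns, drops rows that are identical in all of them (`distinct()`), and orders the result by ASIN. A plain-Java analogue, offered only as an illustration of those semantics; `DistinctSort` is a hypothetical name and the ASIN values are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical in-memory analogue of step 4: distinct() followed by sort on asin.
class DistinctSort {

    // Each row is (asin, rppprice, binding). List equality is element-wise,
    // so the LinkedHashSet removes rows equal in all three columns.
    static List<List<String>> distinctSortedByAsin(List<List<String>> rows) {
        List<List<String>> unique = new ArrayList<>(new LinkedHashSet<>(rows));
        unique.sort(Comparator.comparing((List<String> r) -> r.get(0)));
        return unique;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("B002", "9.99", "Paperback"),
                Arrays.asList("B001", "19.99", "Hardcover"),
                Arrays.asList("B002", "9.99", "Paperback"));
        distinctSortedByAsin(rows).forEach(System.out::println);
    }
}
```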

=== STEP 5
        Dataset<Row> cmptDS_TMP_join_resultProd = cmptDS_TMP
                .join(resultProd,
                        cmptDS_TMP.col("PRD_asin#1").equalTo(resultProd.col("PRD_asin#300")),
                        "inner");
        cmptDS_TMP_join_resultProd.show();
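Step 5 is an inner equi-join on the ASIN column: each pair of rows with equal keys yields one output row that keeps both key columns, and unmatched rows are dropped. The sketch below illustrates those semantics with an in-memory hash join; Spark picks its own physical join strategy, so this is not how the query actually executes. `InnerJoinOnAsin` is a hypothetical name and the sample rows are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory inner equi-join on asin (element 0 of each row),
// implemented as a simple build/probe hash join.
class InnerJoinOnAsin {

    static List<List<String>> innerJoin(List<List<String>> left, List<List<String>> right) {
        // Build phase: index the right-hand rows by asin.
        Map<String, List<List<String>>> byAsin = new HashMap<>();
        for (List<String> r : right) {
            byAsin.computeIfAbsent(r.get(0), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: every matching (left, right) pair produces one row;
        // left rows without a match are dropped, as in an "inner" join.
        List<List<String>> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : byAsin.getOrDefault(l.get(0), Collections.emptyList())) {
                List<String> joined = new ArrayList<>(l);
                joined.addAll(r);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> cmpt = Arrays.asList(
                Arrays.asList("B001", "120"),
                Arrays.asList("B003", "45"));
        List<List<String>> prod = Arrays.asList(
                Arrays.asList("B001", "19.99", "Hardcover"));
        innerJoin(cmpt, prod).forEach(System.out::println);
    }
}
```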
        
=== STEP 6
        Dataset<Row> prodWithBindingValue = cmptDS_TMP_join_resultProd.withColumn("PRD_bindingValue",
                callUDF("computeBindingValue", cmptDS_TMP_join_resultProd.col("PRD_binding")));
        prodWithBindingValue.show();
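Conceptually, `withColumn("PRD_bindingValue", callUDF(...))` evaluates the UDF once per row of the joined data and appends the result as a new column. A row-wise sketch of that idea in plain Java; `WithDerivedColumn` and its `withColumn` helper are hypothetical, and the lambda passed in `main` simply restates the "Paperback" rule from step 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical row-wise analogue of step 6: apply a function to one column
// of every row and append the result as the last column.
class WithDerivedColumn {

    static List<List<Object>> withColumn(List<List<Object>> rows, int sourceIndex,
                                         Function<String, Integer> f) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : rows) {
            List<Object> copy = new ArrayList<>(row);   // leave the input rows untouched
            copy.add(f.apply((String) row.get(sourceIndex)));
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up joined rows: (asin, rppprice, binding)
        List<List<Object>> joined = Arrays.asList(
                Arrays.<Object>asList("B001", "19.99", "Hardcover"),
                Arrays.<Object>asList("B002", "9.99", "Paperback"));
        Function<String, Integer> bindingValue = b -> b.contains("Paperback") ? 1 : 2;
        withColumn(joined, 2, bindingValue).forEach(System.out::println);
    }
}
```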


        