spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ajay Chander <itsche...@gmail.com>
Subject Re: Code review / sqlContext Scope
Date Wed, 19 Oct 2016 20:00:36 GMT
Can someone please shed some lights on this. I wrote the below code in
Scala 2.10.5, can someone please tell me if this is the right way of doing
it?


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

class Test {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    sqlContext.sql("set spark.sql.shuffle.partitions=1000");
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val dataElementsFile = "hdfs://nameservice/user/ajay/spark/flds.txt"

    //    deDF has only 61 rows
    val deDF = sqlContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()

    deDF.withColumn("ds_nm", lit("UDA")).withColumn("tabl_nm",
lit("TEST_DB.TEST_TABLE")).collect().filter(filterByDataset).map(calculateMetricsAtDELevel).foreach(persistResults)


// if ds_nm starts with 'RAW_' I dont want to process it
def filterByDataset(de: Row): Boolean = {
val datasetName = de.getAs[String]("ds_nm").trim
if (datasetName.startsWith("RAW_")) {
return false
}
else {
return true
}
}

def calculateMetricsAtDELevel(de: Row): DataFrame = {
val dataElement = de.getAs[String]("DataElement").trim
val datasetName = de.getAs[String]("ds_nm").trim
val tableName = de.getAs[String]("tabl_nm").trim

// udaDF holds 107,762,849 Rows * 412 Columns / 105 files in HDFS and 176.5
GB * 3 Replication Factor
val udaDF = sqlContext.sql("SELECT '" + datasetName + "' as ds_nm, cyc_dt,
supplier_proc_i, " +
" '" + dataElement + "' as data_elm, " + dataElement + " as data_elm_val
FROM " + tableName + "")

println("udaDF num Partitions: "+udaDF.toJavaRDD.getNumPartitions)
// udaDF.toJavaRDD.getNumPartitions = 1490

val calculatedMetrics = udaDF.groupBy("ds_nm", "cyc_dt", "supplier_proc_i",
"data_elm", "data_elm_val").count()
println("calculatedMetrics num Partitions: "
+calculatedMetrics.toJavaRDD.getNumPartitions)
// calculatedMetrics.toJavaRDD.getNumPartitions = 1000 since I set it to
sqlContext.sql("set spark.sql.shuffle.partitions=1000");

val adjustedSchemaDF = calculatedMetrics.withColumnRenamed("count",
"derx_val_cnt").withColumn("load_dt", current_timestamp())
println("adjustedSchemaDF num Partitions: "
+adjustedSchemaDF.toJavaRDD.getNumPartitions)
// adjustedSchemaDF.toJavaRDD.getNumPartitions = 1000 as well

return adjustedSchemaDF
}

def persistResults(adjustedSchemaDF: DataFrame) = {
// persist the resukts into Hive table backed by PARQUET
adjustedSchemaDF.write.partitionBy("ds_nm", "cyc_dt").mode("Append"
).insertInto("devl_df2_spf_batch.spf_supplier_trans_metric_detl_base_1")
}

}
}

This is my cluster( Spark 1.6.0 on Yarn, Cloudera 5.7.1) configuration,

Memory -> 4.10 TB

VCores -> 544

I am deploying the application in yarn client mode and the cluster is
set to use Dynamic Memory Allocation.

Any pointers are appreciated.

Thank you


On Sat, Oct 8, 2016 at 1:17 PM, Ajay Chander <itschevva@gmail.com> wrote:

> Hi Everyone,
>
> Can anyone tell me if there is anything wrong with my code flow below ?
> Based on each element from the text file I would like to run a query
> against Hive table and persist results in another Hive table. I want to do
> this in parallel for each element in the file. I appreciate any of your
> inputs on this.
>
> $ cat /home/ajay/flds.txt
> PHARMY_NPI_ID
> ALT_SUPPLIER_STORE_NBR
> MAIL_SERV_NBR
>
> spark-shell  --name hivePersistTest --master yarn --deploy-mode client
>
> val dataElementsFile = "/home/ajay/flds.txt"
> val dataElements = Source.fromFile(dataElementsFile).getLines.toArray
>
> def calculateQuery (de: String)  : DataFrame = {
>   val calculatedQuery = sqlContext.sql("select 'UDA' as ds_nm, cyc_dt, supplier_proc_i
as supplier_proc_id, '" + de + "' as data_elm, " + de + " as data_elm_val," +
>     " count(1) as derx_val_cnt, current_timestamp as load_dt " +
>     "from SPRINT2_TEST2 " +
>     "group by 'UDA', cyc_dt, supplier_proc_i, '" + de + "' , " + de + " ")
>
>   return calculatedQuery
> }
>
> def persistResults (calculatedQuery: DataFrame) = {
>   calculatedQuery.write.insertInto("sprint2_stp1_test2")
> }
>
> dataElements.map(calculateQuery).foreach(persistResults)
>
>
> Thanks.
>
>

Mime
View raw message