spark-user mailing list archives

From nguyen duc tuan <newvalu...@gmail.com>
Subject Re: how to add a column according to an existing column of a dataframe?
Date Thu, 30 Jun 2016 14:05:32 GMT
The Spark issue you refer to is not related to your problem :D
In this case, all you have to do is use the withColumn function. For example:
import org.apache.spark.sql.functions._
val getRange = udf((x: Int) => get price range code ...)
val priceRange = resultprice.withColumn("range", getRange($"price"))

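As for code efficiency, the function can be as simple as:
val getRange = udf((x: String) =>
  // null or empty price: range 0
  if (x == null || x.trim.isEmpty) 0
  // prices written with 万 (ten thousand): range 22
  else if (x.contains('万')) 22
  else {
    val intVal = x.trim.toInt
    // 100000 and above is range 21, as in your code
    if (intVal >= 100000) 21
    // 5000-wide buckets: 0-4999 is 1, 5000-9999 is 2, ..., 95000-99999 is 20
    else (intVal / 5000) + 1
  }
)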

In general, you can store all the boundary values of the ranges in a sorted
array, then loop through the values (or, more efficiently, use binary search)
to find which range a value belongs to.
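
A minimal sketch of that boundary-array idea, in plain Scala without Spark. The
names PriceRanges, bounds and rangeCode are hypothetical, and the boundaries
assume the 5000-wide ranges from the quoted code, so adjust them to your data:

```scala
import java.util.Arrays

// Hypothetical helper (not part of Spark): maps a price to a 1-based range code.
object PriceRanges {
  // Exclusive upper bounds of ranges 1..20: 5000, 10000, ..., 100000 (assumed).
  val bounds: Array[Int] = (1 to 20).map(_ * 5000).toArray

  def rangeCode(price: Int): Int = {
    val i = Arrays.binarySearch(bounds, price)
    // Found at index i: price equals an exclusive upper bound,
    // so it belongs to the next range, whose code is i + 2.
    // Not found: i == -(insertionPoint) - 1, and the range code is
    // insertionPoint + 1, which equals -i.
    if (i >= 0) i + 2 else -i
  }
}
```

Anything at or above the last boundary falls into range 21 here. If it fits your
case, something like udf(PriceRanges.rangeCode _) should let you use it with
withColumn, after handling the null, empty and 万 cases separately.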

2016-06-30 20:08 GMT+07:00 <luohui20001@sina.com>:

> hi guys,
>      I have a dataframe with 3 columns, id(int) ,type(string)
> ,price(string) , and I want to add a column "price range", according to the
> value of price.
>      I checked the SPARK-15383
> <https://issues.apache.org/jira/browse/SPARK-15383>, however in my code I
> just want to append a column, which is transforming from the original
> dataframe "resultprice",  to resultprice
>      Is there a better way to do this? No matter the implements or code
> efficiency.
>      Will pattern matching help ? and How?
>      Thank you guys.
>
> here is my code:
>
>     val priceRange = resultprice.select("price").map { x =>
>       if (x.getString(0).trim == "null"||x.getString(0).trim == "")
> x(0).toString().trim.+("|0") else
>       if (x.getString(0).trim.contains('万'))
> x(0).toString().trim.replaceAll("万", "0000").+("|22") else
>       if (x.getString(0).trim.toInt < 5000) x(0).toString().+("|1") else
>       if (x.getString(0).trim.toInt >= 5000  && x.getString(0).trim.toInt
> < 10000) x(0).toString().trim+("|2") else
>       if (x.getString(0).trim.toInt >= 10000 && x.getString(0).trim.toInt
> < 15000) x(0).toString().trim.+("|3") else
>       if (x.getString(0).trim.toInt >= 15000 && x.getString(0).trim.toInt
> < 20000) x(0).toString().trim.+("|4") else
>       if (x.getString(0).trim.toInt >= 20000 && x.getString(0).trim.toInt
> < 25000) x(0).toString().trim.+("|5") else
>       if (x.getString(0).trim.toInt >= 25000 && x.getString(0).trim.toInt
> < 30000) x(0).toString().trim.+("|6") else
>       if (x.getString(0).trim.toInt >= 30000 && x.getString(0).trim.toInt
> < 35000) x(0).toString().trim.+("|7") else
>       if (x.getString(0).trim.toInt >= 35000 && x.getString(0).trim.toInt
> < 40000) x(0).toString().trim.+("|8") else
>       if (x.getString(0).trim.toInt >= 40000 && x.getString(0).trim.toInt
> < 45000) x(0).toString().trim.+("|9") else
>       if (x.getString(0).trim.toInt >= 45000 && x.getString(0).trim.toInt
> < 50000) x(0).toString().trim.+("|10") else
>       if (x.getString(0).trim.toInt >= 50000 && x.getString(0).trim.toInt
> < 55000) x(0).toString().trim.+("|11") else
>       if (x.getString(0).trim.toInt >= 55000 && x.getString(0).trim.toInt
> < 60000) x(0).toString().trim.+("|12") else
>       if (x.getString(0).trim.toInt >= 60000 && x.getString(0).trim.toInt
> < 65000) x(0).toString().trim.+("|13") else
>       if (x.getString(0).trim.toInt >= 65000 && x.getString(0).trim.toInt
> < 70000) x(0).toString().trim.+("|14") else
>       if (x.getString(0).trim.toInt >= 70000 && x.getString(0).trim.toInt
> < 75000) x(0).toString().trim.+("|15") else
>       if (x.getString(0).trim.toInt >= 75000 && x.getString(0).trim.toInt
> < 80000) x(0).toString().trim.+("|16") else
>       if (x.getString(0).trim.toInt >= 80000 && x.getString(0).trim.toInt
> < 85000) x(0).toString().trim.+("|17") else
>       if (x.getString(0).trim.toInt >= 85000 && x.getString(0).trim.toInt
> < 90000) x(0).toString().trim.+("|18") else
>       if (x.getString(0).trim.toInt >= 90000 && x.getString(0).trim.toInt
> < 95000) x(0).toString().trim.+("|19") else
>       if (x.getString(0).trim.toInt >= 95000 && x.getString(0).trim.toInt
> < 100000) x(0).toString().trim.+("|20") else
>       if (x.getString(0).trim.toInt >= 100000)
> x(0).toString().trim.+("|21")
>     }
>     priceRange.collect().foreach(println)
>     case class PriceRange(price:String,priceRange:Int)
>     val priceRange2 = priceRange.map(_.toString().split("|")).map { p =>
> PriceRange(p(0), p(1).trim.toInt)}.toDF()
>     val priceRangeCol = priceRange2.apply("priceRange")
>     val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
>
> here is the stacktrace:
> scala>     val finalPrice = resultprice.withColumn("priceRange",
> priceRangeCol)
> org.apache.spark.sql.AnalysisException: resolved attribute(s)
> priceRange#2629 missing from lp_loupan_id#1,price_type#26,price#101 in
> operator !Project [lp_loupan_id#1,price_type#26,price#101,priceRange#2629
> AS priceRange#2630];
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>         at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>         at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>         at org.apache.spark.sql.DataFrame.org
> $apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
>         at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
>         at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
>         at $iwC$$iwC$$iwC.<init>(<console>:67)
>         at $iwC$$iwC.<init>(<console>:69)
>         at $iwC.<init>(<console>:71)
>         at <init>(<console>:73)
>         at .<init>(<console>:77)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>         at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>         at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>         at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
>         at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
>         at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>         at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>         at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> --------------------------------
>
> Thanks & Best regards!
> San.Luo
>
