spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeff Zhang <zjf...@gmail.com>
Subject Re: should I file a bug? Re: trouble implementing Transformer and calling DataFrame.withColumn()
Date Wed, 23 Dec 2015 01:20:39 GMT
>>> DataFrame transformerdDF = df.withColumn(fieldName, newCol);
>>> org.apache.spark.sql.AnalysisException: resolved attribute(s) _c0#2
missing from id#0,labelStr#1 in operator !Project [id#0,labelStr#1,_c0#2 AS
transformedByUDF#3];

I don't think it's a Spark bug; it is a bug in your application. I haven't
checked your code in detail, but from the error message, df's schema is (id,
labelStr) and it doesn't have column _c0. That column belongs to udfDF, a
different DataFrame, so df.withColumn() cannot resolve it. You need to keep
that column in df.

On Wed, Dec 23, 2015 at 4:24 AM, Andy Davidson <
Andy@santacruzintegration.com> wrote:

> Hi Jeff
>
> Just a reminder of the original Python code I am trying to port to Java. I
> think there is a bug in withColumn(). What do you think: should I file
> a bug report?
>
> Also, I have to write a lot of code, and go from a DataFrame to a
> JavaRDD<Row>.
>
> def convertMultinomialLabelToBinary(dataFrame):
>     newColName = "binomialLabel"
>     binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else “signal",
StringType())
>     ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>     return ret
>
>
> DataFrame transformerdDF = df.withColumn(fieldName, newCol); raises
>
>  org.apache.spark.sql.AnalysisException: resolved attribute(s) _c0#2
> missing from id#0,labelStr#1 in operator !Project [id#0,labelStr#1,_c0#2 AS
> transformedByUDF#3];
>
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
>
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:154)
>
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
>
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
>
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>
>         at
> org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
>
>         at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
>
>         at org.apache.spark.sql.DataFrame.org
> $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
>
>         at org.apache.spark.sql.DataFrame.select(DataFrame.scala:691)
>
>         at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1150)
>
>         at com.pws.fantasySport.ml.UDFTest.test(UDFTest.java:75)
>
>
>
>    @Test
>
>     public void test() {
>
>         logger.info("BEGIN”);
>
>
>         DataFrame df = createData();
>
>         final String tableName = "myTable";
>
>         sqlContext.registerDataFrameAsTable(df, tableName);
>
>
>         logger.info("print schema");
>
>         df.printSchema();
>
>         logger.info("original data before we applied UDF");
>
>         df.show();
>
>
>         MyUDF udf = new MyUDF();
>
>         final String udfName = "myUDF";
>
>         sqlContext.udf().register(udfName, udf, DataTypes.StringType);
>
>
>         String fmt = "SELECT %s(%s) FROM %s";
>
>         String stmt = String.format(fmt, udfName, tableName+".labelStr",
> tableName);
>
>         logger.info("AEDWIP stmt:{}", stmt);
>
>         DataFrame udfDF = sqlContext.sql(stmt);
>
>         Row[] results = udfDF.head(3);
>
>         for (Row row : results) {
>
>             logger.info("row returned by applying UDF {}", row);
>
>         }
>
>
>         logger.info("udfDF schema");
>
>         udfDF.printSchema();
>
>         logger.info("udfDF data");
>
>         udfDF.show();
>
>
>         final String fieldName = "transformedByUDF";
>
>         /*
>
>          * as() does not work
>
>          * // https://issues.apache.org/jira/browse/SPARK-12483
>
>         DataFrame niceUdfDF = udfDF.as(fieldName); // by default the col
> is '_c0'
>
>          niceUdfDF.printSchema();
>
>         logger.info("niceUdfDF data");
>
>         niceUdfDF.show();
>
>         */
>
>
>         // get column from data frame call df.withColumnName
>
>         Column newCol = udfDF.col("_c0");
>
>
>
>         DataFrame transformerdDF = df.withColumn(fieldName, newCol);
>
>         logger.info("print schema after calling df.withColumn()");
>
>         transformerdDF.printSchema();
>
>         logger.info("show() after calling df.withColumn()");
>
>         transformerdDF.show();
>
>
>         logger.info("END");
>
>     }
>
>
>     DataFrame createData() {
>
>         Features f1 = new Features(1, category1);
>
>         Features f2 = new Features(2, category2);
>
>         ArrayList<Features> data = new ArrayList<Features>(2);
>
>         data.add(f1);
>
>         data.add(f2);
>
>         //JavaRDD<Features> rdd =
> javaSparkContext.parallelize(Arrays.asList(f1, f2)); // does not work
>
>         JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
>
>         DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
>
>         return df;
>
>     }
>
>
>     class MyUDF implements UDF1<String, String> {
>
>         @Override
>
>         public String call(String s) throws Exception {
>
>             logger.info("AEDWIP s:{}", s);
>
>             String ret = s.equalsIgnoreCase(category1) ?  category1 :
> category3;
>
>             return ret;
>
>         }
>
>     }
>
>
>     public class Features implements Serializable{
>
>         private static final long serialVersionUID = 1L;
>
>         int id;
>
>         String labelStr;
>
>
>         Features(int id, String l) {
>
>             this.id = id;
>
>             this.labelStr = l;
>
>         }
>
>
>         public int getId() {
>
>             return id;
>
>         }
>
>
>         public void setId(int id) {
>
>             this.id = id;
>
>         }
>
>
>         public String getLabelStr() {
>
>             return labelStr;
>
>         }
>
>
>         public void setLabelStr(String labelStr) {
>
>             this.labelStr = labelStr;
>
>         }
>
>     }
>
>
>
> From: Andrew Davidson <Andy@SantaCruzIntegration.com>
> Date: Monday, December 21, 2015 at 7:47 PM
> To: Jeff Zhang <zjffdu@gmail.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling
> DataFrame.withColumn()
>
> Hi Jeff
>
> I took a look at Tokenizer.scala, UnaryTransformer.scala, and
> Transformer.scala. However, I cannot figure out how to implement createTransformFunc()
> in Java 8.
>
> It would be nice to be able to use this transformer in my pipeline, but it is
> not required. The real problem is that I cannot figure out how to create a
> Column I can pass to dataFrame.withColumn() in my Java code. Here is my
> original Python:
>
> binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else “signal", StringType())
>     ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
> public class LabelToBinaryTransformer
>
>             extends UnaryTransformer<String, String,
> LabelToBinaryTransformer> {
>
>     private static final long serialVersionUID = 4202800448830968904L;
>
>     private  final UUID uid = UUID.randomUUID();
>
>
>     @Override
>
>     public String uid() {
>
>         return uid.toString();
>
>     }
>
>
>     @Override
>
>     public Function1<String, String> createTransformFunc() {
>
> // original python code
>
> // binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else
> “signal", StringType())
>
> Function1 interface is not easy to implement lots of functions
>
>         ???
>
>     }
>
>
>     @Override
>
>     public DataType outputDataType() {
>
>         StringType ret = new StringType();
>
>         return ret;
>
>     }
>
>
>
> }
>
>
> From: Jeff Zhang <zjffdu@gmail.com>
> Date: Monday, December 21, 2015 at 6:43 PM
> To: Andrew Davidson <Andy@SantaCruzIntegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling
> DataFrame.withColumn()
>
> In your case, I would suggest extending UnaryTransformer, which is
> much easier.
>
> Yeah, I have to admit that there's no documentation about how to write a custom
> Transformer. I think we need to add that, since writing a custom Transformer
> is a very typical task in machine learning.
>
> On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson <
> Andy@santacruzintegration.com> wrote:
>
>>
>> I am trying to port the following Python function to Java 8. I would like
>> my Java implementation to implement Transformer so I can use it in a
>> pipeline.
>>
>> I am having a heck of a time trying to figure out how to create a Column
>> variable I can pass to DataFrame.withColumn(). As far as I know,
>> withColumn() is the only way to append a column to a data frame.
>>
>> Any comments or suggestions would be greatly appreciated.
>>
>> Andy
>>
>>
>> def convertMultinomialLabelToBinary(dataFrame):
>>     newColName = "binomialLabel"
>>     binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else “signal",
StringType())
>>     ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>>     return rettrainingDF2 = convertMultinomialLabelToBinary(trainingDF1)
>>
>>
>>
>> public class LabelToBinaryTransformer extends Transformer {
>>
>>     private static final long serialVersionUID = 4202800448830968904L;
>>
>>     private  final UUID uid = UUID.randomUUID();
>>
>>     public String inputCol;
>>
>>     public String outputCol;
>>
>>
>>
>>     @Override
>>
>>     public String uid() {
>>
>>         return uid.toString();
>>
>>     }
>>
>>
>>     @Override
>>
>>     public Transformer copy(ParamMap pm) {
>>
>>         Params xx = defaultCopy(pm);
>>
>>         return ???;
>>
>>     }
>>
>>
>>     @Override
>>
>>     public DataFrame transform(DataFrame df) {
>>
>>         MyUDF myUDF = new MyUDF(myUDF, null, null);
>>
>>         Column c = df.col(inputCol);
>>
>> ??? UDF apply does not take a col????
>>
>>         Column col = myUDF.apply(df.col(inputCol));
>>
>>         DataFrame ret = df.withColumn(outputCol, col);
>>
>>         return ret;
>>
>>     }
>>
>>
>>     @Override
>>
>>     public StructType transformSchema(StructType arg0) {
>>
>>        *??? What is this function supposed to do???*
>>
>>       ???Is this the type of the new output column????
>>
>>     }
>>
>>
>>
>>     class MyUDF extends UserDefinedFunction {
>>
>>         public MyUDF(Object f, DataType dataType, Seq<DataType>
>> inputTypes) {
>>
>>             super(f, dataType, inputTypes);
>>
>>             ??? Why do I have to implement this constructor ???
>>
>>     ??? What are the arguments ???
>>
>>         }
>>
>>
>>
>>         @Override
>>
>>         public
>>
>>         Column apply(scala.collection.Seq<Column> exprs) {
>>
>>     What do you do with a scala seq?
>>
>>             return ???;
>>
>>         }
>>
>>     }
>>
>> }
>>
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>


-- 
Best Regards

Jeff Zhang

Mime
View raw message