spark-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Dataset Select Function after Aggregate Error
Date Sat, 18 Jun 2016 15:47:59 GMT
scala> ds.groupBy($"_1").count.select(expr("_1").as[String], expr("count").as[Long])
res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: int, count: bigint]

scala> ds.groupBy($"_1").count.select(expr("_1").as[String], expr("count").as[Long]).show
+---+-----+
| _1|count|
+---+-----+
|  1|    1|
|  2|    1|
+---+-----+
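
On the compile-time question quoted below, here is a minimal sketch of how the typed and untyped styles compare. It assumes Spark 2.x on the classpath and a local master; the Person case class mirrors the example quoted further down, and the object name TypedAggregateSketch is made up for illustration. It is one possible approach, not necessarily the idiomatic one.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type, mirroring the Person example quoted further down.
case class Person(name: String, age: Int)

object TypedAggregateSketch {
  // Returns the per-name counts computed two ways: typed and untyped.
  def run(spark: SparkSession): (Map[String, Long], Map[String, Long]) = {
    import spark.implicits._

    val ds: Dataset[Person] = Seq(
      Person("Andy", 32), Person("Andy", 2), Person("Pedro", 24), Person("Bob", 42)
    ).toDS()

    // Typed grouping: the key is an ordinary Scala function, so a typo such
    // as `_.nam` is rejected by the compiler.
    val typed: Dataset[(String, Long)] = ds.groupByKey(_.name).count()

    // Untyped grouping: column names are strings, so a typo such as $"coun"
    // only fails at runtime with an AnalysisException. The .as[...] calls
    // restore a typed result for downstream code.
    val untyped: Dataset[(String, Long)] =
      ds.groupBy($"name").count().select($"name".as[String], $"count".as[Long])

    (typed.collect().toMap, untyped.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-aggregate-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

The trade-off is that the typed version runs a Scala closure for the key, while the untyped version stays column-wise and only regains types at the final select.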

On Sat, Jun 18, 2016 at 8:29 AM, Pedro Rodriguez <ski.rodriguez@gmail.com>
wrote:

> I am curious if there is a way to call this so that it becomes a compile
> error rather than runtime error:
>
> // Note misspelled count and name
> ds.groupBy($"name").count.select('nam, $"coun").show
>
> More specifically, what are the best type-safety guarantees that Datasets
> provide? With DataFrames, columns are still specified by string/symbol, so
> nothing checks at compile time that a column exists or has the expected
> type. If you do something like this, though, downstream code is safer:
>
> // This is Array[(String, Long)] instead of Array[sql.Row]
> ds.groupBy($"name").count.select('name.as[String], 'count.as[Long]).collect()
>
> Does that seem like a correct understanding of Datasets?
>
> On Sat, Jun 18, 2016 at 6:39 AM, Pedro Rodriguez <ski.rodriguez@gmail.com>
> wrote:
>
>> Looks like it was my own fault. I had spark 2.0 cloned/built, but had the
>> spark shell in my path so somehow 1.6.1 was being used instead of 2.0.
>> Thanks
>>
>> On Sat, Jun 18, 2016 at 1:16 AM, Takeshi Yamamuro <linguin.m.s@gmail.com>
>> wrote:
>>
>>> Which version are you using?
>>> It works for me in 2.0-preview, as follows:
>>> ---
>>> Spark context available as 'sc' (master = local[*], app id = local-1466234043659).
>>> Spark session available as 'spark'.
>>> Welcome to
>>>       ____              __
>>>      / __/__  ___ _____/ /__
>>>     _\ \/ _ \/ _ `/ __/  '_/
>>>    /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-preview
>>>       /_/
>>>
>>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>>
>>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>> hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
>>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>>
>>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>> +---+-----+
>>> | _1|count|
>>> +---+-----+
>>> |  1|    1|
>>> |  2|    1|
>>> +---+-----+
>>>
>>> On Sat, Jun 18, 2016 at 3:09 PM, Pedro Rodriguez <
>>> ski.rodriguez@gmail.com> wrote:
>>>
>>>> I went ahead and downloaded/compiled Spark 2.0 to try your code snippet,
>>>> Takeshi. Unfortunately, it doesn't compile.
>>>>
>>>> scala> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>>> ds: org.apache.spark.sql.Dataset[(Int, Int)] = [_1: int, _2: int]
>>>>
>>>> scala> ds.groupBy($"_1").count.select($"_1", $"count").show
>>>> <console>:28: error: type mismatch;
>>>>  found   : org.apache.spark.sql.ColumnName
>>>>  required: org.apache.spark.sql.TypedColumn[(org.apache.spark.sql.Row, Long),?]
>>>>               ds.groupBy($"_1").count.select($"_1", $"count").show
>>>>                                                                  ^
>>>>
>>>> I also tried Xinh's suggestion using the code snippet below
>>>> (partially from the Spark docs):
>>>> scala> val ds = Seq(Person("Andy", 32), Person("Andy", 2),
>>>> Person("Pedro", 24), Person("Bob", 42)).toDS()
>>>> scala> ds.groupBy(_.name).count.select($"name".as[String]).show
>>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>>> input columns: [];
>>>> scala> ds.groupBy(_.name).count.select($"_1".as[String]).show
>>>> org.apache.spark.sql.AnalysisException: cannot resolve 'name' given
>>>> input columns: [];
>>>> scala> ds.groupBy($"name").count.select($"_1".as[String]).show
>>>> org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input
>>>> columns: [];
>>>>
>>>> It looks like the list of input columns is empty for some reason. The
>>>> code below, a simple aggregate, works fine:
>>>> scala> ds.groupBy(_.name).count.show
>>>>
>>>> Would be great to see an idiomatic example of using aggregates like
>>>> these mixed with spark.sql.functions.
>>>>
>>>> Pedro
>>>>
>>>> On Fri, Jun 17, 2016 at 9:59 PM, Pedro Rodriguez <
>>>> ski.rodriguez@gmail.com> wrote:
>>>>
>>>>> Thanks Xinh and Takeshi,
>>>>>
>>>>> I am trying to avoid map since my impression is that it uses a Scala
>>>>> closure, so it is not optimized as well as column-wise operations are.
>>>>>
>>>>> Looks like the $ notation is the way to go, thanks for the help. Is
>>>>> there an explanation of how this works? I imagine it is a method/function
>>>>> with its name defined as $ in Scala?
>>>>>
>>>>> Lastly, are there prelim Spark 2.0 docs? If there isn't a good
>>>>> description/guide of using this syntax I would be willing to contribute
>>>>> some documentation.
>>>>>
>>>>> Pedro
>>>>>
>>>>> On Fri, Jun 17, 2016 at 8:53 PM, Takeshi Yamamuro <
>>>>> linguin.m.s@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In 2.0, you can say;
>>>>>> val ds = Seq[Tuple2[Int, Int]]((1, 0), (2, 0)).toDS
>>>>>> ds.groupBy($"_1").count.select($"_1", $"count").show
>>>>>>
>>>>>>
>>>>>> // maropu
>>>>>>
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 7:53 AM, Xinh Huynh <xinh.huynh@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Pedro,
>>>>>>>
>>>>>>> In 1.6.1, you can do:
>>>>>>> >> ds.groupBy(_.uid).count().map(_._1)
>>>>>>> or
>>>>>>> >> ds.groupBy(_.uid).count().select($"value".as[String])
>>>>>>>
>>>>>>> It doesn't have the exact same syntax as for DataFrame.
>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
>>>>>>>
>>>>>>> It might be different in 2.0.
>>>>>>>
>>>>>>> Xinh
>>>>>>>
>>>>>>> On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez <
>>>>>>> ski.rodriguez@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am working on using Datasets in 1.6.1 and eventually 2.0 when it's
>>>>>>>> released.
>>>>>>>>
>>>>>>>> I am running the aggregate code below, where I have a dataset whose
>>>>>>>> rows have a field uid:
>>>>>>>>
>>>>>>>> ds.groupBy(_.uid).count()
>>>>>>>> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1:
>>>>>>>> string, _2: bigint]
>>>>>>>>
>>>>>>>> This works as expected; however, attempts to run a select statement
>>>>>>>> afterwards fail:
>>>>>>>> ds.groupBy(_.uid).count().select(_._1)
>>>>>>>> // error: missing parameter type for expanded function ((x$2) =>
>>>>>>>> x$2._1)
>>>>>>>> ds.groupBy(_.uid).count().select(_._1)
>>>>>>>>
>>>>>>>> I have tried several variants, but nothing seems to work. Below is
>>>>>>>> the equivalent DataFrame code, which works as expected:
>>>>>>>> df.groupBy("uid").count().select("uid")
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> --
>>>>>>>> Pedro Rodriguez
>>>>>>>> PhD Student in Distributed Machine Learning | CU Boulder
>>>>>>>> UC Berkeley AMPLab Alumni
>>>>>>>>
>>>>>>>> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
>>>>>>>> Github: github.com/EntilZha | LinkedIn:
>>>>>>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> ---
>>>>>> Takeshi Yamamuro
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>>
>>
>
>
>
>
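
On the question quoted above about how the $ notation works: it is a Scala string interpolator, i.e. a method named `$` added to StringContext through an implicit class. The following is a minimal plain-Scala 2 sketch of that mechanism with no Spark dependency. The ColumnName type here is a hypothetical stand-in, not Spark's org.apache.spark.sql.ColumnName, and the object names are made up for illustration.

```scala
// Hypothetical stand-in for Spark's column type; not the real
// org.apache.spark.sql.ColumnName.
final case class ColumnName(name: String)

object DollarSketch {
  // An implicit class on StringContext adds a method named `$`. The compiler
  // rewrites the interpolated string $"count" into
  // StringContext("count").$() and finds `$` through this implicit class.
  implicit class StringToColumn(val sc: StringContext) {
    def $(args: Any*): ColumnName = ColumnName(sc.s(args: _*))
  }
}

object DollarSketchDemo extends App {
  import DollarSketch._

  val col = $"count"
  println(col) // prints ColumnName(count)
}
```

In Spark the equivalent implicit comes into scope with the session's implicits import, which is why the shell can use $"..." without extra setup.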
