spark-user mailing list archives

From Anwar AliKhan <anwaralikhan...@gmail.com>
Subject Re: When is a Bigint a long and when is a long a long
Date Sat, 27 Jun 2020 20:19:43 GMT
OK Thanks

On Sat, 27 Jun 2020, 17:36 Sean Owen, <srowen@gmail.com> wrote:

> It does not return a DataFrame. It returns Dataset[Long].
> You do not need to collect(). See my email.
>
> On Sat, Jun 27, 2020, 11:33 AM Anwar AliKhan <anwaralikhanuae@gmail.com>
> wrote:
>
>> So the range function actually returns bigint (the Spark SQL type),
>> and the fact that Dataset[Long] and printSchema() display (toString())
>> Long instead of bigint needs looking into.
>>
>> Putting that to one side:
>>
>> My issue with using collect() to get around the casting of the elements
>> returned by range is that I have read some literature saying collect()
>> returns all the data to the driver and so can cause an out-of-memory error.
>>
>> Question:
>> Is it correct that collect() behaves that way and can cause an
>> out-of-memory error?
>>
>> Obviously it would be better to use .map for the casting, because then
>> the work is done by the workers:
>> spark.range(10).map(_.toLong).reduce(_+_)
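>>
>> For contrast, a minimal driver-side sketch (assuming the same spark-shell
>> session): collect() pulls every element back to the driver first, which is
>> what can exhaust driver memory on a large range, and only then does the
>> local reduce run.
>>
>> // same result, but the Array lives entirely on the driver
>> spark.range(10).collect().reduce(_ + _)   // 45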
>> <http://www.backbutton.co.uk/>
>>
>>
>> On Sat, 27 Jun 2020, 15:42 Sean Owen, <srowen@gmail.com> wrote:
>>
>>> There are several confusing things going on here. I think this is part
>>> of the explanation, not 100% sure:
>>>
>>> 'bigint' is the Spark SQL type of an 8-byte long. 'long' is the type
>>> of a JVM primitive. Both are the same, conceptually, but represented
>>> differently internally as they are logically somewhat different ideas.
>>>
>>> The first thing I'm not sure about is why the toString of
>>> Dataset[Long] reports a 'bigint' and printSchema() reports 'long'.
>>> That might be a (cosmetic) bug.
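>>>
>>> If it helps, a minimal way to see where the two names come from, assuming
>>> the LongType singleton in org.apache.spark.sql.types:
>>>
>>> import org.apache.spark.sql.types.LongType
>>> LongType.simpleString  // "bigint" - the SQL-facing name the Dataset toString appears to use
>>> LongType.typeName      // "long"   - the name printSchema() appears to use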
>>>
>>> Second, in Scala 2.12, SAM support causes calls to reduce() and
>>> other methods that use an Object type to be ambiguous, because Spark
>>> has long had Java-friendly overloads that provide a SAM
>>> interface for Java callers. Those weren't removed, to avoid breakage,
>>> at the cost of having to explicitly tell the compiler which overload
>>> you want. (They are equivalent.)
>>>
>>> This is triggered because range() returns java.lang.Longs, not long
>>> primitives (i.e. scala.Long). I assume that is to make it versatile
>>> enough to use from Java too, and because it's hard to write an overload
>>> (it would have to be renamed).
>>>
>>> But that means you trigger the SAM overload issue.
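>>>
>>> Concretely, the two overloads in play (paraphrasing the compiler error
>>> quoted further down) are roughly:
>>>
>>> // def reduce(func: (T, T) => T): T
>>> // def reduce(func: org.apache.spark.api.java.function.ReduceFunction[T]): T
>>> //
>>> // With T = java.lang.Long and 2.12 SAM conversion, the literal (_ + _),
>>> // whose result is a scala.Long after unboxing, matches neither overload
>>> // exactly, so the overloaded call is rejected:
>>> spark.range(10).reduce(_ + _)   // error: overloaded method value reduce ...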
>>>
>>> Anything you do that makes this a Dataset[scala.Long] resolves it, as
>>> it is no longer ambiguous (Java-friendly Object-friendly overload does
>>> not apply). For example:
>>>
>>> spark.range(10).map(_.toLong).reduce(_+_)
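>>>
>>> Converting the element type up front should also work, assuming the
>>> spark.implicits._ that spark-shell imports by default, since the literal
>>> then matches the (T, T) => T overload exactly:
>>>
>>> spark.range(10).as[Long].reduce(_+_)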
>>>
>>> If you collect(), you still have an Array[java.lang.Long]. But Scala
>>> implicits and conversions make .reduce(_+_) work fine on that; there
>>> is no "Java-friendly" overload in the way.
>>>
>>> Normally all of this just works and you can ignore these differences.
>>> This is a good example of a corner case in which it's inconvenient,
>>> because of the old Java-friendly overloads. This is by design though.
>>>
>>> On Sat, Jun 27, 2020 at 8:29 AM Anwar AliKhan <anwaralikhanuae@gmail.com>
>>> wrote:
>>> >
>>> > As you know, I have been puzzling over this issue:
>>> > how come spark.range(100).reduce(_+_)
>>> > worked in earlier Spark versions but not in the most recent ones?
>>> >
>>> > well,
>>> >
>>> > When you first create a dataset, by default the column "id" datatype is [bigint].
>>> > It is a bit like a coin: Long on one side and bigint on the other.
>>> >
>>> > scala> val myrange = spark.range(1,100)
>>> > myrange: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>> >
>>> > The error message the Spark framework produces when parsing the reduce(_+_)
>>> > call confirms this, and moreover spells out its constraint of expecting
>>> > parameter arguments of type java.lang.Long.
>>> >
>>> > scala> myrange.reduce(_+_)
>>> > <console>:26: error: overloaded method value reduce with alternatives:
>>> >   (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
>>> >   (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
>>> >  cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
>>> >        myrange.reduce(_+_)
>>> >                ^
>>> >
>>> > But if you ask the printSchema() method, it disagrees with both of the
>>> > above and says the column "id" data type is long.
>>> > scala> range100.printSchema()
>>> > root
>>> >  |-- id: long (nullable = false)
>>> >
>>> > If I ask the collect() method, it agrees with printSchema() that the
>>> > datatype of column "id" is Long and not bigint.
>>> >
>>> > scala> range100.collect()
>>> > res10: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
>>> 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>> 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
>>> 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69,
>>> 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88,
>>> 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
>>> >
>>> > To settle the dispute between the methods and get collect() to
>>> > "show me the money", I called collect() and passed its return value to
>>> > reduce(_+_).
>>> >
>>> > "Here is the money"
>>> > scala> range100.collect().reduce(_+_)
>>> > res11: Long = 4950
>>> >
>>> > The collect() and printSchema methods could be implying that there is no
>>> > difference between a Long and a BigInt.
>>> >
>>> > Question: are these return type differences by design, or an oversight/bug?
>>> > Question: why the change from the earlier versions to the later ones?
>>> > Question: will you be updating the reduce(_+_) method?
>>> >
>>> > When it comes to creating a dataset using toDS() there is no dispute:
>>> > all the methods agree that it is neither a bigint nor a Long but an Int
>>> > (integer).
>>> >
>>> > scala> val dataset = Seq(1, 2, 3).toDS()
>>> > dataset: org.apache.spark.sql.Dataset[Int] = [value: int]
>>> >
>>> > scala> dataset.collect()
>>> > res29: Array[Int] = Array(1, 2, 3)
>>> >
>>> > scala> dataset.printSchema()
>>> > root
>>> >  |-- value: integer (nullable = false)
>>> >
>>> > scala> dataset.show()
>>> > +-----+
>>> > |value|
>>> > +-----+
>>> > |    1|
>>> > |    2|
>>> > |    3|
>>> > +-----+
>>> >
>>> > scala> dataset.reduce(_+_)
>>> > res7: Int = 6
>>> >
>>>
>>
