ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Valentin Kulichenko <valentin.kuliche...@gmail.com>
Subject Re: Spark DataFrames With Cache Key and Value Objects
Date Wed, 01 Aug 2018 19:17:29 GMT
Stuart,

>From API standpoint Dataframe is just a Dataset of Rows, so they have the
same set of methods.

You have a valid point, by it's valid for both Dataset and Dataframe in the
same way. If you provide a function that filter Rows in a Dataframe,
current integration would also not take advantage of Ignite indexing.

In your example you should rather do 'filter(col("age") = 20)' or
'filter("age = 20")' in which case we can do our optimizations. Again, I'm
sure that would work in exact same way with Dataframes and Datasets, we
just need to provide proper support for the latter.

-Val

On Wed, Aug 1, 2018 at 11:52 AM Stuart Macdonald <stuwee@stuwee.org> wrote:

> Val,
>
> Happy to clarify my thoughts. Let’s take an example, say we have an Ignite
> cache of Person objects, so in Nikolay’s Ignite Spark SQL implementation you
> can currently obtain a DataFrame with a column called “age” because that’s
> been registered as a field in Ignite. Then you can do something like this:
>
> Dataset<Row> = spark.sql(“select * from Person where age = 20”)
>
> And Spark will extract the age predicate and pass it back to the Ignite
> Relation implementation to perform the predication. This is useful because
> Ignite can index the age column and optimise the query appropriately.
>
> Now what I’m talking about being able to do is:
>
> Dataset<Person> = spark.sql(“select _val from Person where age =
> 20”).as(ignite.encoder(Person.class))
>
> This would also push the predicate to Ignite but then additionally allow
> access to the actual cache value objects in order to perform further local
> Spark operations on data which hasn’t been — or can’t be — registered as a
> field.
>
> Of course even without Nikolay’s implementation you can just grab a
> Dataset of Person objects from Ignite and then do something like:
>
> dataset.filter(p -> p.age = 20)
>
> But this has no way of passing that predicate back to Ignite, because the
> filter doesn’t go through Catalyst (which is the Spark engine which
> performs pushdown). Even if you convert that dataset to a dataframe and do
> a select() there is no way of pushing down that predicate. Spark has to
> have obtained the schema and dataframe from Ignite’s SQL relation
> implementation in order for the Spark Catalyst implementation to perform
> predicate pushdown.
>
> I’m not saying that we need to use Ignite’s _key or _val columns in order
> for this to work (though I have no other proposals as to how to make this
> work), but I am saying that an API which provides a Dataset<K, V> will not
> to the best of my knowledge allow for predicate pushdown to Ignite.
>
> I’m likewise keen to hear Nikolay’s point of view as he is obviously the
> expert.
>
> Thanks for your help so far.
>
> Stuart.
>
> On 1 Aug 2018, at 18:17, Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
>
> Stuart,
>
> I don't see a reason why it would work with DataFrames, but not with
> Datasets - they are pretty much the same thing. If you have any particular
> thoughts on this, please let us know.
>
> In any case, I would like to hear from Nikolay as he is an implementor of
> this functionality. Nikolay, please share your thoughts on my suggestion
> above.
>
> -Val
>
> On Wed, Aug 1, 2018 at 12:05 AM Stuart Macdonald <stuwee@stuwee.org>
> wrote:
>
>> I believe suggested approach will not work with the Spark SQL
>> relational optimisations which perform predicate pushdown from Spark
>> to Ignite. For that to work we need both the key/val and the
>> relational fields in a dataframe schema.
>>
>> Stuart.
>>
>> > On 1 Aug 2018, at 04:23, Valentin Kulichenko <
>> valentin.kulichenko@gmail.com> wrote:
>> >
>> > I don't think there are exact plans to remove _key and _value fields as
>> > it's pretty hard considering the fact that many users use them and that
>> > they are deeply integrated into the product. However, we already had
>> > multiple usability and other issues due to their existence, and while
>> > fixing them we gradually get rid of _key/_val on public API. Hard to
>> tell
>> > when we will be able to completely deprecate/remove these fields, but we
>> > definitely should avoid building new features based on them.
>> >
>> > On top of that, I also don't like this approach because it doesn't seem
>> to
>> > be Spark-friendly to me. That's not how they typically create typed
>> > datasets (I already provided a documentation link [1] with examples
>> > earlier).
>> >
>> > From API standpoint, I think we should do the following:
>> > 1. Add 'IgniteSparkSession#createDataset(IgniteCache[K, V] cache):
>> > Dataset[(K, V)]' method that would create a dataset based on a cache.
>> > 2. (Scala only) Introduce 'IgniteCache.toDS()' that would do the same,
>> but
>> > via implicit conversions instead of SparkSession extension.
>> >
>> > On implementation level, we can use SqlQuery API (not SqlFieldQuery)
>> that
>> > is specifically designed to return key-value pairs instead of specific
>> > fields, while still providing all SQL capabilities.
>> >
>> > *Nikolay*, does this makes sense to you? Is it feasible and how hard
>> would
>> > it be to implement? How much of the existing code can we reuse (I
>> believe
>> > it should it be majority of it)?
>> >
>> > [1]
>> >
>> https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#creating-datasets
>> >
>> > -Val
>> >
>> >> On Tue, Jul 31, 2018 at 2:03 PM Denis Magda <dmagda@apache.org> wrote:
>> >>
>> >> Hello folks,
>> >>
>> >> The documentation goes with a small reference about _key and _val
>> usage,
>> >> and only for Ignite SQL APIs (Java, Net, C++). I tried to clean up all
>> the
>> >> documentation code snippets.
>> >>
>> >> As for the GitHub examples, they require a major overhaul. Instead of
>> _key
>> >> and _val usage, we need to use SQL fields. Hopefully, someone will
>> groom
>> >> the examples.
>> >>
>> >> Considering this, I wouldn't suggest us exposing _key and _val in other
>> >> places like Spark. Are there any alternatives to this approach?
>> >>
>> >> --
>> >> Denis
>> >>
>> >>
>> >>
>> >> On Tue, Jul 31, 2018 at 2:49 AM Nikolay Izhikov <nizhikov@apache.org>
>> >> wrote:
>> >>
>> >>> Hello, Igniters.
>> >>>
>> >>> Valentin,
>> >>>
>> >>>> We never recommend to use these fields
>> >>>
>> >>> Actually, we did:
>> >>>
>> >>>        * Documentation [1]. Please, see "Predefined Fields" section.
>> >>>        * Java Example [2]
>> >>>        * DotNet Example [3]
>> >>>        * Scala Example [4]
>> >>>
>> >>>> ...hopefully will be removed altogether one day
>> >>>
>> >>> This is new for me.
>> >>>
>> >>> Do we have specific plans for it?
>> >>>
>> >>> [1] https://apacheignite-sql.readme.io/docs/schema-and-indexes
>> >>> [2]
>> >>>
>> >>
>> https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java#L88
>> >>> [3]
>> >>>
>> >>
>> https://github.com/apache/ignite/blob/master/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Sql/SqlDmlExample.cs#L91
>> >>> [4]
>> >>>
>> >>
>> https://github.com/apache/ignite/blob/master/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala#L124
>> >>>
>> >>> В Пт, 27/07/2018 в 15:22 -0700, Valentin Kulichenko пишет:
>> >>>> Stuart,
>> >>>>
>> >>>> _key and _val fields is quite a dirty hack that was added years
ago
>> and
>> >>> is
>> >>>> virtually never used now. We never recommend to use these fields
and
>> I
>> >>>> would definitely avoid building new features based on them.
>> >>>>
>> >>>> Having said that, I'm not arguing the use case, but we need better
>> >>>> implementation approach here. I suggest we think it over and come
>> back
>> >> to
>> >>>> this next week :) I'm sure Nikolay will also chime in and share
his
>> >>>> thoughts.
>> >>>>
>> >>>> -Val
>> >>>>
>> >>>> On Fri, Jul 27, 2018 at 12:39 PM Stuart Macdonald <stuwee@stuwee.org
>> >
>> >>> wrote:
>> >>>>
>> >>>>> If your predicates and joins are expressed in Spark SQL, you
cannot
>> >>>>> currently optimise those and also gain access to the key/val
>> objects.
>> >>> If
>> >>>>> you went without the Ignite Spark SQL optimisations and expressed
>> >> your
>> >>>>> query in Ignite SQL, you still need to use the _key/_val columns.
>> The
>> >>>>> Ignite documentation has this specific example using the _val
column
>> >>> (right
>> >>>>> at the end):
>> >>>>> https://apacheignite-fs.readme.io/docs/ignitecontext-igniterdd
>> >>>>>
>> >>>>> Stuart.
>> >>>>>
>> >>>>> On 27 Jul 2018, at 20:05, Valentin Kulichenko <
>> >>>>> valentin.kulichenko@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>> Well, the second approach would use the optimizations, no?
>> >>>>>
>> >>>>> -Val
>> >>>>>
>> >>>>>
>> >>>>> On Fri, Jul 27, 2018 at 11:49 AM Stuart Macdonald <
>> stuwee@stuwee.org
>> >>>
>> >>>>> wrote:
>> >>>>>
>> >>>>> Val,
>> >>>>>
>> >>>>>
>> >>>>> Yes you can already get access to the cache objects as an RDD
or
>> >>>>>
>> >>>>> Dataset but you can’t use the Ignite-optimised DataFrames
with these
>> >>>>>
>> >>>>> mechanisms. Optimised DataFrames have to be passed through Spark
>> >> SQL’s
>> >>>>>
>> >>>>> Catalyst engine to allow for predicate pushdown to Ignite. So
the
>> >>>>>
>> >>>>> usecase we’re talking about here is when we want to be able
to push
>> >>>>>
>> >>>>> Spark filters/joins to Ignite to optimise, but still have access
to
>> >>>>>
>> >>>>> the underlying cache objects, which is not possible currently.
>> >>>>>
>> >>>>>
>> >>>>> Can you elaborate on the reason _key and _val columns in Ignite
SQL
>> >>>>>
>> >>>>> will be removed?
>> >>>>>
>> >>>>>
>> >>>>> Stuart.
>> >>>>>
>> >>>>>
>> >>>>> On 27 Jul 2018, at 19:39, Valentin Kulichenko <
>> >>>>>
>> >>>>> valentin.kulichenko@gmail.com> wrote:
>> >>>>>
>> >>>>>
>> >>>>> Stuart, Nikolay,
>> >>>>>
>> >>>>>
>> >>>>> I really don't like the idea of exposing '_key' and '_val' fields.
>> >> This
>> >>>>>
>> >>>>> is
>> >>>>>
>> >>>>> legacy stuff that hopefully will be removed altogether one day.
>> Let's
>> >>> not
>> >>>>>
>> >>>>> use it in new features.
>> >>>>>
>> >>>>>
>> >>>>> Actually, I don't even think it's even needed. Spark docs [1]
>> suggest
>> >>> two
>> >>>>>
>> >>>>> ways of creating a typed dataset:
>> >>>>>
>> >>>>> 1. Based on RDD. This should be supported using IgniteRDD I
believe.
>> >>>>>
>> >>>>> 2. Based on DataFrame providing a class. This would just work
out of
>> >>> the
>> >>>>>
>> >>>>> box I guess.
>> >>>>>
>> >>>>>
>> >>>>> Of course, this needs to be tested and verified, and there might
be
>> >>>>>
>> >>>>> certain
>> >>>>>
>> >>>>> pieces missing to fully support the use case. But generally
I like
>> >>> these
>> >>>>>
>> >>>>> approaches much more.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>
>> https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#creating-datasets
>> >>>>>
>> >>>>>
>> >>>>> -Val
>> >>>>>
>> >>>>>
>> >>>>> On Fri, Jul 27, 2018 at 6:31 AM Stuart Macdonald <stuwee@stuwee.org
>> >
>> >>>>>
>> >>>>> wrote:
>> >>>>>
>> >>>>>
>> >>>>> Here’s the ticket:
>> >>>>>
>> >>>>>
>> >>>>> https://issues.apache.org/jira/browse/IGNITE-9108
>> >>>>>
>> >>>>>
>> >>>>> Stuart.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Friday, 27 July 2018 at 14:19, Nikolay Izhikov wrote:
>> >>>>>
>> >>>>>
>> >>>>> Sure.
>> >>>>>
>> >>>>>
>> >>>>> Please, send ticket number in this thread.
>> >>>>>
>> >>>>>
>> >>>>> пт, 27 июля 2018 г., 16:16 Stuart Macdonald <stuwee@stuwee.org
>> >>>>>
>> >>>>> (mailto:
>> >>>>>
>> >>>>> stuwee@stuwee.org)>:
>> >>>>>
>> >>>>>
>> >>>>> Thanks Nikolay. For both options if the cache object isn’t
a simple
>> >>>>>
>> >>>>> type,
>> >>>>>
>> >>>>> we’d probably do something like this in our Ignite SQL statement:
>> >>>>>
>> >>>>>
>> >>>>> select cast(_key as binary), cast(_val as binary), ...
>> >>>>>
>> >>>>>
>> >>>>> Which would give us the BinaryObject’s byte[], then for option
1 we
>> >>>>>
>> >>>>> keep
>> >>>>>
>> >>>>> the Ignite format and introduce a new Spark Encoder for Ignite
>> binary
>> >>>>>
>> >>>>> types
>> >>>>>
>> >>>>> (
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>
>> https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Encoder.html
>> >>>>>
>> >>>>> ),
>> >>>>>
>> >>>>> so that the end user interface would be something like:
>> >>>>>
>> >>>>>
>> >>>>> IgniteSparkSession session = ...
>> >>>>>
>> >>>>> Dataset<Row> dataFrame = ...
>> >>>>>
>> >>>>> Dataset<MyValClass> valDataSet =
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>
>> dataFrame.select(“_val_).as(session.binaryObjectEncoder(MyValClass.class))
>> >>>>>
>> >>>>>
>> >>>>> Or for option 2 we have a behind-the-scenes Ignite-to-Kryo UDF
so
>> >> that
>> >>>>>
>> >>>>> the
>> >>>>>
>> >>>>> user interface would be standard Spark:
>> >>>>>
>> >>>>>
>> >>>>> Dataset<Row> dataFrame = ...
>> >>>>>
>> >>>>> DataSet<MyValClass> dataSet =
>> >>>>>
>> >>>>> dataFrame.select(“_val_).as(Encoders.kryo(MyValClass.class))
>> >>>>>
>> >>>>>
>> >>>>> I’ll create a ticket and maybe put together a test case for
further
>> >>>>>
>> >>>>> discussion?
>> >>>>>
>> >>>>>
>> >>>>> Stuart.
>> >>>>>
>> >>>>>
>> >>>>> On 27 Jul 2018, at 09:50, Nikolay Izhikov <nizhikov@apache.org
>> >>>>>
>> >>>>> (mailto:nizhikov@apache.org <nizhikov@apache.org>)>
wrote:
>> >>>>>
>> >>>>>
>> >>>>> Hello, Stuart.
>> >>>>>
>> >>>>>
>> >>>>> I like your idea.
>> >>>>>
>> >>>>>
>> >>>>> 1. Ignite BinaryObjects, in which case we’d need to supply
a Spark
>> >>>>>
>> >>>>> Encoder
>> >>>>>
>> >>>>> implementation for BinaryObjects
>> >>>>>
>> >>>>>
>> >>>>> 2. Kryo-serialised versions of the objects.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Seems like first option is simple adapter. Am I right?
>> >>>>>
>> >>>>> If yes, I think it's a more efficient way comparing with
>> >>>>>
>> >>>>> transformation of
>> >>>>>
>> >>>>> each object to some other(Kryo) format.
>> >>>>>
>> >>>>>
>> >>>>> Can you provide some additional links for both options?
>> >>>>>
>> >>>>> Where I can find API or(and) examples?
>> >>>>>
>> >>>>>
>> >>>>> As a second step, we can apply same approach to the regular
key,
>> >> value
>> >>>>>
>> >>>>> caches.
>> >>>>>
>> >>>>>
>> >>>>> Feel free to create a ticket.
>> >>>>>
>> >>>>>
>> >>>>> В Пт, 27/07/2018 в 09:37 +0100, Stuart Macdonald пишет:
>> >>>>>
>> >>>>>
>> >>>>> Ignite Dev Community,
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Within Ignite-supplied Spark DataFrames, I’d like to propose
adding
>> >>>>>
>> >>>>> support
>> >>>>>
>> >>>>>
>> >>>>> for _key and _val columns which represent the cache key and
value
>> >>>>>
>> >>>>> objects
>> >>>>>
>> >>>>>
>> >>>>> similar to the current _key/_val column semantics in Ignite
SQL.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> If the cache key or value objects are standard SQL types (eg.
>> String,
>> >>>>>
>> >>>>> Int,
>> >>>>>
>> >>>>>
>> >>>>> etc) they will be represented as such in the DataFrame schema,
>> >>>>>
>> >>>>> otherwise
>> >>>>>
>> >>>>>
>> >>>>> they are represented as Binary types encoded as either: 1. Ignite
>> >>>>>
>> >>>>>
>> >>>>> BinaryObjects, in which case we’d need to supply a Spark Encoder
>> >>>>>
>> >>>>>
>> >>>>> implementation for BinaryObjects, or 2. Kryo-serialised versions
of
>> >>>>>
>> >>>>> the
>> >>>>>
>> >>>>>
>> >>>>> objects. Option 1 would probably be more efficient but option
2
>> would
>> >>>>>
>> >>>>> be
>> >>>>>
>> >>>>>
>> >>>>> more idiomatic Spark.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> This feature would be controlled with an optional parameter
in the
>> >>>>>
>> >>>>> Ignite
>> >>>>>
>> >>>>>
>> >>>>> data source, defaulting to the current implementation which
doesn’t
>> >>>>>
>> >>>>> supply
>> >>>>>
>> >>>>>
>> >>>>> _key or _val columns. The rationale behind this is the same
as the
>> >>>>>
>> >>>>> Ignite
>> >>>>>
>> >>>>>
>> >>>>> SQL _key and _val columns: to allow access to the full cache
objects
>> >>>>>
>> >>>>> from a
>> >>>>>
>> >>>>>
>> >>>>> SQL context.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Can I ask for feedback on this proposal please?
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> I’d be happy to contribute this feature if we agree on the
concept.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Stuart.
>> >>>>>
>> >>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message