ignite-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stuart Macdonald <stu...@stuwee.org>
Subject Re: Spark DataFrames With Cache Key and Value Objects
Date Wed, 01 Aug 2018 18:52:37 GMT
Val,

Happy to clarify my thoughts. Let’s take an example, say we have an Ignite
cache of Person objects, so in Nikolay’s Ignite Spark SQL implementation you
can currently obtain a DataFrame with a column called “age” because that’s
been registered as a field in Ignite. Then you can do something like this:

Dataset<Row> = spark.sql(“select * from Person where age = 20”)

And Spark will extract the age predicate and pass it back to the Ignite
Relation implementation to perform the predication. This is useful because
Ignite can index the age column and optimise the query appropriately.

Now what I’m talking about being able to do is:

Dataset<Person> = spark.sql(“select _val from Person where age =
20”).as(ignite.encoder(Person.class))

This would also push the predicate to Ignite but then additionally allow
access to the actual cache value objects in order to perform further local
Spark operations on data which hasn’t been — or can’t be — registered as a
field.

Of course even without Nikolay’s implementation you can just grab a Dataset
of Person objects from Ignite and then do something like:

dataset.filter(p -> p.age = 20)

But this has no way of passing that predicate back to Ignite, because the
filter doesn’t go through Catalyst (which is the Spark engine which
performs pushdown). Even if you convert that dataset to a dataframe and do
a select() there is no way of pushing down that predicate. Spark has to
have obtained the schema and dataframe from Ignite’s SQL relation
implementation in order for the Spark Catalyst implementation to perform
predicate pushdown.

I’m not saying that we need to use Ignite’s _key or _val columns in order
for this to work (though I have no other proposals as to how to make this
work), but I am saying that an API which provides a Dataset<K, V> will not
to the best of my knowledge allow for predicate pushdown to Ignite.

I’m likewise keen to hear Nikolay’s point of view as he is obviously the
expert.

Thanks for your help so far.

Stuart.

On 1 Aug 2018, at 18:17, Valentin Kulichenko <valentin.kulichenko@gmail.com>
wrote:

Stuart,

I don't see a reason why it would work with DataFrames, but not with
Datasets - they are pretty much the same thing. If you have any particular
thoughts on this, please let us know.

In any case, I would like to hear from Nikolay as he is an implementor of
this functionality. Nikolay, please share your thoughts on my suggestion
above.

-Val

On Wed, Aug 1, 2018 at 12:05 AM Stuart Macdonald <stuwee@stuwee.org> wrote:

> I believe suggested approach will not work with the Spark SQL
> relational optimisations which perform predicate pushdown from Spark
> to Ignite. For that to work we need both the key/val and the
> relational fields in a dataframe schema.
>
> Stuart.
>
> > On 1 Aug 2018, at 04:23, Valentin Kulichenko <
> valentin.kulichenko@gmail.com> wrote:
> >
> > I don't think there are exact plans to remove _key and _value fields as
> > it's pretty hard considering the fact that many users use them and that
> > they are deeply integrated into the product. However, we already had
> > multiple usability and other issues due to their existence, and while
> > fixing them we gradually get rid of _key/_val on public API. Hard to tell
> > when we will be able to completely deprecate/remove these fields, but we
> > definitely should avoid building new features based on them.
> >
> > On top of that, I also don't like this approach because it doesn't seem
> to
> > be Spark-friendly to me. That's not how they typically create typed
> > datasets (I already provided a documentation link [1] with examples
> > earlier).
> >
> > From API standpoint, I think we should do the following:
> > 1. Add 'IgniteSparkSession#createDataset(IgniteCache[K, V] cache):
> > Dataset[(K, V)]' method that would create a dataset based on a cache.
> > 2. (Scala only) Introduce 'IgniteCache.toDS()' that would do the same,
> but
> > via implicit conversions instead of SparkSession extension.
> >
> > On implementation level, we can use SqlQuery API (not SqlFieldQuery) that
> > is specifically designed to return key-value pairs instead of specific
> > fields, while still providing all SQL capabilities.
> >
> > *Nikolay*, does this makes sense to you? Is it feasible and how hard
> would
> > it be to implement? How much of the existing code can we reuse (I believe
> > it should it be majority of it)?
> >
> > [1]
> >
> https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#creating-datasets
> >
> > -Val
> >
> >> On Tue, Jul 31, 2018 at 2:03 PM Denis Magda <dmagda@apache.org> wrote:
> >>
> >> Hello folks,
> >>
> >> The documentation goes with a small reference about _key and _val usage,
> >> and only for Ignite SQL APIs (Java, Net, C++). I tried to clean up all
> the
> >> documentation code snippets.
> >>
> >> As for the GitHub examples, they require a major overhaul. Instead of
> _key
> >> and _val usage, we need to use SQL fields. Hopefully, someone will groom
> >> the examples.
> >>
> >> Considering this, I wouldn't suggest us exposing _key and _val in other
> >> places like Spark. Are there any alternatives to this approach?
> >>
> >> --
> >> Denis
> >>
> >>
> >>
> >> On Tue, Jul 31, 2018 at 2:49 AM Nikolay Izhikov <nizhikov@apache.org>
> >> wrote:
> >>
> >>> Hello, Igniters.
> >>>
> >>> Valentin,
> >>>
> >>>> We never recommend to use these fields
> >>>
> >>> Actually, we did:
> >>>
> >>>        * Documentation [1]. Please, see "Predefined Fields" section.
> >>>        * Java Example [2]
> >>>        * DotNet Example [3]
> >>>        * Scala Example [4]
> >>>
> >>>> ...hopefully will be removed altogether one day
> >>>
> >>> This is new for me.
> >>>
> >>> Do we have specific plans for it?
> >>>
> >>> [1] https://apacheignite-sql.readme.io/docs/schema-and-indexes
> >>> [2]
> >>>
> >>
> https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java#L88
> >>> [3]
> >>>
> >>
> https://github.com/apache/ignite/blob/master/modules/platforms/dotnet/examples/Apache.Ignite.Examples/Sql/SqlDmlExample.cs#L91
> >>> [4]
> >>>
> >>
> https://github.com/apache/ignite/blob/master/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCachePopularNumbersExample.scala#L124
> >>>
> >>> В Пт, 27/07/2018 в 15:22 -0700, Valentin Kulichenko пишет:
> >>>> Stuart,
> >>>>
> >>>> _key and _val fields is quite a dirty hack that was added years ago
> and
> >>> is
> >>>> virtually never used now. We never recommend to use these fields and
I
> >>>> would definitely avoid building new features based on them.
> >>>>
> >>>> Having said that, I'm not arguing the use case, but we need better
> >>>> implementation approach here. I suggest we think it over and come back
> >> to
> >>>> this next week :) I'm sure Nikolay will also chime in and share his
> >>>> thoughts.
> >>>>
> >>>> -Val
> >>>>
> >>>> On Fri, Jul 27, 2018 at 12:39 PM Stuart Macdonald <stuwee@stuwee.org>
> >>> wrote:
> >>>>
> >>>>> If your predicates and joins are expressed in Spark SQL, you cannot
> >>>>> currently optimise those and also gain access to the key/val objects.
> >>> If
> >>>>> you went without the Ignite Spark SQL optimisations and expressed
> >> your
> >>>>> query in Ignite SQL, you still need to use the _key/_val columns.
The
> >>>>> Ignite documentation has this specific example using the _val column
> >>> (right
> >>>>> at the end):
> >>>>> https://apacheignite-fs.readme.io/docs/ignitecontext-igniterdd
> >>>>>
> >>>>> Stuart.
> >>>>>
> >>>>> On 27 Jul 2018, at 20:05, Valentin Kulichenko <
> >>>>> valentin.kulichenko@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>> Well, the second approach would use the optimizations, no?
> >>>>>
> >>>>> -Val
> >>>>>
> >>>>>
> >>>>> On Fri, Jul 27, 2018 at 11:49 AM Stuart Macdonald <stuwee@stuwee.org
> >>>
> >>>>> wrote:
> >>>>>
> >>>>> Val,
> >>>>>
> >>>>>
> >>>>> Yes you can already get access to the cache objects as an RDD or
> >>>>>
> >>>>> Dataset but you can’t use the Ignite-optimised DataFrames with
these
> >>>>>
> >>>>> mechanisms. Optimised DataFrames have to be passed through Spark
> >> SQL’s
> >>>>>
> >>>>> Catalyst engine to allow for predicate pushdown to Ignite. So the
> >>>>>
> >>>>> usecase we’re talking about here is when we want to be able to
push
> >>>>>
> >>>>> Spark filters/joins to Ignite to optimise, but still have access
to
> >>>>>
> >>>>> the underlying cache objects, which is not possible currently.
> >>>>>
> >>>>>
> >>>>> Can you elaborate on the reason _key and _val columns in Ignite
SQL
> >>>>>
> >>>>> will be removed?
> >>>>>
> >>>>>
> >>>>> Stuart.
> >>>>>
> >>>>>
> >>>>> On 27 Jul 2018, at 19:39, Valentin Kulichenko <
> >>>>>
> >>>>> valentin.kulichenko@gmail.com> wrote:
> >>>>>
> >>>>>
> >>>>> Stuart, Nikolay,
> >>>>>
> >>>>>
> >>>>> I really don't like the idea of exposing '_key' and '_val' fields.
> >> This
> >>>>>
> >>>>> is
> >>>>>
> >>>>> legacy stuff that hopefully will be removed altogether one day.
Let's
> >>> not
> >>>>>
> >>>>> use it in new features.
> >>>>>
> >>>>>
> >>>>> Actually, I don't even think it's even needed. Spark docs [1] suggest
> >>> two
> >>>>>
> >>>>> ways of creating a typed dataset:
> >>>>>
> >>>>> 1. Based on RDD. This should be supported using IgniteRDD I believe.
> >>>>>
> >>>>> 2. Based on DataFrame providing a class. This would just work out
of
> >>> the
> >>>>>
> >>>>> box I guess.
> >>>>>
> >>>>>
> >>>>> Of course, this needs to be tested and verified, and there might
be
> >>>>>
> >>>>> certain
> >>>>>
> >>>>> pieces missing to fully support the use case. But generally I like
> >>> these
> >>>>>
> >>>>> approaches much more.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#creating-datasets
> >>>>>
> >>>>>
> >>>>> -Val
> >>>>>
> >>>>>
> >>>>> On Fri, Jul 27, 2018 at 6:31 AM Stuart Macdonald <stuwee@stuwee.org>
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>
> >>>>> Here’s the ticket:
> >>>>>
> >>>>>
> >>>>> https://issues.apache.org/jira/browse/IGNITE-9108
> >>>>>
> >>>>>
> >>>>> Stuart.
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Friday, 27 July 2018 at 14:19, Nikolay Izhikov wrote:
> >>>>>
> >>>>>
> >>>>> Sure.
> >>>>>
> >>>>>
> >>>>> Please, send ticket number in this thread.
> >>>>>
> >>>>>
> >>>>> пт, 27 июля 2018 г., 16:16 Stuart Macdonald <stuwee@stuwee.org
> >>>>>
> >>>>> (mailto:
> >>>>>
> >>>>> stuwee@stuwee.org)>:
> >>>>>
> >>>>>
> >>>>> Thanks Nikolay. For both options if the cache object isn’t a simple
> >>>>>
> >>>>> type,
> >>>>>
> >>>>> we’d probably do something like this in our Ignite SQL statement:
> >>>>>
> >>>>>
> >>>>> select cast(_key as binary), cast(_val as binary), ...
> >>>>>
> >>>>>
> >>>>> Which would give us the BinaryObject’s byte[], then for option
1 we
> >>>>>
> >>>>> keep
> >>>>>
> >>>>> the Ignite format and introduce a new Spark Encoder for Ignite binary
> >>>>>
> >>>>> types
> >>>>>
> >>>>> (
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Encoder.html
> >>>>>
> >>>>> ),
> >>>>>
> >>>>> so that the end user interface would be something like:
> >>>>>
> >>>>>
> >>>>> IgniteSparkSession session = ...
> >>>>>
> >>>>> Dataset<Row> dataFrame = ...
> >>>>>
> >>>>> Dataset<MyValClass> valDataSet =
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> dataFrame.select(“_val_).as(session.binaryObjectEncoder(MyValClass.class))
> >>>>>
> >>>>>
> >>>>> Or for option 2 we have a behind-the-scenes Ignite-to-Kryo UDF so
> >> that
> >>>>>
> >>>>> the
> >>>>>
> >>>>> user interface would be standard Spark:
> >>>>>
> >>>>>
> >>>>> Dataset<Row> dataFrame = ...
> >>>>>
> >>>>> DataSet<MyValClass> dataSet =
> >>>>>
> >>>>> dataFrame.select(“_val_).as(Encoders.kryo(MyValClass.class))
> >>>>>
> >>>>>
> >>>>> I’ll create a ticket and maybe put together a test case for further
> >>>>>
> >>>>> discussion?
> >>>>>
> >>>>>
> >>>>> Stuart.
> >>>>>
> >>>>>
> >>>>> On 27 Jul 2018, at 09:50, Nikolay Izhikov <nizhikov@apache.org
> >>>>>
> >>>>> (mailto:nizhikov@apache.org <nizhikov@apache.org>)> wrote:
> >>>>>
> >>>>>
> >>>>> Hello, Stuart.
> >>>>>
> >>>>>
> >>>>> I like your idea.
> >>>>>
> >>>>>
> >>>>> 1. Ignite BinaryObjects, in which case we’d need to supply a Spark
> >>>>>
> >>>>> Encoder
> >>>>>
> >>>>> implementation for BinaryObjects
> >>>>>
> >>>>>
> >>>>> 2. Kryo-serialised versions of the objects.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Seems like first option is simple adapter. Am I right?
> >>>>>
> >>>>> If yes, I think it's a more efficient way comparing with
> >>>>>
> >>>>> transformation of
> >>>>>
> >>>>> each object to some other(Kryo) format.
> >>>>>
> >>>>>
> >>>>> Can you provide some additional links for both options?
> >>>>>
> >>>>> Where I can find API or(and) examples?
> >>>>>
> >>>>>
> >>>>> As a second step, we can apply same approach to the regular key,
> >> value
> >>>>>
> >>>>> caches.
> >>>>>
> >>>>>
> >>>>> Feel free to create a ticket.
> >>>>>
> >>>>>
> >>>>> В Пт, 27/07/2018 в 09:37 +0100, Stuart Macdonald пишет:
> >>>>>
> >>>>>
> >>>>> Ignite Dev Community,
> >>>>>
> >>>>>
> >>>>>
> >>>>> Within Ignite-supplied Spark DataFrames, I’d like to propose adding
> >>>>>
> >>>>> support
> >>>>>
> >>>>>
> >>>>> for _key and _val columns which represent the cache key and value
> >>>>>
> >>>>> objects
> >>>>>
> >>>>>
> >>>>> similar to the current _key/_val column semantics in Ignite SQL.
> >>>>>
> >>>>>
> >>>>>
> >>>>> If the cache key or value objects are standard SQL types (eg. String,
> >>>>>
> >>>>> Int,
> >>>>>
> >>>>>
> >>>>> etc) they will be represented as such in the DataFrame schema,
> >>>>>
> >>>>> otherwise
> >>>>>
> >>>>>
> >>>>> they are represented as Binary types encoded as either: 1. Ignite
> >>>>>
> >>>>>
> >>>>> BinaryObjects, in which case we’d need to supply a Spark Encoder
> >>>>>
> >>>>>
> >>>>> implementation for BinaryObjects, or 2. Kryo-serialised versions
of
> >>>>>
> >>>>> the
> >>>>>
> >>>>>
> >>>>> objects. Option 1 would probably be more efficient but option 2
would
> >>>>>
> >>>>> be
> >>>>>
> >>>>>
> >>>>> more idiomatic Spark.
> >>>>>
> >>>>>
> >>>>>
> >>>>> This feature would be controlled with an optional parameter in the
> >>>>>
> >>>>> Ignite
> >>>>>
> >>>>>
> >>>>> data source, defaulting to the current implementation which doesn’t
> >>>>>
> >>>>> supply
> >>>>>
> >>>>>
> >>>>> _key or _val columns. The rationale behind this is the same as the
> >>>>>
> >>>>> Ignite
> >>>>>
> >>>>>
> >>>>> SQL _key and _val columns: to allow access to the full cache objects
> >>>>>
> >>>>> from a
> >>>>>
> >>>>>
> >>>>> SQL context.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Can I ask for feedback on this proposal please?
> >>>>>
> >>>>>
> >>>>>
> >>>>> I’d be happy to contribute this feature if we agree on the concept.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Stuart.
> >>>>>
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message