flink-issues mailing list archives

From "Miguel E. Coimbra (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
Date Wed, 05 Dec 2018 12:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710007#comment-16710007 ]

Miguel E. Coimbra commented on FLINK-10867:

[~StephanEwen], I've been thinking about how to approach this on my [own fork of Apache Flink|https://github.com/mcoimbra/flink]
on GitHub. After investing considerable time over the last two years using this platform
and opening bug reports now and then, I'm interested in contributing something which I believe
would be of use to the community.

A complex task such as this would likely require a design document or something along those
lines. However, what I've done so far is read the Flink source code to understand what
needs to change, by tracing the information flow at these key moments:
 * Programmer invoking the {{.cache()}} function;
 * {{JobManager}} receiving a plan with a caching operator (the operator does not have a reference
to a previous job at this time);
 * {{TaskManager}}(s) receiving an indication that the {{org.apache.flink.core.memory.MemorySegment}}
instances associated with the caching operator are to be kept as they are (in the job where
caching occurs, their data must have originated from a previous operator which produced
the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work like
a regular {{DataSink}}, but instead of writing data, its action is to not evict the segments;
 * {{JobManager}} receiving a plan with a caching operator referencing previously-cached data.
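
The TaskManager-side step in the list above (keeping segments instead of evicting them, then serving them to a later job) could be sketched roughly as follows. This is only an illustration under assumptions: {{RetainedSegmentRegistry}} and its methods are hypothetical names that do not exist in Flink, and {{java.nio.ByteBuffer}} stands in for {{MemorySegment}}.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical TaskManager-side registry; NOT part of Flink.
// ByteBuffer is a stand-in for org.apache.flink.core.memory.MemorySegment.
final class RetainedSegmentRegistry {
    private final Map<String, List<ByteBuffer>> retainedByJob = new HashMap<>();

    // Called when a job with a caching operator finishes:
    // keep the segments instead of releasing them.
    synchronized void retain(String jobId, List<ByteBuffer> segments) {
        retainedByJob.put(jobId, new ArrayList<>(segments));
    }

    // Called when a later job references data cached by jobId.
    // Returns an empty list if nothing is retained for that job.
    synchronized List<ByteBuffer> lookup(String jobId) {
        List<ByteBuffer> segments = retainedByJob.get(jobId);
        if (segments == null) {
            return Collections.<ByteBuffer>emptyList();
        }
        return Collections.unmodifiableList(segments);
    }

    // Called when eviction is ordered; returns how many segments were dropped.
    synchronized int evict(String jobId) {
        List<ByteBuffer> removed = retainedByJob.remove(jobId);
        return removed == null ? 0 : removed.size();
    }
}
```

In this sketch the registry is keyed by the identifier of the job that produced the data, which matches the idea that a later plan references previously-cached data through the earlier job.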


Additional behavior properties:
 * Adding a parameter to decide how long the cached data is to be stored in the cluster. Should
it be a number of jobs for which the cached data persists, or an amount of time? What would be desirable?
 * Would this imply that caching may only occur when executing in session mode so that the
Flink job manager knows that it is caching specifically for a user?
 * The {{org.apache.flink.runtime.executiongraph.ExecutionJobVertex}} instances that depend
on cached datasets could conceptually read their input from {{MemorySegment}} instances with
the same logic as if reading from disk.
 * Create a new COMPLETED_AND_CACHED job status to make this concept explicit as far as job
management is concerned?
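
To make the retention question above concrete, here is a minimal sketch of a policy that supports both limits (number of jobs and amount of time), either of which can be disabled. {{CacheRetentionPolicy}} and all of its members are hypothetical names for illustration only; nothing here is existing Flink API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, NOT part of Flink: decides when cached data may be evicted.
final class CacheRetentionPolicy {
    private final int maxJobUses;       // <= 0 means "no limit on the number of jobs"
    private final Duration timeToLive;  // null means "no time limit"
    private final Instant cachedAt;     // when the producing job completed
    private int jobUses = 0;

    CacheRetentionPolicy(int maxJobUses, Duration timeToLive, Instant cachedAt) {
        this.maxJobUses = maxJobUses;
        this.timeToLive = timeToLive;
        this.cachedAt = cachedAt;
    }

    // Called each time a later job reads the cached data.
    void recordJobUse() {
        jobUses++;
    }

    // True once either configured limit has been reached.
    boolean shouldEvict(Instant now) {
        boolean usesExhausted = maxJobUses > 0 && jobUses >= maxJobUses;
        boolean expired = timeToLive != null && !now.isBefore(cachedAt.plus(timeToLive));
        return usesExhausted || expired;
    }
}
```

For example, a policy created with a limit of 2 jobs and 10 minutes would report that eviction is due after the second recorded use, or once 10 minutes have passed since caching, whichever comes first.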


Besides the DataSet API changes (the easier part), I think it would perhaps
be best to define an {{org.apache.flink.api.java.operators.MemorySinkOperator}} class to explicitly
hold a reference to the previous job where caching occurred. The {{org.apache.flink.api.java.ExecutionEnvironment}}
instance could keep track of these {{MemorySinkOperator}} instances and store in them
the cluster job identification as an attribute. That way, when the client-side {{.execute()}}
call finishes successfully, it would store the reference there so that the {{MemorySinkOperator}}
instance can be reused in the next Flink job triggered by the user-level code.
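
The client-side bookkeeping described above might look roughly like this. It is a sketch under assumptions, written without any Flink dependency: {{CachedResultHandle}} plays the role of the proposed {{MemorySinkOperator}}, {{SketchEnvironment}} plays the role of the {{ExecutionEnvironment}}, and every name and method is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the proposed MemorySinkOperator; NOT Flink API.
final class CachedResultHandle {
    private String producingJobId; // null until the producing job has completed

    // Stores the cluster job identification once the producing job succeeded.
    void bindToCompletedJob(String jobId) {
        if (producingJobId != null) {
            throw new IllegalStateException("handle is already bound to job " + producingJobId);
        }
        producingJobId = Objects.requireNonNull(jobId, "jobId");
    }

    Optional<String> producingJobId() {
        return Optional.ofNullable(producingJobId);
    }
}

// Hypothetical stand-in for the environment that notes pending cache handles.
final class SketchEnvironment {
    private final List<CachedResultHandle> pending = new ArrayList<>();

    // Corresponds to the user calling .cache() while building a job.
    CachedResultHandle cache() {
        CachedResultHandle handle = new CachedResultHandle();
        pending.add(handle);
        return handle;
    }

    // Corresponds to the point where the client-side .execute() call returns successfully.
    void onJobFinished(String jobId) {
        for (CachedResultHandle handle : pending) {
            handle.bindToCompletedJob(jobId);
        }
        pending.clear();
    }
}
```

A handle obtained before the first job runs carries no job reference; after the job finishes it carries that job's identifier and could be attached to the plan of the next job.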

The major packages for this functionality are:
 * {{org.apache.flink.optimizer}}
 * {{org.apache.flink.runtime}}
 * {{org.apache.flink.java}}
 * {{org.apache.flink.client}}


I want to tackle this task myself. Do you have any additional recommendations?

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>                 Key: FLINK-10867
>                 URL: https://issues.apache.org/jira/browse/FLINK-10867
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, DataSet API, Local Runtime
>    Affects Versions: 1.8.0
>            Reporter: Miguel E. Coimbra
>            Assignee: vinoyang
>            Priority: Major
>             Fix For: 1.8.0
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested in processing
a large amount of data, outputting results to disk and then reusing the results for another
type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on graph stream
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an evolving
graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate
updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy with this
existing Jira issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While the cost of writing the results to disk and then reading them back in
a new job sent to the JobManager would be negligible if they are small, it becomes prohibitive if there
are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is
not an option.
> Even if there is a distributed storage available, as the number of sequential jobs increases,
even the benefits of the secondary storage being distributed will diminish.
> *Existing alternatives.*
> I also considered composing the sequence of jobs into a single big
job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding
of results to subsequent operators in dataflow programming.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external to Flink
and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in {{BulkIteration/DeltaIteration}};
>  * Composing a job with "too many" operators and inter-dependencies may lead to the
Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution strategies, leading
to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from [~fhueske]
last year.
>  His reply was on the 7th of December 2017:
>  Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"
> *Idea.*
> Perhaps the better way to describe this *CacheOperator* feature is the concept of "_job
chaining_", where a new type of DataSink would receive data that will:
>  - Be available to a subsequent job which somehow makes a reference to the DataSink of
the previous job;
>  - Have remained available (from the previous job execution) in the exact same TaskManagers
in the cluster.
> Likely, the optimal memory distribution will be pretty similar between chained jobs -
if the data was read from disk again between jobs, it would likely be distributed with the
same (automatic or not) strategies, hence the same distribution would likely be of use to
sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in
local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive
- what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - What would this look like in the API?
>  I envisioned an example like this:
> {{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator();
// The result of some previous computation.}}
>  {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // the previous DataSet,
which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in
>  {{environment.execute(); // Trigger a different job whose data depends on the previous
> Besides adding appropriate classes to the Flink Java API, implementing this feature would
require changing things so that:
>  * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE
job state?
>  * TaskManagers must keep references to the Flink memory management segments associated
with the CacheOperator data;
>  * CacheOperator must have a default number of usages and/or amount of time to be kept
alive (I think both options should exist but the user may choose whether to use one or both);
>  * Cluster coordination: should the JobManager be the entity that ultimately triggers
the memory eviction order on the TaskManagers associated to a job with COMPLETED_AND_REUSABLE
> *Related work.*
> In Spark I believe the existing cache() operator does something similar to what I propose:
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]

This message was sent by Atlassian JIRA
