flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph
Date Thu, 25 Jan 2018 15:25:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339374#comment-16339374 ]

ASF GitHub Bot commented on FLINK-8449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163874024
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
     	/**
     	 * Gets a serialized accumulator map.
     	 * @return The accumulator map with serialized accumulator values.
    -	 * @throws IOException
     	 */
     	@Override
    -	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    +	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
     
     		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
     
     		Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
     		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -			result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +			try {
    +				final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +				result.put(entry.getKey(), serializedValue);
    +			} catch (IOException ioe) {
    +				LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --
    
    This is mainly a question of behaviour. If something goes wrong while serializing the accumulators, one can either fail completely or return as much as possible, as is done here.
    
    I'll change the log level to error and, instead of dropping the entry, insert a `FailedAccumulatorSerialization` entry that throws the exception when it is accessed.
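
The trade-off described above (fail completely versus return as much as possible) can be sketched roughly as follows. This is a minimal, self-contained illustration that uses only the JDK, not Flink's classes: `BestEffortSerialization`, `Slot`, `serialize` and `serializeAll` are hypothetical names chosen for this example, and `Slot` only approximates the idea of a failed entry that rethrows on access. It is not the implementation from the pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class BestEffortSerialization {

    /** Hypothetical holder: either the serialized bytes or the failure that prevented serialization. */
    static final class Slot {
        private final byte[] bytes;
        private final IOException failure;

        private Slot(byte[] bytes, IOException failure) {
            this.bytes = bytes;
            this.failure = failure;
        }

        static Slot ok(byte[] bytes) {
            return new Slot(bytes, null);
        }

        static Slot failed(IOException failure) {
            return new Slot(null, failure);
        }

        /** Returns the bytes, or rethrows the stored exception if serialization failed. */
        byte[] get() throws IOException {
            if (failure != null) {
                throw failure;
            }
            return bytes;
        }
    }

    /** Plain Java serialization; throws an IOException subclass for non-serializable values. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    /** Serializes every value it can; a failure is recorded per key instead of aborting the whole map. */
    static Map<String, Slot> serializeAll(Map<String, Object> values) {
        Map<String, Slot> result = new HashMap<>(values.size());
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            try {
                result.put(entry.getKey(), Slot.ok(serialize(entry.getValue())));
            } catch (IOException ioe) {
                // Keep the key so callers can see that this entry failed, and why.
                result.put(entry.getKey(), Slot.failed(ioe));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("good", 42);
        values.put("bad", new Object()); // java.lang.Object is not Serializable

        for (Map.Entry<String, Slot> entry : serializeAll(values).entrySet()) {
            try {
                System.out.println(entry.getKey() + ": " + entry.getValue().get().length + " bytes");
            } catch (IOException ioe) {
                System.out.println(entry.getKey() + ": failed with " + ioe);
            }
        }
    }
}
```

With this shape, a caller that only reads healthy entries is unaffected by a single bad accumulator, while a caller that touches the failed entry still sees the original exception rather than a silently missing key.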


> Extend OnCompletionActions to receive AccessExecutionGraph
> ----------------------------------------------------------
>
>                 Key: FLINK-8449
>                 URL: https://issues.apache.org/jira/browse/FLINK-8449
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{OnCompletionActions}} currently only receives the {{JobResult}} when the job terminates. To additionally archive the {{ArchivedExecutionGraph}}, we should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} contains all the information needed to derive the {{JobResult}}, plus the information needed to serve details about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)