flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm <...@git.apache.org>
Subject [GitHub] flink pull request: Framesize fix
Date Tue, 28 Jul 2015 10:57:44 GMT
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/934#discussion_r35635294
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
    @@ -560,35 +572,118 @@ public ExecutionContext getExecutionContext() {
     	}
     
     	/**
    +	 * This works as a cache for already merged accumulators because, in some cases,
    +	 * we do not want to re-merge accumulators, as this may lead to duplicate entries.
    +	 */
    +	private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators;
    +
    +	/**
     	 * Merges all accumulator results from the tasks previously executed in the Executions.
     	 * @return The accumulator map
     	 */
    -	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
    +	public Map<String, Accumulator<?,?>> aggregateSmallUserAccumulators() {
    +		return aggregateSmallUserAccumulators(true);
    +	}
     
    -		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
    +	/**
    +	 * Merges all accumulator results from the tasks previously executed in the Executions.
    +	 * If <code>reaggregate</code> is set to false, then no aggregation is performed, and
    +	 * the cached merge result is returned. Otherwise accumulators are merged.
    +	 * @param  reaggregate <code>true</code> if we want to aggregate accumulators,
    +	 *                     <code>false</code> otherwise.
    +	 * @return The accumulator map
    +	 */
    +	public Map<String, Accumulator<?,?>> aggregateSmallUserAccumulators(boolean reaggregate) {
    +		if(!reaggregate) {
    +			return mergedSmallUserAccumulators;
    +		}
    +		this.mergedSmallUserAccumulators = new HashMap<String, Accumulator<?, ?>>();
     
     		for (ExecutionVertex vertex : getAllExecutionVertices()) {
    -			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
    +			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getSmallUserAccumulators();
     			if (next != null) {
    -				AccumulatorHelper.mergeInto(userAccumulators, next);
    +				AccumulatorHelper.mergeInto(mergedSmallUserAccumulators, next);
     			}
     		}
    +		return mergedSmallUserAccumulators;
    +	}
    +
    +	/**
    +	 * Merges all blobKeys referring to blobs of large accumulators. These refer to blobs in the
    +	 * blobCache holding accumulators (results of tasks) that did not fit in an akka frame,
    +	 * thus had to be sent through the BlobCache.
    +	 * @return The accumulator map
    +	 */
    +	public Map<String, List<BlobKey>> aggregateLargeUserAccumulatorBlobKeys() {
    +		Map<String, List<BlobKey>> largeUserAccumulatorRefs = new HashMap<String, List<BlobKey>>();
    +
    +		for (ExecutionVertex vertex : getAllExecutionVertices()) {
    +			Map<String, List<BlobKey>> next = vertex.getCurrentExecutionAttempt().getLargeUserAccumulatorBlobKeys();
    +			mergeLargeUserAccumulatorBlobKeys(largeUserAccumulatorRefs, next);
    +		}
    +		return largeUserAccumulatorRefs;
    +	}
    +
    +	/**
    +	 * Adds new blobKeys referring to blobs of large accumulators to the already existing ones.
    +	 * These refer to blobs in the blobCache holding accumulators (results of tasks) that did not
    +	 * fit in an akka frame, thus had to be sent through the BlobCache.
    +	 * @param target the initial blobKey map
    +	 * @param toMerge the new keys to add to the initial map
    +	 * @return The resulting accumulator map
    +	 */
    +	public Map<String, List<BlobKey>> addLargeUserAccumulatorBlobKeys(Map<String, List<BlobKey>> target,
    +																	Map<String, List<BlobKey>> toMerge) {
    +		if(target == null) {
    +			target = new HashMap<String, List<BlobKey>>();
    +		}
    +		mergeLargeUserAccumulatorBlobKeys(target, toMerge);
    +		return target;
    +	}
    +
    +	private void mergeLargeUserAccumulatorBlobKeys(Map<String, List<BlobKey>> target,
    +												Map<String, List<BlobKey>> toMerge) {
    +		if(toMerge == null || toMerge.isEmpty()) {
    +			return;
    +		}
     
    -		return userAccumulators;
    +		for (Map.Entry<String, List<BlobKey>> otherEntry : toMerge.entrySet()) {
    +			List<BlobKey> existing = target.get(otherEntry.getKey());
    +			if (existing == null) {
    +				target.put(otherEntry.getKey(), otherEntry.getValue());
    +			} else {
    +				existing.addAll(otherEntry.getValue());
    +			}
    +		}
     	}
     
     	/**
    -	 * Gets a serialized accumulator map.
    +	 * Gets a serialized map of the contents of the accumulators.
     	 * @return The accumulator map with serialized accumulator values.
     	 * @throws IOException
     	 */
    -	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    +	public Map<String, SerializedValue<Object>> getSmallAccumulatorsContentSerialized() throws IOException {
    +		return serializeAccumulators(true);
    +	}
    +
    +	/**
    +	 * Gets a serialized map of the objects of the accumulators. This means that the actual
    +	 * objects are serialized, thus merging can still be applied after deserialization.
    +	 * @return The accumulator map with serialized accumulator objects.
    +	 * @throws IOException
    +	 */
    +	public Map<String, SerializedValue<Object>> getSmallAccumulatorsSerialized() throws IOException {
    +		return serializeAccumulators(false);
    +	}
    +
    +	private Map<String, SerializedValue<Object>> serializeAccumulators(boolean onlyContent) throws IOException {
    --- End diff --
    
    Please document this method. Why did you add the `onlyContent` flag here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message