flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...
Date Thu, 01 Sep 2016 13:43:29 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77175883
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment
contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once
the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon
as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
    --- End diff --
    
    This should probably be two `int`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message