tez-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jteagles <...@git.apache.org>
Subject [GitHub] tez pull request #33: TEZ-3998: support constructing DAG with concurrent edg...
Date Fri, 19 Oct 2018 20:17:13 GMT
Github user jteagles commented on a diff in the pull request:

    https://github.com/apache/tez/pull/33#discussion_r226768963
  
    --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
---
    @@ -0,0 +1,234 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tez.dag.library.vertexmanager;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.tez.common.TezUtils;
    +import org.apache.tez.dag.api.EdgeProperty;
    +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
    +import org.apache.tez.dag.api.InputDescriptor;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.TezUncheckedException;
    +import org.apache.tez.dag.api.VertexManagerPlugin;
    +import org.apache.tez.dag.api.VertexManagerPluginContext;
    +import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
    +import org.apache.tez.dag.api.event.VertexState;
    +import org.apache.tez.dag.api.event.VertexStateUpdate;
    +import org.apache.tez.runtime.api.Event;
    +import org.apache.tez.runtime.api.TaskAttemptIdentifier;
    +import org.apache.tez.runtime.api.events.VertexManagerEvent;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
    +
    +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin{
    +  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
    +
    +  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
    +  private int managedTasks;
    +  private boolean tasksScheduled = false;
    +  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
    +  private Configuration vertexConfig;
    +  private String vertexName;
    +  private ConcurrentEdgeTriggerType edgeTriggerType;
    +  private boolean allSrcVerticesConfigured;
    +
    +  int completedUpstreamTasks;
    +
    +  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
    +    super(context);
    +  }
    +
    +  @Override
    +  public void initialize() {
    +    if (getContext().getUserPayload() == null) {
    +      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
    +    }
    +    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
    +    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
    +    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
    +      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())){
    +        throw new TezUncheckedException("All input edges to vertex " + vertexName +
    +            "  must be CONCURRENT.");
    +      }
    +      String srcVertex = entry.getKey();
    +      srcVerticesConfigured.put(srcVertex, false);
    +      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
    +    }
    +
    +    try {
    +      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
    +    } catch (IOException e) {
    +      throw new TezUncheckedException(e);
    +    }
    +    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
    +        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
    +            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
    +    if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType))
{
    +      // pending TEZ-3999
    +      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering
type for now.");
    +    }
    +    LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.",
edgeTriggerType);
    +
    +    vertexName = getContext().getVertexName();
    +    completedUpstreamTasks = 0;
    +  }
    +
    +  @Override
    +  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
    +    onVertexStartedDone.set(true);
    +    scheduleTasks();
    +  }
    +
    +  @Override
    +  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
    +    Preconditions.checkArgument(VertexState.CONFIGURED.equals(stateUpdate.getVertexState()),
    +        "Received incorrect state notification : " + stateUpdate.getVertexState() + "
for vertex: "
    +            + stateUpdate.getVertexName() + " in vertex: " + this.vertexName);
    +    Preconditions.checkArgument(srcVerticesConfigured.containsKey(stateUpdate.getVertexName()),
    +        "Received incorrect vertex notification : " + stateUpdate.getVertexState() +
" for vertex: "
    +            + stateUpdate.getVertexName() + " in vertex: " + this.vertexName);
    +    LOG.info("Received configured notification: " + stateUpdate.getVertexState() + "
for vertex: "
    +        + stateUpdate.getVertexName() + " in vertex: " + this.vertexName);
    +    srcVerticesConfigured.put(stateUpdate.getVertexName(), true);
    +    // check for source vertices completely configured
    +    boolean checkAllSrcVerticesConfigured = true;
    +    for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) {
    +      if (!entry.getValue()) {
    +        // vertex not configured
    +        LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName);
    +        checkAllSrcVerticesConfigured = false;
    +        break;
    +      }
    +    }
    +    allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
    +
    +    scheduleTasks();
    +  }
    +
    +  @Override
    +  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
    +    completedUpstreamTasks ++;
    --- End diff --
    
    ```suggestion
        completedUpstreamTasks++;
    ```


---

Mime
View raw message