tez-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From beltran <...@git.apache.org>
Subject [GitHub] tez pull request #33: TEZ-3998: support constructing DAG with concurrent edg...
Date Thu, 08 Nov 2018 19:34:56 GMT
Github user beltran commented on a diff in the pull request:

    https://github.com/apache/tez/pull/33#discussion_r232028676
  
    --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
---
    @@ -0,0 +1,242 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tez.dag.library.vertexmanager;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.tez.common.TezUtils;
    +import org.apache.tez.dag.api.EdgeProperty;
    +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
    +import org.apache.tez.dag.api.InputDescriptor;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.TezUncheckedException;
    +import org.apache.tez.dag.api.VertexManagerPlugin;
    +import org.apache.tez.dag.api.VertexManagerPluginContext;
    +import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
    +import org.apache.tez.dag.api.event.VertexState;
    +import org.apache.tez.dag.api.event.VertexStateUpdate;
    +import org.apache.tez.runtime.api.Event;
    +import org.apache.tez.runtime.api.TaskAttemptIdentifier;
    +import org.apache.tez.runtime.api.events.VertexManagerEvent;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
    +
    +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
    +
    +  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
    +  private int managedTasks;
    +  private boolean tasksScheduled = false;
    +  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
    +  private Configuration vertexConfig;
    +  private String vertexName;
    +  private ConcurrentEdgeTriggerType edgeTriggerType;
    +  private boolean allSrcVerticesConfigured;
    +
    +  int completedUpstreamTasks;
    +
    +  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
    +    super(context);
    +  }
    +
    +  @Override
    +  public void initialize() {
    +    if (getContext().getUserPayload() == null) {
    +      throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput");
    +    }
    +    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
    +    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
    +    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
    +      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
    +        throw new TezUncheckedException("All input edges to vertex " + vertexName +
    +            "  must be CONCURRENT.");
    +      }
    +      String srcVertex = entry.getKey();
    +      srcVerticesConfigured.put(srcVertex, false);
    +      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
    +    }
    +
    +    try {
    +      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
    +    } catch (IOException e) {
    +      throw new TezUncheckedException(e);
    +    }
    +    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
    +        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
    +            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
    +    if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType))
{
    +      // pending TEZ-3999
    +      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering
type for now.");
    +    }
    +    LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.",
edgeTriggerType);
    +
    +    vertexName = getContext().getVertexName();
    +    completedUpstreamTasks = 0;
    +  }
    +
    +  @Override
    +  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
    +    onVertexStartedDone.set(true);
    +    scheduleTasks();
    +  }
    +
    +  @Override
    +  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
    +    VertexState state = stateUpdate.getVertexState();
    +    String fromVertex = stateUpdate.getVertexName();
    +    if (!srcVerticesConfigured.containsKey(fromVertex)) {
    +      throw new IllegalArgumentException("Not expecting state update from vertex:" +
    +          fromVertex + " in vertex: " + this.vertexName);
    +    }
    +
    +    if (!VertexState.CONFIGURED.equals(state)) {
    +      throw new IllegalArgumentException("Received incorrect state notification : " +
    +          state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName);
    +    }
    +
    +    LOG.info("Received configured notification: " + state + " for vertex: "
    +        + fromVertex + " in vertex: " + this.vertexName);
    +    srcVerticesConfigured.put(fromVertex, true);
    +
    +    // check for source vertices completely configured
    +    boolean checkAllSrcVerticesConfigured = true;
    +    for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) {
    +      if (!entry.getValue()) {
    +        // vertex not configured
    +        LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName);
    +        checkAllSrcVerticesConfigured = false;
    +        break;
    +      }
    +    }
    +    allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
    --- End diff --
    
    Doing it like this makes me wonder if the tasks can be scheduled twice for example `onVertexStateUpdated`
being called twice closely:
    
    * Thread 1: calls onVertexStateUpdated
    * Thread 2: calls onVertexStateUpdated
    * Thread 1: at `allSrcVerticesConfigured = checkAllSrcVerticesConfigured;` is set to false
    * Thread 2: at `allSrcVerticesConfigured = checkAllSrcVerticesConfigured;` is set to true
now
    * Thread 1: scheduleTasks schedules all the tasks because  `allSrcVerticesConfigured`
is true
    * Thread 2: scheduleTasks schedules all the tasks because  `allSrcVerticesConfigured`
is true
    
    I see the variable `tasksScheduled` is used, but I think it could still happen. Maybe
with some `AtomicBoolean.getAndSet` we'd be safer.


---

Mime
View raw message