fluo-notifications mailing list archives

From mikewalch <...@git.apache.org>
Subject [GitHub] incubator-fluo pull request #822: fixes #500 Made scanning for notifications...
Date Wed, 19 Apr 2017 15:07:06 GMT
Github user mikewalch commented on a diff in the pull request:

    https://github.com/apache/incubator-fluo/pull/822#discussion_r112219019
  
    --- Diff: modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
    + * agreements. See the NOTICE file distributed with this work for additional information regarding
    + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance with the License. You may obtain a
    + * copy of the License at
    + * 
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing, software distributed under the License
    + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
    + * or implied. See the License for the specific language governing permissions and limitations under
    + * the License.
    + * the License.
    + */
    +
    +package org.apache.fluo.core.worker.finder.hash;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import org.apache.accumulo.core.data.ArrayByteSequence;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.cache.ChildData;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
    +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
    +import org.apache.fluo.accumulo.util.NotificationUtil;
    +import org.apache.fluo.accumulo.util.ZookeeperPath;
    +import org.apache.fluo.api.data.Bytes;
    +import org.apache.fluo.core.impl.Environment;
    +import org.apache.fluo.core.impl.FluoConfigurationImpl;
    +import org.apache.fluo.core.impl.Notification;
    +import org.apache.fluo.core.util.ByteUtil;
    +import org.apache.fluo.core.util.FluoThreadFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static java.nio.charset.StandardCharsets.UTF_8;
    +
    +/**
    + * This class manages partitioning of notifications across workers coordinating in ZooKeeper.
    + * Workers are divided into groups. Each group is given a subset of the Accumulo table. All workers
    + * in a group scan that subset and use hash partitioning to equally divide notifications.
    + *
    + * <p>
    + * Grouping workers was a compromise between every worker scanning the entire table OR each worker
    + * having a dedicated part of a table. This scheme allows multiple workers to share popular parts of
    + * a table. However, it limits the number of workers that will scan a portion of a table for
    + * notifications. This limitation is important for scaling, even if there are 1,000 workers there
    + * will never be more than 7 to 13 workers scanning a portion of the table.
    + */
    +public class ParitionManager {
    +
    +  private static final Logger log = LoggerFactory.getLogger(ParitionManager.class);
    +
    +  private final PathChildrenCache childrenCache;
    +  private final PersistentEphemeralNode myESNode;
    +  private final int groupSize;
    +  private long paritionSetTime;
    +  private PartitionInfo partitionInfo;
    +  private final ScheduledExecutorService schedExecutor;
    +
    +  private CuratorFramework curator;
    +
    +  private Environment env;
    +
    +  private final long minSleepTime;
    +  private final long maxSleepTime;
    +  private long retrySleepTime;
    +
    +  private static final long STABILIZE_TIME = TimeUnit.SECONDS.toMillis(60);
    +
    +  private class FindersListener implements PathChildrenCacheListener {
    +
    +    @Override
    +    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    +      switch (event.getType()) {
    +        case CHILD_ADDED:
    +        case CHILD_REMOVED:
    +        case CHILD_UPDATED:
    +          scheduleUpdate();
    +          break;
    +        default:
    +          break;
    +      }
    +    }
    +  }
    +
    +  static PartitionInfo getGroupInfo(String me, SortedSet<String> children,
    +      Collection<TabletRange> tablets, int groupSize) {
    +
    +    int numGroups = Math.max(1, children.size() / groupSize);
    +    int[] groupSizes = new int[numGroups];
    +    int count = 0;
    +    int myGroupId = -1;
    +    int myId = -1;
    +
    +    for (String child : children) {
    +      if (child.equals(me)) {
    +        myGroupId = count;
    +        myId = groupSizes[count];
    +      }
    +      groupSizes[count]++;
    +      count = (count + 1) % numGroups;
    +    }
    +
    +    List<TabletRange> tabletsCopy = new ArrayList<>(tablets);
    +    Collections.sort(tabletsCopy);
    +
    +    // The behavior of Random with a given seed and shuffle are the same across different versions
    +    // of java. Both specify the algorithms in their javadoc and are meant to behave the same across
    +    // versions. This is important because different workers may be running different versions of
    +    // Java, but all workers need to do the same shuffle.
    +    //
    +    // Did try to use hashing to partition the tablets among groups, but it was slightly uneven. One
    +    // group having a 10% more tablets would lead to uneven utilization.
    +    Collections.shuffle(tabletsCopy, new Random(42));
    +
    +    List<TabletRange> groupsTablets = new ArrayList<>();
    +
    +    count = 0;
    +    for (TabletRange tr : tabletsCopy) {
    +      if (count == myGroupId) {
    +        groupsTablets.add(tr);
    +      }
    +      count = (count + 1) % numGroups;
    +    }
    +
    +    return new PartitionInfo(myId, myGroupId, groupSizes[myGroupId], numGroups, children.size(),
    +        groupsTablets);
    +  }
    +
    +  private void updatePartitionInfo() {
    +    try {
    +      String me = myESNode.getActualPath();
    +      while (me == null) {
    +        Thread.sleep(100);
    +        me = myESNode.getActualPath();
    +      }
    +      me = ZKPaths.getNodeFromPath(me);
    +
    +      byte[] zkSplitData = null;
    +      SortedSet<String> children = new TreeSet<>();
    +      Set<String> groupSizes = new HashSet<>();
    +      for (ChildData childData : childrenCache.getCurrentData()) {
    +        String node = ZKPaths.getNodeFromPath(childData.getPath());
    +        if (node.equals("splits")) {
    +          zkSplitData = childData.getData();
    +        } else {
    +          children.add(node);
    +          groupSizes.add(new String(childData.getData(), UTF_8));
    +        }
    +      }
    +
    +      if (zkSplitData == null) {
    +        log.info("Did not find splits in zookeeper, will retry later.");
    +        setPartitionInfo(null); // disable this worker from processing notifications
    +        scheduleRetry();
    +        return;
    +      }
    +
    +      if (!children.contains(me)) {
    +        log.warn("Did not see self (" + me
    +            + "), cannot gather tablet and notification partitioning info.");
    +        setPartitionInfo(null); // disable this worker from processing notifications
    +        scheduleRetry();
    +        return;
    +      }
    +
    +      // ensure all workers agree on the group size
    +      if (groupSizes.size() != 1 || !groupSizes.contains(groupSize + "")) {
    +        log.warn("Group size disagreement " + groupSize + " " + groupSizes
    +            + ", cannot gather tablet and notification partitioning info.");
    +        setPartitionInfo(null); // disable this worker from processing notifications
    +        scheduleRetry();
    +        return;
    +      }
    +
    +      List<Bytes> zkSplits = new ArrayList<>();
    +      SerializedSplits.deserialize(zkSplits::add, zkSplitData);
    +
    +      Collection<TabletRange> tabletRanges = TabletRange.toTabletRanges(zkSplits);
    +      PartitionInfo newPI = getGroupInfo(me, children, tabletRanges, groupSize);
    +
    +      setPartitionInfo(newPI);
    +    } catch (InterruptedException e) {
    +      log.debug("Interrupted while gathering tablet and notification partitioning info.",
e);
    +    } catch (Exception e) {
    +      log.warn("Problem gathering tablet and notification partitioning info.", e);
    +      setPartitionInfo(null); // disable this worker from processing notifications
    +      scheduleRetry();
    +    }
    +  }
    +
    +  private synchronized void scheduleRetry() {
    +    schedExecutor.schedule(() -> updatePartitionInfo(), retrySleepTime, TimeUnit.MILLISECONDS);
    +    retrySleepTime =
    +        Math.min(maxSleepTime,
    +            (long) (1.5 * retrySleepTime) + (long) (retrySleepTime * Math.random()));
    +  }
    +
    +  private synchronized void scheduleUpdate() {
    +    schedExecutor.schedule(() -> updatePartitionInfo(), 0, TimeUnit.MILLISECONDS);
    --- End diff --
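    
    As an illustration of the partitioning scheme the diff's comments describe: because the algorithms of
    `Collections.shuffle` and a fixed-seed `Random` are specified in their javadoc, every worker computes
    the same shuffled tablet order and therefore the same round-robin assignment of tablets to groups.
    A minimal standalone sketch (the class name and sample values are illustrative, not part of the patch):
    
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Collections;
        import java.util.List;
        import java.util.Random;
    
        public class ShuffleAssignmentSketch {
          public static void main(String[] args) {
            List<String> tablets = new ArrayList<>(Arrays.asList("t3", "t0", "t5", "t1", "t4", "t2"));
            // Sort first so every worker starts from the same order, then shuffle with a fixed seed;
            // both steps produce identical results on every JVM.
            Collections.sort(tablets);
            Collections.shuffle(tablets, new Random(42));
    
            int numGroups = 2;
            int myGroupId = 1;
            // Round-robin over the shuffled order, as getGroupInfo does above.
            List<String> myGroupsTablets = new ArrayList<>();
            for (int i = 0; i < tablets.size(); i++) {
              if (i % numGroups == myGroupId) {
                myGroupsTablets.add(tablets.get(i));
              }
            }
            System.out.println(myGroupsTablets); // same output on every worker with myGroupId == 1
          }
        }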
    
    Could use `this::updatePartitionInfo`
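    
    For reference, a minimal sketch of what that suggestion would look like, assuming the
    `updatePartitionInfo` method and `schedExecutor` field defined in the diff above:
    
        private synchronized void scheduleUpdate() {
          // Equivalent to the lambda `() -> updatePartitionInfo()`, just more concise; a reference to a
          // zero-argument void method resolves to the Runnable overload of ScheduledExecutorService.schedule.
          schedExecutor.schedule(this::updatePartitionInfo, 0, TimeUnit.MILLISECONDS);
        }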


