hadoop-mapreduce-dev mailing list archives

From "BELUGA BEHR (JIRA)" <j...@apache.org>
Subject [jira] [Created] (MAPREDUCE-7057) MergeThread Review
Date Tue, 20 Feb 2018 17:23:00 GMT
BELUGA BEHR created MAPREDUCE-7057:
--------------------------------------

             Summary: MergeThread Review
                 Key: MAPREDUCE-7057
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7057
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
          Components: mrv2
    Affects Versions: 3.0.0
            Reporter: BELUGA BEHR


Source:
 [MergeThread.java|https://github.com/apache/hadoop/blob/178751ed8c9d47038acf8616c226f1f52e884feb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java]

Update this class to use the Java 1.8 concurrent package.  There are also some corner cases
not being addressed by the current implementation:

{code:java|title=MergeThread.java}
// There is a scenario here where N threads have submitted inputs and are all waiting to
// acquire the lock on the 'pendingToBeMerged' object.  At this point, imagine the 'close'
// method is called.  The close method will run, see nothing in the queue, interrupt the
// processing thread, and cause it to exit.  Afterwards, the 'startMerge' threads will all
// be released and add their inputs to a queue for which there is no consumer.  At this
// point, the T items have been removed from the inputs with no way to recover them.  In
// practice, this may never happen, but it can be tightened up.

  public void startMerge(Set<T> inputs) {
    if (!closed) {
      numPending.incrementAndGet();
      List<T> toMergeInputs = new ArrayList<T>();
      Iterator<T> iter=inputs.iterator();
      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
        toMergeInputs.add(iter.next());
        iter.remove();
      }
      LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + 
               " segments, while ignoring " + inputs.size() + " segments");
      synchronized(pendingToBeMerged) {
        pendingToBeMerged.addLast(toMergeInputs);
        pendingToBeMerged.notifyAll();
      }
    }
  }

  public synchronized void close() throws InterruptedException {
    closed = true;
    waitForMerge();
    interrupt();
  }
{code}
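
One way the race described above could be tightened up, sketched with java.util.concurrent. This is only an illustration, not a proposed patch: the class name MergeQueueSketch, the 'merger' callback, and the boolean return value are hypothetical and do not exist in MergeThread. The idea is to let a single-threaded ExecutorService own the queue, so that a submission arriving after close() is rejected explicitly and the removed inputs can be put back instead of being stranded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch; names and shape are assumptions, not the MergeThread API. */
class MergeQueueSketch<T> {
  // One worker thread plays the role of the merge thread and owns the queue.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final int mergeFactor;
  private final Consumer<List<T>> merger; // stand-in for the real merge logic

  MergeQueueSketch(int mergeFactor, Consumer<List<T>> merger) {
    this.mergeFactor = mergeFactor;
    this.merger = merger;
  }

  /**
   * Hands up to mergeFactor inputs to the worker. Returns false if the
   * executor has been shut down, in which case the inputs are restored.
   */
  boolean startMerge(Set<T> inputs) {
    List<T> batch = new ArrayList<>();
    Iterator<T> iter = inputs.iterator();
    for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
      batch.add(iter.next());
      iter.remove();
    }
    try {
      executor.execute(() -> merger.accept(batch));
      return true;
    } catch (RejectedExecutionException e) {
      // close() won the race: put the items back rather than lose them.
      inputs.addAll(batch);
      return false;
    }
  }

  /** Stops accepting new batches, then waits for queued batches to finish. */
  void close() throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  }
}
```

With this shape, shutdown() lets already-queued batches run to completion, so close() no longer needs to interrupt the worker, and a caller that gets false back knows its inputs are still in the set. As in the original snippet, the 'inputs' set itself is assumed to be confined to the calling thread.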



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


