tajo-dev mailing list archives

From Min Zhou <coderp...@gmail.com>
Subject Re: Question about disk-aware scheduling in tajo
Date Thu, 13 Feb 2014 08:29:09 GMT
Hi Jihoon,

Thank you for your answer. However, it seems you didn't answer how Tajo uses
the disk information to balance the I/O overhead.

I still can't understand the details; they are quite complex to me, especially
the class TaskBlockLocation:


public static class TaskBlockLocation {
    // This is a mapping from diskId to a list of pending tasks, right?
    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
    // How can I return a task to the container according to the diskId?
    private HashMap<ContainerId, Integer> assignedContainerMap =
        new HashMap<ContainerId, Integer>();
    private TreeMap<Integer, Integer> volumeUsageMap =
        new TreeMap<Integer, Integer>();
    private String host;

    public TaskBlockLocation(String host){
      this.host = host;
    }

    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
      if (list == null) {
        list = new LinkedList<QueryUnitAttemptId>();
        unAssignedTaskMap.put(volumeId, list);
      }
      list.add(attemptId);

      if (!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
    }

    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
      Integer volumeId;

      if (!assignedContainerMap.containsKey(containerId)) {
        // assign a new container to the volume with the lowest concurrency, right?
        volumeId = assignVolumeId();
        assignedContainerMap.put(containerId, volumeId);
      } else {
        volumeId = assignedContainerMap.get(containerId);
      }

      LinkedList<QueryUnitAttemptId> list = null;
      if (unAssignedTaskMap.size() > 0) {
        int retry = unAssignedTaskMap.size();
        do {
          list = unAssignedTaskMap.get(volumeId);
          if (list == null || list.size() == 0) {
            // clean up and reassign the remaining volumes
            unAssignedTaskMap.remove(volumeId);
            volumeUsageMap.remove(volumeId);
            if (volumeId < 0) break; // processed all blocks on this disk

            // WHY DOES THIS LINE ASSIGN A VOLUMEID AGAIN?
            volumeId = assignVolumeId();
            // WHY DOES THIS LINE PUT AGAIN?
            // If the container is a new container, does it put twice?
            assignedContainerMap.put(containerId, volumeId);
            retry--;
          } else {
            break;
          }
        } while (retry > 0);
      }
      return list;
    }

    public Integer assignVolumeId(){
      Map.Entry<Integer, Integer> volumeEntry = null;

      // choose the volume with the lowest concurrency, right?
      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
        if (volumeEntry == null) volumeEntry = entry;

        if (volumeEntry.getValue() >= entry.getValue()) {
          volumeEntry = entry;
        }
      }

      if (volumeEntry != null) {
        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey()
            + ", Concurrency : " + volumeUsageMap.get(volumeEntry.getKey()));
        return volumeEntry.getKey();
      } else {
        return -1; // processed all blocks on this disk
      }
    }

    public String getHost() {
      return host;
    }
  }

This class maintains a mapping (assignedContainerMap) from containerId to the
assigned diskId. How is a task for that diskId then retrieved and handed back
to the container?
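
To make the question concrete, here is a simplified, standalone sketch of how I
currently read the flow. The names (HostTasks, nextTask, leastLoadedVolume) are
my own, and plain Strings stand in for QueryUnitAttemptId and ContainerId, so
this is only my guess at the intent, not the actual Tajo code:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

public class DiskAwareSketch {

  static class HostTasks {
    // volumeId -> pending tasks whose blocks live on that volume
    private final Map<Integer, LinkedList<String>> unassigned = new HashMap<>();
    // containerId -> volumeId the container is currently pinned to
    private final Map<String, Integer> containerVolume = new HashMap<>();
    // volumeId -> number of containers currently reading from it
    private final TreeMap<Integer, Integer> volumeUsage = new TreeMap<>();

    void addTask(int volumeId, String taskId) {
      unassigned.computeIfAbsent(volumeId, v -> new LinkedList<>()).add(taskId);
      volumeUsage.putIfAbsent(volumeId, 0);
    }

    // Pin the container to the least-loaded volume, then hand back one task
    // from that volume's pending list; re-pin when the volume is drained.
    String nextTask(String containerId) {
      Integer volumeId = containerVolume.get(containerId);
      if (volumeId == null) {
        volumeId = leastLoadedVolume();
        if (volumeId == null) return null;        // nothing pending on this host
        containerVolume.put(containerId, volumeId);
      }
      while (true) {
        LinkedList<String> list = unassigned.get(volumeId);
        if (list != null && !list.isEmpty()) {
          return list.poll();
        }
        // This volume is drained: drop it and re-pin the container elsewhere.
        unassigned.remove(volumeId);
        volumeUsage.remove(volumeId);
        volumeId = leastLoadedVolume();
        if (volumeId == null) return null;        // every volume is drained
        containerVolume.put(containerId, volumeId);
      }
    }

    // Pick the volume with the fewest pinned containers and bump its counter.
    private Integer leastLoadedVolume() {
      Map.Entry<Integer, Integer> best = null;
      for (Map.Entry<Integer, Integer> entry : volumeUsage.entrySet()) {
        if (best == null || entry.getValue() <= best.getValue()) best = entry;
      }
      if (best == null) return null;
      volumeUsage.put(best.getKey(), best.getValue() + 1);
      return best.getKey();
    }
  }

  public static void main(String[] args) {
    HostTasks host = new HostTasks();
    host.addTask(0, "attempt_0");
    host.addTask(0, "attempt_1");
    host.addTask(1, "attempt_2");
    // Two containers on the same host get pinned to different volumes,
    // so their reads do not pile up on the same disk.
    System.out.println(host.nextTask("container_1"));
    System.out.println(host.nextTask("container_2"));
  }
}

Is that roughly what getQueryUnitAttemptIdList and assignVolumeId are meant to
do?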


Thanks,
Min


On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <jihoonson@apache.org> wrote:

> Hi, Min.
>
> In DefaultTaskScheduler, each container is mapped to a disk of a node in the
> cluster. When a container requests a task, DefaultTaskScheduler selects the
> closest task and assigns it to the container. This process works only for
> local reads. The disk volume information is not considered for remote reads.
>
> In my opinion, this is enough for us because there are few remote tasks in
> each sub query. In a test on an in-house cluster of 32 nodes, the ratio of
> remote tasks to all tasks was only about 0.17% (the query was 'select
> l_orderkey from lineitem', and the volume of the lineitem table was about
> 1TB). Since the number of remote tasks was very small, there was little
> disk contention.
>
> Hope that answers your questions.
> Thanks,
> Jihoon
>
> 2014-02-13 11:00 GMT+09:00 Min Zhou <coderplay@gmail.com>:
>
> > Hi all,
> >
> > Tajo leverages the feature introduced in HDFS-3672, which exposes the disk
> > volume id of each HDFS data block. I already found the related code in
> > DefaultTaskScheduler.assignToLeafTasks; can anyone explain the logic to me?
> > What does the scheduler do when the HDFS read is a remote read from another
> > machine's disk?
> >
> >
> > Thanks,
> > Min
> > --
> > My research interests are distributed systems, parallel computing and
> > bytecode based virtual machine.
> >
> > My profile:
> > http://www.linkedin.com/in/coderplay
> > My blog:
> > http://coderplay.javaeye.com
> >
>



-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com
