From: jinho kim
Subject: Re: Question about disk-aware scheduling in tajo
Date: Thu, 13 Feb 2014 23:46:49 +0900
To: dev@tajo.incubator.apache.org

Hi Min,

Here are my comments. If you have any questions, please let me know.

Thanks.

> Hi Jihoon,
>
> Thank you for your answer. However, it seems you didn't answer how Tajo uses
> disk information to balance the I/O overhead.
>
> I still can't understand the details, which are quite complex to me, especially
> the class TaskBlockLocation.
>
>
> public static class TaskBlockLocation {
>   // This is a mapping from diskId to a list of pending task, right?
>   private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =

Yes, this is the list of pending tasks for a host.

(at initialization)
hostA (TaskBlockLocation) ---- Disk1 ---- Task1 (HDFS replication-1), 2
                            |
                            -- Disk2 ---- Task3, 4
hostB ------------------------ Disk1 ---- Task1 (HDFS replication-2), 3
                            |
                            -- Disk2 ---- Task2, 4

>       new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>   private HashMap<ContainerId, Integer> assignedContainerMap =
>       new HashMap<ContainerId, Integer>();

At runtime, when an available container (unique per host) requests a task,
the container is assigned the disk with the lowest concurrency.

>   private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
>   private String host;
>
>   public TaskBlockLocation(String host){
>     this.host = host;
>   }
>
>   public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
>     LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>     if (list == null) {
>       list = new LinkedList<QueryUnitAttemptId>();
>       unAssignedTaskMap.put(volumeId, list);
>     }
>     list.add(attemptId);
>
>     if(!volumeUsageMap.containsKey(volumeId))
>       volumeUsageMap.put(volumeId, 0);
>   }
>
>   public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
>     Integer volumeId;
>
>     if (!assignedContainerMap.containsKey(containerId)) {
>       // assign a new container to a volume with the lowest concurrency, right?
>       volumeId = assignVolumeId();

Yes, it is.

>       assignedContainerMap.put(containerId, volumeId);
>     } else {
>       volumeId = assignedContainerMap.get(containerId);
>     }
>
>     LinkedList<QueryUnitAttemptId> list = null;
>     if (unAssignedTaskMap.size() > 0) {
>       int retry = unAssignedTaskMap.size();
>       do {
>         list = unAssignedTaskMap.get(volumeId);
>         if (list == null || list.size() == 0) {
>           //clean and reassign remaining volume
>           unAssignedTaskMap.remove(volumeId);
>           volumeUsageMap.remove(volumeId);
>           if (volumeId < 0) break; // processed all block on disk
>
>           // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>           volumeId = assignVolumeId();
>           // WHY THIS LINE PUT AGAIN?
>           // if the container is a new container, does it put twice??
>           assignedContainerMap.put(containerId, volumeId);

Case 1: There are more disks than containers, so a container moves on to
another disk once its current disk has no pending tasks.

ex) host ---- container1 ---- disk1 ---- tasks
                           |
                           -- disk2 ---- tasks

Case 2: The newly assigned volume is the unknown disk (-1), which is used for
remote tasks.
This happens because all local blocks on the host's disks have been processed.
If the container is not then assigned a remote task, the final tasks will
deadlock.

ex) hostA ---- container1 ---- disk1 (newly assigned -1) ---- remote pending tasks (zero local pending tasks)
            |
            -- container2 ---- disk2 ---- tasks
    hostB ---- container1 ---- disk1 ---- tasks (pending tasks will decrease)

--Jinho

On Feb 13, 2014, at 5:29 PM, Min Zhou wrote:

> Hi Jihoon,
>
> Thank you for your answer. However, it seems you didn't answer how Tajo uses
> disk information to balance the I/O overhead.
>
> I still can't understand the details, which are quite complex to me, especially
> the class TaskBlockLocation.
>
>
> public static class TaskBlockLocation {
>   // This is a mapping from diskId to a list of pending task, right?
>   private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
>       new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>   private HashMap<ContainerId, Integer> assignedContainerMap =
>       new HashMap<ContainerId, Integer>();
>   private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
>   private String host;
>
>   public TaskBlockLocation(String host){
>     this.host = host;
>   }
>
>   public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
>     LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>     if (list == null) {
>       list = new LinkedList<QueryUnitAttemptId>();
>       unAssignedTaskMap.put(volumeId, list);
>     }
>     list.add(attemptId);
>
>     if(!volumeUsageMap.containsKey(volumeId))
>       volumeUsageMap.put(volumeId, 0);
>   }
>
>   public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
>     Integer volumeId;
>
>     if (!assignedContainerMap.containsKey(containerId)) {
>       // assign a new container to a volume with the lowest concurrency, right?
>       volumeId = assignVolumeId();
>       assignedContainerMap.put(containerId, volumeId);
>     } else {
>       volumeId = assignedContainerMap.get(containerId);
>     }
>
>     LinkedList<QueryUnitAttemptId> list = null;
>     if (unAssignedTaskMap.size() > 0) {
>       int retry = unAssignedTaskMap.size();
>       do {
>         list = unAssignedTaskMap.get(volumeId);
>         if (list == null || list.size() == 0) {
>           //clean and reassign remaining volume
>           unAssignedTaskMap.remove(volumeId);
>           volumeUsageMap.remove(volumeId);
>           if (volumeId < 0) break; // processed all block on disk
>
>           // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>           volumeId = assignVolumeId();
>           // WHY THIS LINE PUT AGAIN?
>           // if the container is a new container, does it put twice??
>           assignedContainerMap.put(containerId, volumeId);
>           retry--;
>         } else {
>           break;
>         }
>       } while (retry > 0);
>     }
>     return list;
>   }
>
>   public Integer assignVolumeId(){
>     Map.Entry<Integer, Integer> volumeEntry = null;
>
>     // choose a volume with the lowest concurrency, right?
>     for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>       if(volumeEntry == null) volumeEntry = entry;
>
>       if (volumeEntry.getValue() >= entry.getValue()) {
>         volumeEntry = entry;
>       }
>     }
>
>     if(volumeEntry != null){
>       volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
>       LOG.info("Assigned host : " + host + " Volume : " +
>           volumeEntry.getKey() + ", Concurrency : "
>           + volumeUsageMap.get(volumeEntry.getKey()));
>       return volumeEntry.getKey();
>     } else {
>       return -1;  // processed all block on disk
>     }
>   }
>
>   public String getHost() {
>     return host;
>   }
> }
>
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId. How can I retrieve a task for the container based on
> the diskId?
>
>
> Thanks,
> Min
>
>
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son wrote:
>
>> Hi, Min.
>>
>> In DefaultTaskScheduler, each container is mapped to each disk of all nodes
>> in a cluster.
>> When a container requests a task, DefaultTaskScheduler
>> selects the closest task and assigns it to the container. This process works
>> only for local reads. The disk volume information is not considered for
>> remote reads.
>>
>> In my opinion, this is enough for us because there are few remote tasks in
>> each sub query. In a test on an in-house cluster composed of 32 nodes,
>> the ratio of remote tasks to all tasks was only about 0.17% (the query
>> was 'select l_orderkey from lineitem', and the volume of the lineitem table
>> was about 1TB). Since the number of remote tasks was very small, there was
>> little disk contention.
>>
>> Hope that answers your questions.
>> Thanks,
>> Jihoon
>>
>> 2014-02-13 11:00 GMT+09:00 Min Zhou:
>>
>>> Hi all,
>>>
>>> Tajo leverages the feature supported by HDFS-3672, which exposes the disk
>>> volume id of each HDFS data block. I already found the related code in
>>> DefaultTaskScheduler.assignToLeafTasks. Can anyone explain the logic for
>>> me? What does the scheduler do when the HDFS read is a remote read from
>>> another machine's disk?
>>>
>>>
>>> Thanks,
>>> Min
>>> --
>>> My research interests are distributed systems, parallel computing and
>>> bytecode based virtual machine.
>>>
>>> My profile:
>>> http://www.linkedin.com/in/coderplay
>>> My blog:
>>> http://coderplay.javaeye.com
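
The per-host behaviour discussed in this thread (attach a container to the least-used volume, move it to another volume when the current one is drained, and fall back to the unknown disk -1 once no local work is left) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Tajo code: HostTaskQueue, addTask, nextTask, volumeOf and REMOTE_VOLUME are hypothetical names, and tasks and containers are plain strings instead of QueryUnitAttemptId and ContainerId. Ties are broken toward the lowest volume ID here, whereas the quoted code's >= comparison picks the last entry with the minimum value.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for TaskBlockLocation (one instance per host). */
class HostTaskQueue {
    /** The "unknown disk": a container holding it has no local work left. */
    static final int REMOTE_VOLUME = -1;

    private final Map<Integer, Deque<String>> pendingByVolume = new HashMap<>();
    private final Map<String, Integer> volumeByContainer = new HashMap<>();
    private final TreeMap<Integer, Integer> concurrencyByVolume = new TreeMap<>();

    /** Registers a pending local task on the given volume. */
    void addTask(int volumeId, String taskId) {
        pendingByVolume.computeIfAbsent(volumeId, v -> new ArrayDeque<>()).add(taskId);
        concurrencyByVolume.putIfAbsent(volumeId, 0);
    }

    /** Picks the volume with the fewest attached containers, or REMOTE_VOLUME if none remain. */
    int assignVolume() {
        Integer best = null;
        for (Map.Entry<Integer, Integer> e : concurrencyByVolume.entrySet()) {
            if (best == null || e.getValue() < concurrencyByVolume.get(best)) {
                best = e.getKey();
            }
        }
        if (best == null) {
            return REMOTE_VOLUME;
        }
        concurrencyByVolume.merge(best, 1, Integer::sum);
        return best;
    }

    /** Returns the next local task, or null when the container should take remote tasks. */
    String nextTask(String containerId) {
        Integer volume = volumeByContainer.get(containerId);
        if (volume == null) {
            volume = assignVolume();
            volumeByContainer.put(containerId, volume);
        }
        while (volume != REMOTE_VOLUME) {
            Deque<String> queue = pendingByVolume.get(volume);
            if (queue != null && !queue.isEmpty()) {
                return queue.poll();
            }
            // Current volume is drained: forget it and re-attach the container elsewhere.
            pendingByVolume.remove(volume);
            concurrencyByVolume.remove(volume);
            volume = assignVolume();
            volumeByContainer.put(containerId, volume);
        }
        return null;
    }

    /** Volume currently attached to the container, or REMOTE_VOLUME if it has none. */
    int volumeOf(String containerId) {
        return volumeByContainer.getOrDefault(containerId, REMOTE_VOLUME);
    }

    public static void main(String[] args) {
        HostTaskQueue host = new HostTaskQueue();
        host.addTask(1, "t1");
        host.addTask(1, "t2");
        host.addTask(2, "t3");
        System.out.println(host.nextTask("c1")); // t1 (c1 attaches to volume 1)
        System.out.println(host.nextTask("c2")); // t3 (c2 attaches to volume 2)
        System.out.println(host.nextTask("c2")); // t2 (volume 2 drained, c2 moves to volume 1)
        System.out.println(host.nextTask("c1")); // null (no local work left, c1 gets -1)
    }
}
```

The second put into the container map corresponds to the lines Min asked about: the container is re-attached because its previous volume ran dry, not because it is new. A null result here stands for "hand this container remote tasks", which is the role the -1 volume plays in Jinho's Case 2.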