tajo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Min Zhou <coderp...@gmail.com>
Subject Repartitioner.scheduleFragmentsForJoinQuery() questions
Date Wed, 19 Feb 2014 06:40:54 GMT
Hi all,

I spent more than half a day try to understand the logic
of Repartitioner.scheduleFragmentsForJoinQuery(), still be confused.
Can anyone help me ?


 public static void scheduleFragmentsForJoinQuery(SubQuery subQuery)
      throws IOException {
    MasterPlan masterPlan = subQuery.getMasterPlan();
    ExecutionBlock execBlock = subQuery.getBlock();
    QueryMasterTask.QueryMasterTaskContext masterContext =
subQuery.getContext();
    AbstractStorageManager storageManager = subQuery.getStorageManager();

    ScanNode[] scans = execBlock.getScanNodes();

    Path tablePath;
    // DOES TAJO ONLY SUPPORT 2 WAY JOINS?
    // WHY THIS ARRAY SIZE IS ONLY 2?
    FileFragment[] fragments = new FileFragment[2];
    long[] stats = new long[2];

    // initialize variables from the child operators
    for (int i = 0; i < 2; i++) {
      TableDesc tableDesc =
masterContext.getTableDescMap().get(scans[i].getCanonicalName());
      if (tableDesc == null) { // if it is a real table stored on storage
// WHAT'S THE COMMENT MEAN?
        // WHICH KIND OF SQL CAN RUN INTO HERE?
        // TODO - to be fixed (wrong directory)
        ExecutionBlock [] childBlocks = new ExecutionBlock[2];
        childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
        childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);

        tablePath = storageManager.getTablePath(scans[i].getTableName());
        stats[i] =
masterContext.getSubQuery(childBlocks[i].getId()).getTableStat().getNumBytes();
        fragments[i] = new FileFragment(scans[i].getCanonicalName(),
tablePath, 0, 0, new String[]{UNKNOWN_HOST});
      } else {
        tablePath = tableDesc.getPath();
        try {
          stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
        } catch (PlanningException e) {
          throw new IOException(e);
        }
        fragments[i] =
storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(),
tableDesc.getSchema(),
            tablePath).get(0);  // WHY JUST RETURN THE FIRST MEMBER OF THE
SPLITS ARRAY?
      }
    }

    LOG.info(String.format("Left Volume: %d, Right Volume: %d", stats[0],
stats[1]));

    // Assigning either fragments or fetch urls to query units
    boolean leftSmall =
execBlock.isBroadcastTable(scans[0].getCanonicalName());
    boolean rightSmall =
execBlock.isBroadcastTable(scans[1].getCanonicalName());

    if (leftSmall && rightSmall) {
      LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on
Single Machine");
      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
    } else if (leftSmall ^ rightSmall) {
      int broadcastIdx = leftSmall ? 0 : 1;
      int baseScanIdx = leftSmall ? 1 : 0;
      LOG.info(String.format("[BRDCAST JOIN] base_table=%s, base_volume=%d",
          scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
      scheduleLeafTasksWithBroadcastTable(subQuery, baseScanIdx,
fragments[broadcastIdx]);
    } else {
      LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
      // The hash map is modeling as follows:
      // <Part Id, <Table Name, Intermediate Data>>
      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new
HashMap<Integer, Map<String, List<IntermediateEntry>>>();

      // Grouping IntermediateData by a partition key and a table name
      for (ScanNode scan : scans) {
        SubQuery childSubQuery =
masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName()));
        for (QueryUnit task : childSubQuery.getQueryUnits()) {
          if (task.getIntermediateData() != null) {
            for (IntermediateEntry intermEntry :
task.getIntermediateData()) {
              if (hashEntries.containsKey(intermEntry.getPartId())) {
                Map<String, List<IntermediateEntry>> tbNameToInterm =
                    hashEntries.get(intermEntry.getPartId());

                if (tbNameToInterm.containsKey(scan.getCanonicalName())) {

tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry);
                } else {
                  tbNameToInterm.put(scan.getCanonicalName(),
TUtil.newList(intermEntry));
                }
              } else {
                Map<String, List<IntermediateEntry>> tbNameToInterm =
                    new HashMap<String, List<IntermediateEntry>>();
                tbNameToInterm.put(scan.getCanonicalName(),
TUtil.newList(intermEntry));
                hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
              }
            }
          }
        }
      }

      // hashEntries can be zero if there are no input data.
      // In the case, it will cause the zero divided exception.
      // it avoids this problem.
      int [] avgSize = new int[2];
      avgSize[0] = hashEntries.size() == 0 ? 0 : (int) (stats[0] /
hashEntries.size());
      avgSize[1] = hashEntries.size() == 0 ? 0 : (int) (stats[1] /
hashEntries.size());
      int bothFetchSize = avgSize[0] + avgSize[1];

      // Getting the desire number of join tasks according to the volumn
      // of a larger table
      int largerIdx = stats[0] >= stats[1] ? 0 : 1;
      int desireJoinTaskVolumn = subQuery.getContext().getConf().
          getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);

      // calculate the number of tasks according to the data size
      int mb = (int) Math.ceil((double)stats[largerIdx] / 1048576);
      LOG.info("Larger intermediate data is approximately " + mb + " MB");
      // determine the number of task per 64MB
      int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
      LOG.info("The calculated number of tasks is " + maxTaskNum);
      LOG.info("The number of total shuffle keys is " + hashEntries.size());
      // the number of join tasks cannot be larger than the number of
      // distinct partition ids.
      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
      LOG.info("The determined number of join tasks is " + joinTaskNum);

      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]); //
WHY JUST ONLY 2 FRAGMENT FOR THIS JOIN? JUST 2 HDFS BLOCKS?

      // Assign partitions to tasks in a round robin manner.
      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
          : hashEntries.entrySet()) {
        addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
      }

    }
  }


Thanks,
Min

-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message