trafodion-codereview mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From DaveBirdsall <...@git.apache.org>
Subject [GitHub] incubator-trafodion pull request #1043: [TRAFODION-25] Temporary code for UP...
Date Mon, 03 Apr 2017 20:24:44 GMT
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/1043#discussion_r109515170
  
    --- Diff: core/sql/optimizer/ScmCostMethod.cpp ---
    @@ -3845,8 +3845,336 @@ CostMethodHbaseUpdate::scmComputeOperatorCostInternal(RelExpr* op,
       const PlanWorkSpace* pws,
       Lng32& countOfStreams)
     {
    -  // TODO: Write this method; the line below is a stub
    -  return CostMethod::scmComputeOperatorCostInternal(op,pws,countOfStreams);
    +  // TODO: Write this method; the code below is a copy of the Delete
    +  // method which we'll use for the moment. This is better than just
    +  // a simple constant stub; we will get parallel Update plans with
    +  // this code, for example, that we won't get with constant cost.
    +
    +  // The theory of operation of Update is somewhat different (since it
    +  // might, for example, do a Delete + an Insert, or might do an Hbase
    +  // Update -- is that decided before we get here?), so this code will
    +  // underestimate the cost in general.
    +
    +  const Context * myContext = pws->getContext();
    +
    +  cacheParameters(op,myContext);
    +  estimateDegreeOfParallelism();
    +
    +  const InputPhysicalProperty* ippForMe =
    +    myContext->getInputPhysicalProperty();
    +
    +  // -----------------------------------------
    +  // Save off estimated degree of parallelism.
    +  // -----------------------------------------
    +  countOfStreams = countOfStreams_;
    +
    +  HbaseDelete* delOp = (HbaseDelete *)op;   // downcast
    +
    +  CMPASSERT(partFunc_ != NULL);
    +
    +  //  Later, if and when we start using NodeMaps to track active regions for 
    +  //  Trafodion tables in HBase (or native HBase tables), we can use the
    +  //  following to get active partitions.
    +  //CostScalar activePartitions =
    +  // (CostScalar)
    +  //   (((NodeMap *)(partFunc_->getNodeMap()))->getNumActivePartitions());
    +  //  But for now, we do the following:
    +  CostScalar activePartitions = (CostScalar)(partFunc_->getCountOfPartitions());
    +
    +  const IndexDesc* CIDesc = delOp->getIndexDesc();
    +  const CostScalar & recordSizeInKb = CIDesc->getRecordSizeInKb();
    +
    +  CostScalar tuplesProcessed(csZero);
    +  CostScalar tuplesProduced(csZero);
    +  CostScalar tuplesSent(csZero);  // we use tuplesSent to model sending rowIDs to Hbase

    +  CostScalar randomIOs(csZero);
    +  CostScalar sequentialIOs(csZero);
    +
    +  CostScalar countOfAsynchronousStreams = activePartitions;
    +
    +  // figure out if the probes are in order - if they are, then when
    +  // scanning, I/O will tend to be sequential
    +
    +  NABoolean probesInOrder = FALSE;
    +  if (ippForMe != NULL)  // input physical properties exist?
    +  {
    +    // See if the probes are in order.
    +
    +    // For delete, a partial order is ok.
    +    NABoolean partiallyInOrderOK = TRUE;
    +    NABoolean probesForceSynchronousAccess = FALSE;
    +    ValueIdList targetSortKey = CIDesc->getOrderOfKeyValues();
    +    ValueIdSet sourceCharInputs =
    +      delOp->getGroupAttr()->getCharacteristicInputs();
    +
    +    ValueIdSet targetCharInputs;
    +    // The char inputs are still in terms of the source. Map them to the target.
    +    // Note: The source char outputs in the ipp have already been mapped to
    +    // the target. CharOutputs are a set, meaning they do not have duplicates
    +    // But we could have cases where two columns of the target are matched to the
    +    // same source column, example: Sol: 10-040416-5166, where we have
    +    // INSERT INTO b6table1
    +    //		  ( SELECT f, h_to_f, f, 8.4
    +    //            FROM btre211
    +    //            );
    +    // Hence we use lists here instead of sets.
    +    // Check to see if there are any duplicates in the source Characteristics inputs
    +    // if no, we shall perform set operations, as these are faster
    +    ValueIdList bottomValues = delOp->updateToSelectMap().getBottomValues();
    +    ValueIdSet bottomValuesSet(bottomValues);
    +    NABoolean useListInsteadOfSet = FALSE;
    +
    +    CascadesGroup* group1 = (*CURRSTMT_OPTGLOBALS->memo)[delOp->getGroupId()];
    +
    +    GenericUpdate* upOperator = (GenericUpdate *) group1->getFirstLogExpr();
    +
    +    if (((upOperator->getTableName().getSpecialType() == ExtendedQualName::NORMAL_TABLE
) || (upOperator->getTableName().getSpecialType() == ExtendedQualName::GHOST_TABLE )) &&
    +     (bottomValuesSet.entries() != bottomValues.entries() ) )
    +    {
    +
    +      ValueIdList targetInputList;
    +      // from here get all the bottom values that appear in the sourceCharInputs
    +      bottomValues.findCommonElements(sourceCharInputs );
    +      bottomValuesSet = bottomValues;
    +
    +      // we can use the bottomValues only if these contain some duplicate columns of
    +      // characteristics inputs, otherwise we shall use the characteristics inputs.
    +      if (bottomValuesSet == sourceCharInputs)
    +      {
    +        useListInsteadOfSet = TRUE;
    +	delOp->updateToSelectMap().rewriteValueIdListUpWithIndex(
    +	  targetInputList,
    +	  bottomValues);
    +	targetCharInputs = targetInputList;
    +      }
    +    }
    +
    +    if (!useListInsteadOfSet)
    +    {
    +      delOp->updateToSelectMap().rewriteValueIdSetUp(
    +	targetCharInputs,
    +	sourceCharInputs);
    +    }
    +
    +    // If a target key column is covered by a constant on the source side,
    +    // then we need to remove that column from the target sort key
    +    removeConstantsFromTargetSortKey(&targetSortKey,
    +                                   &(delOp->updateToSelectMap()));
    +    NABoolean orderedNJ = TRUE;
    +    // Don't call ordersMatch if njOuterOrder_ is null.
    +    if (ippForMe->getAssumeSortedForCosting())
    +      orderedNJ = FALSE;
    +    else
    +      // if leading keys are not same then don't try ordered NJ.
    +      orderedNJ =
    +        isOrderedNJFeasible(*(ippForMe->getNjOuterOrder()), targetSortKey);
    +
    +    if (orderedNJ AND 
    +        ordersMatch(ippForMe,
    +                    CIDesc,
    +                    &targetSortKey,
    +                    targetCharInputs,
    +                    partiallyInOrderOK,
    +                    probesForceSynchronousAccess))
    +    {
    +      probesInOrder = TRUE;
    +      if (probesForceSynchronousAccess)
    +      {
    +        // The probes form a complete order across all partitions and
    +        // the clustering key and partitioning key are the same. So, the
    +        // only asynchronous I/O we will see will be due to ESPs. So,
    +        // limit the count of streams in DP2 by the count of streams in ESP.
    +
    +        // Get the logPhysPartitioningFunction, which we will use
    +        // to get the logical partitioning function. If it's NULL,
    +        // it means the table was not partitioned at all, so we don't
    +        // need to limit anything since there already is no asynch I/O.
    +
    +     // TODO: lppf is always null in Trafodion; figure out what to do instead...
    +        const LogPhysPartitioningFunction* lppf =
    +            partFunc_->castToLogPhysPartitioningFunction();
    +        if (lppf != NULL)
    +        {
    +          PartitioningFunction* logPartFunc =
    +            lppf->getLogPartitioningFunction();
    +          // Get the number of ESPs:
    +          CostScalar numParts = logPartFunc->getCountOfPartitions();
    +
    +          countOfAsynchronousStreams = MINOF(numParts,
    +                                             countOfAsynchronousStreams);
    +        } // lppf != NULL
    +      } // probesForceSynchronousAccess
    +    } // probes are in order
    +  } // if input physical properties exist
    +
    +  CostScalar currentCpus = 
    +    (CostScalar)myContext->getPlan()->getPhysicalProperty()->getCurrentCountOfCPUs();
    +  CostScalar activeCpus = MINOF(countOfAsynchronousStreams, currentCpus);
    +  CostScalar streamsPerCpu =
    +    (countOfAsynchronousStreams / activeCpus).getCeiling();
    +
    +
    +  CostScalar noOfProbesPerPartition(csOne);
    +
    +  CostScalar numRowsToDelete(csOne);
    +  CostScalar numRowsToScan(csOne);
    +
    +  CostScalar commonComputation;
    +
    +  // Determine # of rows to scan and to delete
    +
    +  if (delOp->getSearchKey() && delOp->getSearchKey()->isUnique() &&

    +    (noOfProbes_ == 1))
    +  {
    +    // unique access
    +
    +    activePartitions = csOne;
    +    countOfAsynchronousStreams = csOne;
    +    activeCpus = csOne;
    +    streamsPerCpu = csOne;
    +    numRowsToScan = csOne;
    +    // assume the 1 row always satisfies any executor predicates so
    +    // we'll always do the Delete
    +    numRowsToDelete = csOne;
    +  }
    +  else
    +  {
    +    // non-unique access
    +
    +    numRowsToDelete =
    +      ((myRowCount_ / activePartitions).getCeiling()).minCsOne();
    +    noOfProbesPerPartition =
    +      ((noOfProbes_ / countOfAsynchronousStreams).getCeiling()).minCsOne();
    +
    +    // need to compute the number of rows that satisfy the key predicates
    +    // to compute the I/Os that must be performed
    +
    +    // need to create a new histogram, since the one from input logical
    +    // prop. has the histogram for the table after all executor preds are
    +    // applied (i.e. the result cardinality)
    +    IndexDescHistograms histograms(*CIDesc,CIDesc->getIndexKey().entries());
    +
    +    // retrieve all of the key preds in key column order
    +    ColumnOrderList keyPredsByCol(CIDesc->getIndexKey());
    +    delOp->getSearchKey()->getKeyPredicatesByColumn(keyPredsByCol);
    +
    +    if ( NOT allKeyColumnsHaveHistogramStatistics( histograms, CIDesc ) )
    +    {
    +      // All key columns do not have histogram data, the best we can
    +      // do is use the number of rows that satisfy all predicates
    +      // (i.e. the number of rows we will be updating)
    +      numRowsToScan = numRowsToDelete;
    +    }
    +    else
    +    {
    +      numRowsToScan = numRowsToScanWhenAllKeyColumnsHaveHistograms(
    +	histograms,
    +	keyPredsByCol,
    +	activePartitions,
    +	CIDesc
    +	);
    +      if (numRowsToScan < numRowsToDelete) // sanity test
    +      {
    +        // we will scan at least as many rows as we delete
    +        numRowsToScan = numRowsToDelete;
    +      }
    +    }
    +  }
    +
    +  // Notes: At execution time, several different TCBs can be created
    +  // for a delete. We can class them three ways: Unique, Subset, and
    +  // Rowset. Representative examples of the three classes are:
    +  //
    +  //   ExHbaseUMDtrafUniqueTaskTcb
    +  //   ExHbaseUMDtrafSubsetTaskTcb
    +  //   ExHbaseAccessSQRowsetTcb
    +  //
    +  // The theory of operation of each of these differs somewhat. 
    +  //
    +  // For the Unique variant, we use an HBase "get" to obtain a row, apply
    +  // a predicate to it, then do an HBase "delete" to delete it if the
    +  // predicate is true. (If there is no predicate, we'll simply do a
    +  // "checkAndDelete" so there would be no "get" cost.) 
    +  //
    +  // For the Subset variant, we use an HBase "scan" to obtain a sequence
    +  // of rows, apply a predicate to each, then do an HBase "delete" on
    +  // each row that passes the predicate.
    +  //
    +  // For the Rowset variant, we simply pass all the input keys to 
    +  // HBase in batches in HBase "deleteRows" calls. (In Explain plans,
    +  // this TCB shows up as "trafodion_delete_vsbb", while the first two
    +  // show up as "trafodion_delete".) There is no "get" cost. In plans
    +  // with this TCB, there is a separate Scan TCB to obtain the keys,
    +  // which then flow to this Rowset TCB via a tuple flow or nested join.
    +  // (Such a separate Scan might exist with the first two TCBs also,
    +  // e.g., when an index is used to decide which rows to delete.)
    +  // The messaging cost to HBase is also reduced since multiple delete
    +  // keys are sent per HBase interaction.
    +  //
    +  // Unfortunately the decisions as to which TCB will be used are
    +  // currently made in the generator code and so aren't easily 
    +  // available to us here. For the moment then, we make no attempt 
    +  // to distinguish a separate "get" cost, nor do we take into account
    +  // possible reduced message cost in the Rowset case. Should this
    +  // choice be refactored in the future to push it into the Optimizer,
    +  // then we can do a better job here. We did attempt to distinguish
    +  // the unique case here from the others, but even there our criteria
    +  // are not quite the same as in the generator. So at best, this attempt
    +  // simply sharpens the cost estimate in this one particular case.
    +
    +
    +  // Compute the I/O cost
    +
    +  computeIOCostsForCursorOperation(
    +    randomIOs /* out */,
    +    sequentialIOs /* out */,
    +    CIDesc,
    +    numRowsToScan,
    +    probesInOrder
    +    );
    +
    +  // Compute the tuple cost
    +
    +  tuplesProduced = numRowsToDelete;
    +  tuplesProcessed = numRowsToScan; 
    +  tuplesSent = numRowsToDelete;
    +
    +  CostScalar rowSize = delOp->getIndexDesc()->getRecordLength();
    +  CostScalar rowSizeFactor = scmRowSizeFactor(rowSize); 
    +  CostScalar outputRowSize = delOp->getGroupAttr()->getRecordLength();
    +  CostScalar outputRowSizeFactor = scmRowSizeFactor(outputRowSize);
    +
    +  tuplesProcessed *= rowSizeFactor;
    +  tuplesSent *= rowSizeFactor;
    +  tuplesProduced *= outputRowSizeFactor;
    +
    +
    +  // ---------------------------------------------------------------------
    +  // Synthesize and return cost object.
    +  // ---------------------------------------------------------------------
    +
    +  CostScalar probeRowSize = delOp->getIndexDesc()->getKeyLength();
    +  Cost * updateCost = 
    +    scmCost(tuplesProcessed, tuplesProduced, tuplesSent, randomIOs, sequentialIOs, noOfProbes_,
    +	    rowSize, csZero, outputRowSize, probeRowSize);
    +
    +#ifndef NDEBUG
    +if ( CmpCommon::getDefault( OPTIMIZER_PRINT_COST ) == DF_ON )
    +    {
    +      pfp = stdout;
    +      fprintf(pfp, "HbaseDelete::scmComputeOperatorCostInternal()\n");
    +      updateCost->getScmCplr().print(pfp);
    +      fprintf(pfp, "HBase Delete elapsed time: ");
    --- End diff --
    
    Here too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message