drill-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] weijietong commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
Date Fri, 28 Sep 2018 08:04:30 GMT
weijietong commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating
work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221171069
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##########
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue<RuntimeFilterWritable> rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   A direct example is this TPC-H query:
   ```
   select l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, o.o_orderdate, o.o_shippriority
   from dfs.`/tpch-parquet/customer` c, dfs.`/tpch-parquet/orders` o, dfs.`/tpch-parquet/lineitem` l
   where c.c_mktsegment = 'HOUSEHOLD' and c.c_custkey = o.o_custkey and l.l_orderkey = o.o_orderkey
     and o.o_orderdate < date '1995-03-25' and l.l_shipdate > date '1995-03-25'
   group by l.l_orderkey, o.o_orderdate, o.o_shippriority
   order by revenue desc, o.o_orderdate limit 10
   ```
   The corresponding plan is:
   ```
   
   00-00    Screen : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY
o_shippriority): rowcount = 10.0, cumulative cost = {4051714.1799999997 rows, 3.094535517999211E7
cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3423
   00-01      Project(l_orderkey=[$0], revenue=[$1], o_orderdate=[$2], o_shippriority=[$3])
: rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority):
rowcount = 10.0, cumulative cost = {4051713.1799999997 rows, 3.094535417999211E7 cpu, 0.0
io, 0.0 network, 1.2986673920000002E7 memory}, id = 3422
   00-02        SelectionVectorRemover : rowType = RecordType(ANY l_orderkey, ANY revenue,
ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051703.1799999997
rows, 3.094531417999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3421
   00-03          Limit(fetch=[10]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY
o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051693.1799999997 rows,
3.094530417999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3420
   00-04            SelectionVectorRemover : rowType = RecordType(ANY l_orderkey, ANY revenue,
ANY o_orderdate, ANY o_shippriority): rowcount = 3002.8599999999997, cumulative cost = {4051683.1799999997
rows, 3.094526417999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3419
   00-05              TopN(limit=[10]) : rowType = RecordType(ANY l_orderkey, ANY revenue,
ANY o_orderdate, ANY o_shippriority): rowcount = 3002.8599999999997, cumulative cost = {4048680.32
rows, 3.094226131999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3418
   00-06                Project(l_orderkey=[$0], revenue=[$3], o_orderdate=[$1], o_shippriority=[$2])
: rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority):
rowcount = 3002.8599999999997, cumulative cost = {4045677.46 rows, 3.0862459040000003E7 cpu,
0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3417
   00-07                  HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) : rowType = RecordType(ANY
l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY revenue): rowcount = 3002.8599999999997,
cumulative cost = {4042674.6 rows, 3.08504476E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7
memory}, id = 3416
   00-08                    HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) : rowType = RecordType(ANY
l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY revenue): rowcount = 30028.6, cumulative
cost = {4012646.0 rows, 2.9769418E7 cpu, 0.0 io, 0.0 network, 1.1929667200000001E7 memory},
id = 3415
   00-09                      Project(l_orderkey=[$6], o_orderdate=[$4], o_shippriority=[$5],
$f3=[*($8, -(1, $9))]) : rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority,
ANY $f3): rowcount = 300286.0, cumulative cost = {3712360.0 rows, 1.8959122E7 cpu, 0.0 io,
0.0 network, 1359600.0 memory}, id = 3414
   00-10                        Project(c_mktsegment=[$8], c_custkey=[$9], o_custkey=[$4],
o_orderkey=[$5], o_orderdate=[$6], o_shippriority=[$7], l_orderkey=[$0], l_shipdate=[$1],
l_extendedprice=[$2], l_discount=[$3]) : rowType = RecordType(ANY c_mktsegment, ANY c_custkey,
ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority, ANY l_orderkey, ANY l_shipdate,
ANY l_extendedprice, ANY l_discount): rowcount = 300286.0, cumulative cost = {3412074.0 rows,
1.685712E7 cpu, 0.0 io, 0.0 network, 1359600.0 memory}, id = 3413
   00-11                          HashJoin(condition=[=($5, $0)], joinType=[inner]) : rowType
= RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY l_discount, ANY o_custkey,
ANY o_orderkey, ANY o_orderdate, ANY o_shippriority, ANY c_mktsegment, ANY c_custkey): rowcount
= 300286.0, cumulative cost = {3111788.0 rows, 1.385426E7 cpu, 0.0 io, 0.0 network, 1359600.0
memory}, id = 3412
   00-13                            SelectionVectorRemover : rowType = RecordType(ANY l_orderkey,
ANY l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 300286.0, cumulative cost
= {2102002.0 rows, 6906578.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3403
   00-16                              Filter(condition=[>($1, 1995-03-25)]) : rowType =
RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount
= 300286.0, cumulative cost = {1801716.0 rows, 6606292.0 cpu, 0.0 io, 0.0 network, 0.0 memory},
id = 3402
   00-19                                RuntimeFilter : rowType = RecordType(ANY l_orderkey,
ANY l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 600572.0, cumulative cost
= {1201144.0 rows, 3002860.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3401
   00-22                                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/lineitem]], selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/lineitem,
numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=[`l_orderkey`, `l_shipdate`, `l_extendedprice`,
`l_discount`]]]) : rowType = RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice,
ANY l_discount): rowcount = 600572.0, cumulative cost = {600572.0 rows, 2402288.0 cpu, 0.0
io, 0.0 network, 0.0 memory}, id = 3400
   00-12                            HashJoin(condition=[=($5, $0)], joinType=[inner]) : rowType
= RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority, ANY c_mktsegment,
ANY c_custkey): rowcount = 75000.0, cumulative cost = {634500.0 rows, 2744250.0 cpu, 0.0 io,
0.0 network, 39600.0 memory}, id = 3411
   00-15                              SelectionVectorRemover : rowType = RecordType(ANY o_custkey,
ANY o_orderkey, ANY o_orderdate, ANY o_shippriority): rowcount = 75000.0, cumulative cost
= {525000.0 rows, 1725000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3407
   00-18                                Filter(condition=[<($2, 1995-03-25)]) : rowType
= RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority): rowcount
= 75000.0, cumulative cost = {450000.0 rows, 1650000.0 cpu, 0.0 io, 0.0 network, 0.0 memory},
id = 3406
   00-21                                  RuntimeFilter : rowType = RecordType(ANY o_custkey,
ANY o_orderkey, ANY o_orderdate, ANY o_shippriority): rowcount = 150000.0, cumulative cost
= {300000.0 rows, 750000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3405
   00-23                                    Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/orders]], selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/orders,
numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`, `o_orderdate`,
`o_shippriority`]]]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate,
ANY o_shippriority): rowcount = 150000.0, cumulative cost = {150000.0 rows, 600000.0 cpu,
0.0 io, 0.0 network, 0.0 memory}, id = 3404
   00-14                              SelectionVectorRemover : rowType = RecordType(ANY c_mktsegment,
ANY c_custkey): rowcount = 2250.0, cumulative cost = {32250.0 rows, 101250.0 cpu, 0.0 io,
0.0 network, 0.0 memory}, id = 3410
   00-17                                Filter(condition=[=($0, 'HOUSEHOLD')]) : rowType =
RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 2250.0, cumulative cost = {30000.0
rows, 99000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3409
   00-20                                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/customer]], selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/customer,
numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=[`c_mktsegment`, `c_custkey`]]])
: rowType  = RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 15000.0, cumulative cost
= {15000.0 rows, 30000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3408
   ```
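In the plan above, both RuntimeFilter operators (00-19 and 00-21) sit in major fragment 00, so they would see the same sink. As a rough illustration of why that matters, here is a minimal sketch of a queue-backed sink with one aggregating thread that keeps filters from different joins apart. Everything in it is hypothetical and not Drill's API: the `SharedSinkSketch` and `Filter` classes, the `joinId` field, and the per-join map are assumptions made for the example, and the PR may resolve the case differently.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a runtime filter sink shared by one fragment.
class SharedSinkSketch {

  // Hypothetical filter: the join it belongs to plus its Bloom filter bits.
  static final class Filter {
    final int joinId;
    final BitSet bits = new BitSet();

    Filter(int joinId, int... setBits) {
      this.joinId = joinId;
      for (int b : setBits) {
        bits.set(b);
      }
    }
  }

  private final BlockingQueue<Filter> queue = new LinkedBlockingQueue<>();
  private final AtomicBoolean running = new AtomicBoolean(true);
  // Assumption: one aggregate per join, so unrelated filters are never OR-ed together.
  private final Map<Integer, BitSet> aggregated = new ConcurrentHashMap<>();
  private final Thread worker = new Thread(this::drain, "RFAggregating-sketch");

  SharedSinkSketch() {
    worker.start();
  }

  // Producers only enqueue; the merge happens on the worker thread.
  void aggregate(Filter filter) {
    if (running.get()) {
      queue.add(filter);
    }
  }

  // Worker loop: keeps polling until the sink is closed and the queue is empty.
  private void drain() {
    try {
      while (running.get() || !queue.isEmpty()) {
        Filter f = queue.poll(10, TimeUnit.MILLISECONDS);
        if (f != null) {
          aggregated.merge(f.joinId, f.bits, (a, b) -> {
            BitSet merged = (BitSet) a.clone();
            merged.or(b); // Bloom filters for the same join combine by bitwise OR
            return merged;
          });
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Stops accepting new filters, waits for the worker, returns the aggregates.
  Map<Integer, BitSet> close() {
    running.set(false);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return aggregated;
  }

  public static void main(String[] args) {
    SharedSinkSketch sink = new SharedSinkSketch();
    sink.aggregate(new Filter(11, 3)); // two partial filters for one join
    sink.aggregate(new Filter(11, 7));
    sink.aggregate(new Filter(12, 5)); // a filter for the other join
    System.out.println(sink.close());
  }
}
```

The ids 11 and 12 in `main` only echo the two HashJoin operators (00-11 and 00-12) for readability. If the sink held a single aggregate instead of the map, the third filter would be OR-ed into the bits of an unrelated join, which is the kind of mix-up the `same` check in the diff appears to guard against.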
   
   
   

