drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vita...@apache.org
Subject [drill] 02/05: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
Date Thu, 29 Nov 2018 20:27:11 GMT
This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa
Author: weijie.tong <weijie.tong@alipay.com>
AuthorDate: Sun Oct 14 19:41:51 2018 +0800

    DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
    
    closes #1504
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../org/apache/drill/exec/ops/FragmentContext.java |  20 +-
 .../apache/drill/exec/ops/FragmentContextImpl.java |  82 ++++-
 .../drill/exec/ops/OperatorMetricRegistry.java     |   2 +
 .../exec/physical/config/RuntimeFilterPOP.java     |  16 +-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  82 +++--
 .../exec/physical/impl/join/HashJoinBatch.java     |  43 ++-
 .../exec/planner/physical/RuntimeFilterPrel.java   |  14 +-
 .../physical/visitor/RuntimeFilterVisitor.java     |  80 ++++-
 .../exec/server/options/SystemOptionManager.java   |   2 +
 .../org/apache/drill/exec/work/WorkManager.java    |  10 +-
 .../apache/drill/exec/work/filter/BloomFilter.java |  62 ++--
 .../drill/exec/work/filter/BloomFilterDef.java     |  12 +-
 .../drill/exec/work/filter/RuntimeFilterDef.java   |  21 +-
 .../exec/work/filter/RuntimeFilterReporter.java    |   5 +-
 .../exec/work/filter/RuntimeFilterRouter.java      | 280 ++++++++---------
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 340 ++++++++++++---------
 .../exec/work/filter/RuntimeFilterWritable.java    |  21 ++
 .../apache/drill/exec/work/foreman/Foreman.java    |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |   2 +
 .../exec/physical/impl/join/TestHashJoinJPPD.java  |  10 +-
 .../physical/impl/join/TestHashJoinJPPDPlan.java   |  52 ----
 .../drill/exec/work/filter/BloomFilterTest.java    |  19 +-
 .../org/apache/drill/test/OperatorFixture.java     |  27 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |  15 +-
 .../java/org/apache/drill/exec/proto/BitData.java  | 128 +++++++-
 .../org/apache/drill/exec/proto/SchemaBitData.java |   7 +
 .../drill/exec/proto/beans/RuntimeFilterBDef.java  |  22 ++
 protocol/src/main/protobuf/BitData.proto           |   1 +
 29 files changed, 876 insertions(+), 505 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index fb46572..c4d7652 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -153,6 +153,10 @@ public final class ExecConstants {
   public static final IntegerValidator HASHJOIN_BLOOM_FILTER_MAX_SIZE = new IntegerValidator(HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY, null);
   public static final String HASHJOIN_BLOOM_FILTER_FPP_KEY = "exec.hashjoin.bloom_filter.fpp";
   public static final DoubleValidator HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR = new RangeDoubleValidator(HASHJOIN_BLOOM_FILTER_FPP_KEY, Double.MIN_VALUE, 1.0, null);
+  public static final String HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY = "exec.hashjoin.runtime_filter.waiting.enable";
+  public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
+  public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time";
+  public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
 
 
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 88c21d9..5125f72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -20,8 +20,7 @@ package org.apache.drill.exec.ops;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
+import java.util.concurrent.TimeUnit;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -159,18 +158,23 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
 
   @Override
   void close();
-
-  /**
-   * @return
-   */
-  RuntimeFilterSink getRuntimeFilterSink();
-
   /**
    * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
    * @param runtimeFilter
    */
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier);
+
+  /**
+   * get the RuntimeFilter with a blocking wait, if the waiting option is enabled
+   * @param rfIdentifier
+   * @param maxWaitTime
+   * @param timeUnit
+   * @return the RFW or null
+   */
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit);
+
   interface ExecutorState {
     /**
      * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 6e40466..b740c92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -21,7 +21,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -60,8 +65,6 @@ import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -115,6 +118,10 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   private final BufferManager bufferManager;
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
+  private boolean enableRuntimeFilter;
+  private boolean enableRFWaiting;
+  private Lock lock4RF;
+  private Condition condition4RF;
 
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -136,8 +143,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
+  private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -209,10 +216,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
-    boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
-    if (enableRF) {
-      ExecutorService executorService = context.getExecutor();
-      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+    enableRuntimeFilter = this.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
+    enableRFWaiting = this.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val && enableRuntimeFilter;
+    if (enableRFWaiting) {
+      lock4RF = new ReentrantLock();
+      condition4RF = lock4RF.newCondition();
     }
   }
 
@@ -362,12 +370,50 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
 
   @Override
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterSink.aggregate(runtimeFilter);
+    long rfIdentifier = runtimeFilter.getRuntimeFilterBDef().getRfIdentifier();
+    //if the RF was sent directly from the HJ nodes, we don't need to retain the buffer again
+    // as the RuntimeFilterReporter has already retained the buffer
+    rfIdentifier2fetched.put(rfIdentifier, false);
+    rfIdentifier2RFW.put(rfIdentifier, runtimeFilter);
+    if (enableRFWaiting) {
+      lock4RF.lock();
+      try {
+        condition4RF.signal();
+      } catch (Exception e) {
+        logger.info("fail to signal the waiting thread.", e);
+      } finally {
+        lock4RF.unlock();
+      }
+    }
+  }
+
+  @Override
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+    RuntimeFilterWritable runtimeFilterWritable = rfIdentifier2RFW.get(rfIdentifier);
+    if (runtimeFilterWritable != null) {
+      rfIdentifier2fetched.put(rfIdentifier, true);
+    }
+    return runtimeFilterWritable;
   }
 
   @Override
-  public RuntimeFilterSink getRuntimeFilterSink() {
-    return runtimeFilterSink;
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) {
+    if (rfIdentifier2RFW.get(rfIdentifier) != null) {
+      return getRuntimeFilter(rfIdentifier);
+    }
+    if (enableRFWaiting) {
+      lock4RF.lock();
+      try {
+        if (rfIdentifier2RFW.get(rfIdentifier) == null) {
+          condition4RF.await(maxWaitTime, timeUnit);
+        }
+      } catch (InterruptedException e) {
+        logger.info("Condition was interrupted", e);
+      } finally {
+        lock4RF.unlock();
+      }
+    }
+    return getRuntimeFilter(rfIdentifier);
   }
 
   /**
@@ -484,12 +530,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     // Close the buffers before closing the operators; this is needed as buffer ownership
     // is attached to the receive operators.
     suppressingClose(buffers);
-
+    closeNotConsumedRFWs();
     // close operator context
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
@@ -550,4 +595,15 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   protected BufferManager getBufferManager() {
     return bufferManager;
   }
+
+  private void closeNotConsumedRFWs() {
+    for (RuntimeFilterWritable runtimeFilterWritable : rfIdentifier2RFW.values()){
+      long rfIdentifier = runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier();
+      boolean fetchedByOperator = rfIdentifier2fetched.get(rfIdentifier);
+      if (!fetchedByOperator) {
+        //if the RF hasn't been consumed by the operator, we have to released it one more time.
+        runtimeFilterWritable.close();
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index ac86702..da59068 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.physical.impl.SingleSenderCreator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
+import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
 import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
@@ -61,6 +62,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.LATERAL_JOIN_VALUE, AbstractBinaryRecordBatch.Metric.class);
     register(CoreOperatorType.UNNEST_VALUE, UnnestRecordBatch.Metric.class);
     register(CoreOperatorType.UNION_VALUE, AbstractBinaryRecordBatch.Metric.class);
+    register(CoreOperatorType.RUNTIME_FILTER_VALUE, RuntimeFilterRecordBatch.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
index 50c00d7..b35bf29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
@@ -31,9 +31,12 @@ public class RuntimeFilterPOP extends AbstractSingle {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPOP.class);
 
+  private long identifier;
+
   @JsonCreator
-  public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child) {
+  public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("identifier")long identifier) {
     super(child);
+    this.identifier = identifier;
   }
 
   @Override
@@ -43,7 +46,7 @@ public class RuntimeFilterPOP extends AbstractSingle {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new RuntimeFilterPOP(child);
+    return new RuntimeFilterPOP(child, identifier);
   }
 
   @Override
@@ -55,4 +58,13 @@ public class RuntimeFilterPOP extends AbstractSingle {
   public int getOperatorType() {
     return CoreOperatorType.RUNTIME_FILTER_VALUE;
   }
+
+
+  public long getIdentifier() {
+    return identifier;
+  }
+
+  public void setIdentifier(long identifier) {
+    this.identifier = identifier;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 9248bbc..bf7ed79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -22,11 +22,13 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -36,14 +38,13 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
@@ -59,12 +60,21 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
   private RuntimeFilterWritable current;
-  private RuntimeFilterWritable previous;
   private int originalRecordCount;
+  private long filteredRows = 0l;
+  private long appliedTimes = 0l;
+  private int batchTimes = 0;
+  private boolean waited = false;
+  private boolean enableRFWaiting;
+  private long maxWaitingTime;
+  private long rfIdentifier;
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
   public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
+    enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
+    maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
+    this.rfIdentifier = pop.getIdentifier();
   }
 
   @Override
@@ -89,7 +99,6 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
 
   @Override
   protected IterOutcome doWork() {
-    container.transferIn(incoming.getContainer());
     originalRecordCount = incoming.getRecordCount();
     sv2.setBatchActualRecordCount(originalRecordCount);
     try {
@@ -97,6 +106,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException(e);
     }
+    container.transferIn(incoming.getContainer());
+    updateStats();
     return getFinalOutcome(false);
   }
 
@@ -155,21 +166,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
    * schema change hash64 should be reset and this method needs to be called again.
    */
   private void setupHashHelper() {
-    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
-    // Check if RuntimeFilterWritable was received by the minor fragment or not
-    if (!runtimeFilterSink.containOne()) {
+    current = context.getRuntimeFilter(rfIdentifier);
+    if (current == null) {
       return;
     }
-    if (runtimeFilterSink.hasFreshOne()) {
-      RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
-      if (current == null) {
-        current = freshRuntimeFilterWritable;
-        previous = freshRuntimeFilterWritable;
-      } else {
-        previous = current;
-        current = freshRuntimeFilterWritable;
-        previous.close();
-      }
+    if (bloomFilters == null) {
       bloomFilters = current.unwrap();
     }
     // Check if HashHelper is initialized or not
@@ -189,8 +190,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
           ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
         }
-        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]),
-          typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
       } catch (Exception e) {
         throw UserException.internalError(e).build(logger);
       }
@@ -208,9 +208,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.setRecordCount(0);
       return;
     }
-    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
+    current = context.getRuntimeFilter(rfIdentifier);
+    timedWaiting();
+    batchTimes++;
     sv2.allocateNew(originalRecordCount);
-    if (!runtimeFilterSink.containOne()) {
+    if (current == null) {
       // means none of the rows are filtered out hence set all the indexes
       for (int i = 0; i < originalRecordCount; ++i) {
         sv2.setIndex(i, i);
@@ -227,21 +229,17 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       String fieldName = toFilterFields.get(i);
       computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
     }
-
     int svIndex = 0;
-    int tmpFilterRows = 0;
     for (int i = 0; i < originalRecordCount; i++) {
       boolean contain = bitSet.get(i);
       if (contain) {
         sv2.setIndex(svIndex, i);
         svIndex++;
       } else {
-        tmpFilterRows++;
+        filteredRows++;
       }
     }
-
-    logger.debug("RuntimeFiltered has filtered out {} rows from incoming with {} rows",
-      tmpFilterRows, originalRecordCount);
+    appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
 
@@ -263,4 +261,34 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
         + "originalRecordCount={}, batchSchema={}]",
         container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
   }
+
+  public enum Metric implements MetricDef {
+    FILTERED_ROWS, APPLIED_TIMES;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
+  public void updateStats() {
+    stats.setLongStat(Metric.FILTERED_ROWS, filteredRows);
+    stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes);
+  }
+
+  private void timedWaiting() {
+    if (!enableRFWaiting || waited) {
+      return;
+    }
+    //Downstream HashJoinBatch prefetch first batch from both sides in buildSchema phase hence waiting is done post that phase
+    if (current == null && batchTimes > 0) {
+      waited = true;
+      try {
+        stats.startWait();
+        current = context.getRuntimeFilter(rfIdentifier, maxWaitingTime, TimeUnit.MILLISECONDS);
+      } finally {
+        stats.stopWait();
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 88eadf2..0ac0809 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,10 +19,13 @@ package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -203,11 +206,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
   private int maxBatchesInMemory;
-  private List<BloomFilter> bloomFilters = new ArrayList<>();
   private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
   private boolean enableRuntimeFilter;
   private RuntimeFilterReporter runtimeFilterReporter;
   private ValueVectorHashHelper.Hash64 hash64;
+  private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+  private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+  private List<BloomFilter> bloomFilters = new ArrayList<>();
 
   /**
    * This holds information about the spilled partitions for the build and probe side.
@@ -757,6 +762,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
       enableRuntimeFilter = false;
       return;
     }
+    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+    List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+    for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+      String buildField = bloomFilterDef.getBuildField();
+      SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+      TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+      if (typedFieldId == null) {
+        missingField = true;
+        break;
+      }
+      int fieldId = typedFieldId.getFieldIds()[0];
+      bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+    }
+    if (missingField) {
+      logger.info("As some build side join key fields not found, runtime filter was disabled");
+      enableRuntimeFilter = false;
+      return;
+    }
     ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
     try {
       hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
@@ -799,9 +822,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     if (!enableRuntimeFilter) {
       return;
     }
-    if (runtimeFilterReporter != null) {
-      return;
-    }
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
@@ -809,11 +829,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     if (runtimeFilterDef != null) {
       List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+        int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
         int numBytes = bloomFilterDef.getNumBytes();
         String probeField =  bloomFilterDef.getProbeField();
         probeFields.add(probeField);
         BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
         bloomFilters.add(bloomFilter);
+        bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
   }
@@ -992,13 +1014,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         //create runtime filter
         if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
-          int condFieldIndex = 0;
-          for (BloomFilter bloomFilter : bloomFilters) {
+          for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
+            int fieldId = bloomFilter2buildId.get(bloomFilter);
             for (int ind = 0; ind < currentRecordCount; ind++) {
-              long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+              long hashCode = hash64.hash64Code(ind, 0, fieldId);
               bloomFilter.insert(hashCode);
             }
-            condFieldIndex++;
           }
         }
 
@@ -1027,9 +1048,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     }
 
     if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-      if (bloomFilters.size() > 0) {
+      if (bloomFilter2buildId.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
       }
     }
 
@@ -1237,7 +1258,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
       "configured output batch size: %d", configuredBatchSize);
 
-    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
index 59e1622..1729027 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
@@ -27,25 +27,29 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import java.io.IOException;
 import java.util.List;
 
-public class RuntimeFilterPrel extends SinglePrel{
+public class RuntimeFilterPrel extends SinglePrel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
 
-  public RuntimeFilterPrel(Prel child){
+  private long identifier;
+
+  public RuntimeFilterPrel(Prel child, long identifier){
     super(child.getCluster(), child.getTraitSet(), child);
+    this.identifier = identifier;
   }
 
-  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) {
     super(cluster, traits, child);
+    this.identifier = identifier;
   }
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+    return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier);
   }
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    RuntimeFilterPOP r =  new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator));
+    RuntimeFilterPOP r =  new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier);
     return creator.addMetadata(this, r);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index fcfa2bc..4d309ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical.visitor;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -28,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.physical.BroadcastExchangePrel;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.HashAggPrel;
@@ -43,11 +41,14 @@ import org.apache.drill.exec.planner.physical.TopNPrel;
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-
+import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This visitor does two major things:
@@ -58,9 +59,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
 
   private Set<ScanPrel> toAddRuntimeFilter = new HashSet<>();
 
+  private Multimap<ScanPrel, HashJoinPrel> probeSideScan2hj = HashMultimap.create();
+
   private double fpp;
+
   private int bloomFilterMaxSizeInBytesDef;
 
+  private static final AtomicLong rfIdCounter = new AtomicLong();
+
   private RuntimeFilterVisitor(QueryContext queryContext) {
     this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue();
     this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val;
@@ -76,7 +82,7 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
   }
 
   public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
-    List<RelNode> children = Lists.newArrayList();
+    List<RelNode> children = new ArrayList<>();
     for (Prel child : prel) {
       child = child.accept(this, value);
       children.add(child);
@@ -100,8 +106,18 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
   @Override
   public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException {
     if (toAddRuntimeFilter.contains(prel)) {
-      //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node.
-      RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel);
+      //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node.
+      Collection<HashJoinPrel> hashJoinPrels = probeSideScan2hj.get(prel);
+      RuntimeFilterPrel runtimeFilterPrel = null;
+      for (HashJoinPrel hashJoinPrel : hashJoinPrels) {
+        long identifier = rfIdCounter.incrementAndGet();
+        hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier);
+        if (runtimeFilterPrel == null) {
+          runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier);
+        } else {
+          runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier);
+        }
+      }
       return runtimeFilterPrel;
     } else {
       return prel;
@@ -134,13 +150,24 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
 
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     //find the possible left scan node of the left join key
-    GroupScan groupScan = null;
+    ScanPrel probeSideScanPrel = null;
     RelNode left = hashJoinPrel.getLeft();
+    RelNode right = hashJoinPrel.getRight();
+    ExchangePrel exchangePrel = findRightExchangePrel(right);
+    if (exchangePrel == null) {
+      //Does not support the single fragment mode ,that is the right build side
+      //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+      return null;
+    }
     List<String> leftFields = left.getRowType().getFieldNames();
+    List<String> rightFields = right.getRowType().getFieldNames();
     List<Integer> leftKeys = hashJoinPrel.getLeftKeys();
     RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
+    int i = 0;
     for (Integer leftKey : leftKeys) {
       String leftFieldName = leftFields.get(leftKey);
+      String rightFieldName = rightFields.get(i);
+      i++;
       //This also avoids the left field of the join condition with a function call.
       ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left);
       if (scanPrel != null) {
@@ -160,17 +187,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
         int bloomFilterSizeInBytes = BloomFilter.optimalNumOfBytes(ndv.longValue(), fpp);
         bloomFilterSizeInBytes = bloomFilterSizeInBytes > bloomFilterMaxSizeInBytesDef ? bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes;
         //left the local parameter to be set later.
-        BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName);
+        BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName);
         bloomFilterDef.setLeftNDV(ndv);
         bloomFilterDefs.add(bloomFilterDef);
         toAddRuntimeFilter.add(scanPrel);
-        groupScan = scanPrel.getGroupScan();
+        probeSideScanPrel = scanPrel;
       }
     }
     if (bloomFilterDefs.size() > 0) {
       //left sendToForeman parameter to be set later.
-      RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false);
-      runtimeFilterDef.setProbeSideGroupScan(groupScan);
+      RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
+      probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel);
       return runtimeFilterDef;
     }
     return null;
@@ -265,6 +292,30 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
     }
   }
 
+  private ExchangePrel findRightExchangePrel(RelNode rightRelNode) {
+    if (rightRelNode instanceof ExchangePrel) {
+      return (ExchangePrel) rightRelNode;
+    }
+    if (rightRelNode instanceof ScanPrel) {
+      return null;
+    } else if (rightRelNode instanceof RelSubset) {
+      RelNode bestNode = ((RelSubset) rightRelNode).getBest();
+      if (bestNode != null) {
+        return findRightExchangePrel(bestNode);
+      } else {
+        return null;
+      }
+    } else {
+      List<RelNode> relNodes = rightRelNode.getInputs();
+      if (relNodes.size() == 1) {
+        RelNode leftNode = relNodes.get(0);
+        return findRightExchangePrel(leftNode);
+      } else {
+        return null;
+      }
+    }
+  }
+
   private boolean containBlockNode(Prel startNode, Prel endNode) {
     BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor();
     startNode.accept(blockNodeVisitor, endNode);
@@ -311,6 +362,11 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
         return null;
       }
 
+      if (currentPrel instanceof HashJoinPrel) {
+        encounteredBlockNode = true;
+        return null;
+      }
+
       for (Prel subPrel : currentPrel) {
         visitPrel(subPrel, endValue);
       }
@@ -349,4 +405,4 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
     }
 
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 37934c8..c97220c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -129,6 +129,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
+      new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
       // ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
       new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
       new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 0d97e0a..7915843 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -379,11 +379,16 @@ public class WorkManager implements AutoCloseable {
       return runningFragments.get(handle);
     }
 
+    /**
+     * receive the RuntimeFilter thorough the wire
+     * @param runtimeFilter
+     */
     public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
       BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
       boolean toForeman = runtimeFilterDef.getToForeman();
       QueryId queryId = runtimeFilterDef.getQueryId();
       String queryIdStr = QueryIdHelper.getQueryId(queryId);
+      runtimeFilter.retainBuffers(1);
       //to foreman
       if (toForeman) {
         Foreman foreman = queries.get(queryId);
@@ -393,13 +398,14 @@ public class WorkManager implements AutoCloseable {
             public void run() {
               final Thread currentThread = Thread.currentThread();
               final String originalName = currentThread.getName();
-              currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
+              currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
               try {
-                foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
+                foreman.getRuntimeFilterRouter().register(runtimeFilter);
               } catch (Exception e) {
                 logger.warn("Exception while registering the RuntimeFilter", e);
               } finally {
                 currentThread.setName(originalName);
+                runtimeFilter.close();
               }
             }
           });
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
index dc6cc2f..afbc56a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
 public class BloomFilter {
   // Bytes in a bucket.
   private static final int BYTES_PER_BUCKET = 32;
+
   // Minimum bloom filter data size.
   private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256;
 
@@ -41,16 +42,14 @@ public class BloomFilter {
 
   private int numBytes;
 
-  private int mask[] = new int[8];
-
-  private byte[] tempBucket = new byte[32];
-
+  private int bucketMask[] = new int[8];
 
   public BloomFilter(int numBytes, BufferAllocator bufferAllocator) {
     int size = BloomFilter.adjustByteSize(numBytes);
     this.byteBuf = bufferAllocator.buffer(size);
     this.numBytes = byteBuf.capacity();
-    this.byteBuf.writerIndex(numBytes);
+    this.byteBuf.writeZero(this.numBytes);
+    this.byteBuf.writerIndex(this.numBytes);
   }
 
   public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) {
@@ -74,26 +73,27 @@ public class BloomFilter {
   }
 
   private void setMask(int key) {
-    //8 odd numbers act as salt value to participate in the computation of the mask.
+    //8 odd numbers act as salt value to participate in the computation of the bucketMask.
     final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
 
-    Arrays.fill(mask, 0);
+    Arrays.fill(bucketMask, 0);
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = key * SALT[i];
+      bucketMask[i] = key * SALT[i];
     }
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = mask[i] >> 27;
+      bucketMask[i] = bucketMask[i] >>> 27;
     }
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = 0x1 << mask[i];
+      bucketMask[i] = 0x1 << bucketMask[i];
     }
   }
 
   /**
    * Add an element's hash value to this bloom filter.
+   *
    * @param hash hash result of element.
    */
   public void insert(long hash) {
@@ -101,16 +101,13 @@ public class BloomFilter {
     int key = (int) hash;
     setMask(key);
     int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
-    byteBuf.getBytes(initialStartIndex, tempBucket);
     for (int i = 0; i < 8; i++) {
+      int index = initialStartIndex + i * 4;
       //every iterate batch,we set 32 bits
-      int bitsetIndex = i * 4;
-      tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24));
-      tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16));
-      tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8));
-      tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i]));
+      int a = byteBuf.getInt(index);
+      a |= bucketMask[i];
+      byteBuf.setInt(index, a);
     }
-    byteBuf.setBytes(initialStartIndex, tempBucket);
   }
 
   /**
@@ -123,17 +120,12 @@ public class BloomFilter {
     int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
     int key = (int) hash;
     setMask(key);
-
     int startIndex = bucketIndex * BYTES_PER_BUCKET;
-    byteBuf.getBytes(startIndex, tempBucket);
     for (int i = 0; i < 8; i++) {
-      byte set = 0;
-      int bitsetIndex = i * 4;
-      set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
-      set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
-      set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
-      set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
-      if (0 == set) {
+      int index = startIndex + i * 4;
+      int a = byteBuf.getInt(index);
+      int b = a & bucketMask[i];
+      if (b == 0) {
         return false;
       }
     }
@@ -142,6 +134,7 @@ public class BloomFilter {
 
   /**
    * Merge this bloom filter with other one
+   *
    * @param other
    */
   public void or(BloomFilter other) {
@@ -150,20 +143,19 @@ public class BloomFilter {
     Preconditions.checkArgument(otherLength == thisLength);
     Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0);
     Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0);
-    byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET];
-    for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) {
-      byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket);
-      other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket);
-      for (int j = 0; j < BYTES_PER_BUCKET; j++) {
-        tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]);
-      }
-      this.byteBuf.setBytes(i, tempBucket);
+    for (int i = 0; i < thisLength / 8; i++) {
+      int index = i * 8;
+      long a = byteBuf.getLong(index);
+      long b = other.byteBuf.getLong(index);
+      long c = a | b;
+      byteBuf.setLong(index, c);
     }
   }
 
   /**
    * Calculate optimal size according to the number of distinct values and false positive probability.
    * See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula.
+   *
    * @param ndv: The number of distinct values.
    * @param fpp: The false positive probability.
    * @return optimal number of bytes of given ndv and fpp.
@@ -177,7 +169,7 @@ public class BloomFilter {
     bits |= bits >> 8;
     bits |= bits >> 16;
     bits++;
-    int bytes = bits/8;
+    int bytes = bits / 8;
     return bytes;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
index 9a6df57..b2a9bd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
@@ -28,6 +28,8 @@ public class BloomFilterDef {
   private boolean local;
 
   private String probeField;
+
+  private String buildField;
   //TODO
   @JsonIgnore
   private Double leftNDV;
@@ -37,10 +39,11 @@ public class BloomFilterDef {
 
   @JsonCreator
   public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField")
-                        String probeField){
+                        String probeField, @JsonProperty("buildField") String buildField){
     this.numBytes = numBytes;
     this.local = local;
     this.probeField = probeField;
+    this.buildField = buildField;
   }
 
 
@@ -61,7 +64,7 @@ public class BloomFilterDef {
   }
 
   public String toString() {
-    return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }";
+    return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }";
   }
 
   @JsonIgnore
@@ -82,4 +85,9 @@ public class BloomFilterDef {
     this.rightNDV = rightNDV;
   }
 
+  public String getBuildField()
+  {
+    return buildField;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
index 5fb51bf..efe300f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
@@ -18,13 +18,8 @@
 package org.apache.drill.exec.work.filter;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.physical.base.GroupScan;
-
-
 import java.util.List;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
@@ -37,17 +32,18 @@ public class RuntimeFilterDef {
   private List<BloomFilterDef> bloomFilterDefs;
 
   private boolean sendToForeman;
-  @JsonIgnore
-  private GroupScan probeSideGroupScan;
 
+  private long runtimeFilterIdentifier;
 
   @JsonCreator
   public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter,
-                          @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) {
+                          @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman,
+                          @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) {
     this.generateBloomFilter = generateBloomFilter;
     this.generateMinMaxFilter = generateMinMaxFilter;
     this.bloomFilterDefs = bloomFilterDefs;
     this.sendToForeman = sendToForeman;
+    this.runtimeFilterIdentifier = runtimeFilterIdentifier;
   }
 
 
@@ -84,12 +80,11 @@ public class RuntimeFilterDef {
     this.sendToForeman = sendToForeman;
   }
 
-  @JsonIgnore
-  public GroupScan getProbeSideGroupScan() {
-    return probeSideGroupScan;
+  public long getRuntimeFilterIdentifier() {
+    return runtimeFilterIdentifier;
   }
 
-  public void setProbeSideGroupScan(GroupScan probeSideGroupScan) {
-    this.probeSideGroupScan = probeSideGroupScan;
+  public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) {
+    this.runtimeFilterIdentifier = runtimeFilterIdentifier;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index 6e4a9a8..93736c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,9 @@ public class RuntimeFilterReporter {
     this.context = context;
   }
 
-  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
+  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) {
+    boolean sendToForeman = runtimeFilterDef.isSendToForeman();
+    long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
     ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
     DrillBuf[] data = new DrillBuf[bloomFilters.size()];
     List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -64,6 +66,7 @@ public class RuntimeFilterReporter {
       .setMinorFragmentId(minorFragmentId)
       .setToForeman(sendToForeman)
       .setHjOpId(hashJoinOpId)
+      .setRfIdentifier(rfIdentifier)
       .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
       .build();
     RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index 5a8c6fc..a4946a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -17,39 +17,24 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.DrillBuf;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.Consumer;
 import org.apache.drill.exec.ops.SendingAccountor;
-import org.apache.drill.exec.ops.StatusHandler;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.Wrapper;
-import org.apache.drill.exec.proto.BitData;
 import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class manages the RuntimeFilter routing information of the pushed down join predicate
@@ -69,29 +54,24 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RuntimeFilterRouter {
 
   private Wrapper rootWrapper;
-  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
-  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>();
-  //HashJoin node's major fragment id to its corresponding probe side nodes's number
-  private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
-  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
-  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-
-  private DrillbitContext drillbitContext;
 
   private SendingAccountor sendingAccountor = new SendingAccountor();
 
+  private RuntimeFilterSink runtimeFilterSink;
+
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
 
   /**
    * This class maintains context for the runtime join push down's filter management. It
    * does a traversal of the physical operators by leveraging the root wrapper which indirectly
    * holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+   *
    * @param workUnit
    * @param drillbitContext
    */
   public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
     this.rootWrapper = workUnit.getRootWrapper();
-    this.drillbitContext = drillbitContext;
+    runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor);
   }
 
   /**
@@ -99,6 +79,12 @@ public class RuntimeFilterRouter {
    * record the relationship between the RuntimeFilter producers and consumers.
    */
   public void collectRuntimeFilterParallelAndControlInfo() {
+    //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+    Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
+    //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+    Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
+    Map<Integer, Integer> joinMjId2rfNumber = new HashMap<>();
+
     RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector();
     rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null);
     List<RFHelperHolder> holders = runtimeFilterParallelismCollector.getHolders();
@@ -107,67 +93,33 @@ public class RuntimeFilterRouter {
       List<CoordinationProtos.DrillbitEndpoint> probeSideEndpoints = holder.getProbeSideScanEndpoints();
       int probeSideScanMajorId = holder.getProbeSideScanMajorId();
       int joinNodeMajorId = holder.getJoinMajorId();
+      int buildSideRfNumber = holder.getBuildSideRfNumber();
       RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef();
       boolean sendToForeman = runtimeFilterDef.isSendToForeman();
       if (sendToForeman) {
         //send RuntimeFilter to Foreman
-        joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints);
-        joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size());
+        joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints);
         joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId);
+        joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber);
       }
     }
+    runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps);
+    runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber);
+    runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId);
   }
 
-
   public void waitForComplete() {
     sendingAccountor.waitForSendComplete();
+    runtimeFilterSink.close();
   }
 
   /**
    * This method is passively invoked by receiving a runtime filter from the network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
    */
-  public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
-    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
-  }
-
-
-  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
-    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
-    int joinMajorId = runtimeFilterB.getMajorFragmentId();
-    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
-    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
-    DrillBuf[] data = srcRuntimeFilterWritable.getData();
-    List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
-    int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
-    for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
-      BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
-      for (String probeField : probeFields) {
-        builder.addProbeFields(probeField);
-      }
-      BitData.RuntimeFilterBDef runtimeFilterBDef = builder
-        .setQueryId(queryId)
-        .setMajorFragmentId(scanNodeMjId)
-        .setMinorFragmentId(minorId)
-        .build();
-      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
-      CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-      DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
-      Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-        @Override
-        public void accept(final RpcException e) {
-          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
-        }
-
-        @Override
-        public void interrupt(final InterruptedException e) {
-          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
-        }
-      };
-      RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
-      AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
-      accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
-    }
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+    runtimeFilterSink.add(srcRuntimeFilterWritable);
   }
 
   /**
@@ -183,18 +135,29 @@ public class RuntimeFilterRouter {
       boolean isHashJoinOp = op instanceof HashJoinPOP;
       if (isHashJoinOp) {
         HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+        int hashJoinOpId = hashJoinPOP.getOperatorId();
         RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef();
-        if (runtimeFilterDef != null) {
-          if (holder == null) {
-            holder = new RFHelperHolder();
+        if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) {
+          if (holder == null || holder.getJoinOpId() != hashJoinOpId) {
+            holder = new RFHelperHolder(hashJoinOpId);
             holders.add(holder);
           }
           holder.setRuntimeFilterDef(runtimeFilterDef);
-          GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan();
-          Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP);
+          long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
+          WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP);
+          Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor);
+          if (container == null) {
+            throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId()));
+          }
+          int buildSideRFNumber = container.getAssignedEndpoints().size();
+          holder.setBuildSideRfNumber(buildSideRFNumber);
           int majorFragmentId = container.getMajorFragmentId();
           holder.setJoinMajorId(majorFragmentId);
-          Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp);
+          WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier);
+          Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor);
+          if (probeSideScanContainer == null) {
+            throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId()));
+          }
           int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId();
           List<CoordinationProtos.DrillbitEndpoint> probeSideScanEps = probeSideScanContainer.getAssignedEndpoints();
           holder.setProbeSideScanEndpoints(probeSideScanEps);
@@ -209,59 +172,63 @@ public class RuntimeFilterRouter {
     }
   }
 
-  private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
+  private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
+    targetOpVisitor.setCurrentFragment(wrapper.getNode());
+    wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+    boolean contain = targetOpVisitor.isContain();
+    if (contain) {
+      return wrapper;
+    }
+    List<Wrapper> dependencies = wrapper.getFragmentDependencies();
+    if (CollectionUtils.isEmpty(dependencies)) {
+      return null;
+    }
+    for (Wrapper dependencyWrapper : dependencies) {
+      Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor);
+      if (opContainer != null) {
+        return opContainer;
+      }
+    }
+    return null;
+  }
 
-    private Fragment fragment;
+  private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable> extends AbstractPhysicalVisitor<T, X, E> {
 
-    private boolean contain = false;
+    protected Exchange sendingExchange;
 
-    private boolean targetIsGroupScan;
+    public void setCurrentFragment(Fragment fragment) {
+      sendingExchange = fragment.getSendingExchange();
+    }
 
-    private boolean targetIsHashJoin;
+    public abstract boolean isContain();
+  }
 
-    private String targetGroupScanDigest;
+  private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
 
-    private String targetHashJoinJson;
+    private boolean contain = false;
 
+    private PhysicalOperator targetOp;
 
-    public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
-      this.fragment = fragment;
-      this.targetIsGroupScan = targetOp instanceof GroupScan;
-      this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
-      this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null;
-      this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null;
+    public WrapperOperatorsVisitor(PhysicalOperator targetOp) {
+      this.targetOp = targetOp;
     }
 
     @Override
     public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
-      List<Fragment.ExchangeFragmentPair> exchangeFragmentPairs = fragment.getReceivingExchangePairs();
-      for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) {
-        boolean same = exchange == exchangeFragmentPair.getExchange();
-        if (same) {
-          return null;
-        }
+      if (exchange != sendingExchange) {
+        return null;
       }
       return exchange.getChild().accept(this, value);
     }
 
     @Override
     public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
-      boolean same = false;
-      if (targetIsGroupScan && op instanceof GroupScan) {
-        //Since GroupScan may be rewrite during the planing, here we use the digest to identify it.
-        String currentDigest = ((GroupScan) op).getDigest();
-        same = targetGroupScanDigest.equals(currentDigest);
-      }
-      if (targetIsHashJoin && op instanceof HashJoinPOP) {
-        String currentOpJson = jsonOfPhysicalOp(op);
-        same = targetHashJoinJson.equals(currentOpJson);
-      }
-      if (!same) {
+      if (op == targetOp) {
+        contain = true;
+      } else {
         for (PhysicalOperator child : op) {
           child.accept(this, value);
         }
-      } else {
-        contain = true;
       }
       return null;
     }
@@ -269,42 +236,57 @@ public class RuntimeFilterRouter {
     public boolean isContain() {
       return contain;
     }
+  }
+
+  private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
+
+    private boolean contain = false;
+
+    private long identifier;
+
+
+    public WrapperRuntimeFilterOperatorsVisitor(long identifier) {
+      this.identifier = identifier;
+    }
 
-    public String jsonOfPhysicalOp(PhysicalOperator operator) {
-      try {
-        ObjectMapper objectMapper = new ObjectMapper();
-        StringWriter stringWriter = new StringWriter();
-        objectMapper.writeValue(stringWriter, operator);
-        return stringWriter.toString();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    @Override
+    public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
+      if (exchange != sendingExchange) {
+        return null;
       }
+      return exchange.getChild().accept(this, value);
     }
-  }
 
-  private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) {
-    WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode());
-    wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null);
-    return wrapperOpsVistitor.isContain();
-  }
+    @Override
+    public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
+      boolean same;
+      boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP;
+      boolean isHashJoinPop = op instanceof HashJoinPOP;
 
-  private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) {
-    boolean contain = containsPhysicalOperator(wrapper, op);
-    if (contain) {
-      return wrapper;
-    }
-    List<Wrapper> dependencies = wrapper.getFragmentDependencies();
-    if (CollectionUtils.isEmpty(dependencies)) {
+      if (isHashJoinPop) {
+        HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+        PhysicalOperator leftPop = hashJoinPOP.getLeft();
+        leftPop.accept(this, value);
+        return null;
+      }
+
+      if (isRuntimeFilterPop) {
+        RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op;
+        same = this.identifier == runtimeFilterPOP.getIdentifier();
+        if (same) {
+          contain = true;
+          return null;
+        }
+      }
+      for (PhysicalOperator child : op) {
+        child.accept(this, value);
+      }
       return null;
     }
-    for (Wrapper dependencyWrapper : dependencies) {
-      Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op);
-      if (opContainer != null) {
-        return opContainer;
-      }
+
+    public boolean isContain() {
+      return contain;
     }
-    //should not be here
-    throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId()));
   }
 
   /**
@@ -320,6 +302,22 @@ public class RuntimeFilterRouter {
 
     private RuntimeFilterDef runtimeFilterDef;
 
+    private int joinOpId;
+
+    private int buildSideRfNumber;
+
+    public RFHelperHolder(int joinOpId) {
+      this.joinOpId = joinOpId;
+    }
+
+    public int getJoinOpId() {
+      return joinOpId;
+    }
+
+    public void setJoinOpId(int joinOpId) {
+      this.joinOpId = joinOpId;
+    }
+
     public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
       return probeSideScanEndpoints;
     }
@@ -352,5 +350,13 @@ public class RuntimeFilterRouter {
     public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) {
       this.runtimeFilterDef = runtimeFilterDef;
     }
+
+    public int getBuildSideRfNumber() {
+      return buildSideRfNumber;
+    }
+
+    public void setBuildSideRfNumber(int buildSideRfNumber) {
+      this.buildSideRfNumber = buildSideRfNumber;
+    }
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 1468625..f69a44e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -17,206 +17,250 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable
+{
 
-  private int staleBookId = 0;
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+  private Map<Integer, Integer> joinMjId2rfNumber;
 
-  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
+  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
 
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
-  private ReentrantLock queueLock = new ReentrantLock();
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+  //for debug usage
+  private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
 
-  private Condition notEmpty = queueLock.newCondition();
+  private DrillbitContext drillbitContext;
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private SendingAccountor sendingAccountor;
 
-  private BufferAllocator bufferAllocator;
+  private  AsyncAggregateWorker asyncAggregateWorker;
 
-  private Future future;
+  private AtomicBoolean running = new AtomicBoolean(true);
 
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
-    this.bufferAllocator = bufferAllocator;
-    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    future = executorService.submit(asyncAggregateWorker);
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor)
+  {
+    this.drillbitContext = drillbitContext;
+    this.sendingAccountor = sendingAccountor;
+    asyncAggregateWorker = new AsyncAggregateWorker();
+    drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    if (running.get()) {
-      try {
-        aggregatedRFLock.lock();
-        if (containOne()) {
-          boolean same = aggregated.equals(runtimeFilterWritable);
-          if (!same) {
-            // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-            // share the same FragmentContext.
-            aggregated.close();
-            currentBookId.set(0);
-            staleBookId = 0;
-            clearQueued(false);
-          }
-        }
-      } finally {
-        aggregatedRFLock.unlock();
-      }
+  public void add(RuntimeFilterWritable runtimeFilterWritable)
+  {
+    if (!running.get()) {
+      runtimeFilterWritable.close();
+      return;
+    }
+    runtimeFilterWritable.retainBuffers(1);
+    int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+    if (joinMjId2Stopwatch.get(joinMjId) == null) {
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      joinMjId2Stopwatch.put(joinMjId, stopwatch);
+    }
+    synchronized (rfQueue) {
+      rfQueue.add(runtimeFilterWritable);
+      rfQueue.notify();
+    }
+  }
 
+  public void close() {
+    running.set(false);
+    if (asyncAggregateWorker != null) {
+      synchronized (rfQueue) {
+        rfQueue.notify();
+      }
+    }
+    while (!asyncAggregateWorker.over.get()) {
       try {
-        queueLock.lock();
-        if (rfQueue != null) {
-          rfQueue.add(runtimeFilterWritable);
-          notEmpty.signal();
-        } else {
-          runtimeFilterWritable.close();
-        }
-      } finally {
-        queueLock.unlock();
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e);
       }
-    } else {
+    }
+    for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) {
       runtimeFilterWritable.close();
     }
   }
 
-  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
-    try {
-      aggregatedRFLock.lock();
-      return aggregated.duplicate(bufferAllocator);
-    } finally {
-      aggregatedRFLock.unlock();
+  private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable)
+  {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    int buildSideRfNumber;
+    RuntimeFilterWritable toAggregated = null;
+    buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+    buildSideRfNumber--;
+    joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+    toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+    if (toAggregated == null) {
+      toAggregated = srcRuntimeFilterWritable;
+      toAggregated.retainBuffers(1);
+    } else {
+      toAggregated.aggregate(srcRuntimeFilterWritable);
     }
-  }
-
-  /**
-   * whether there's a fresh aggregated RuntimeFilter
-   *
-   * @return
-   */
-  public boolean hasFreshOne() {
-    if (currentBookId.get() > staleBookId) {
-      staleBookId = currentBookId.get();
-      return true;
+    joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+    if (buildSideRfNumber == 0) {
+      joinMjId2AggregatedRF.remove(joinMajorId);
+      route(toAggregated);
+      joinMjId2rfNumber.remove(joinMajorId);
+      Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId);
+      logger.info(
+          "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms",
+          joinMajorId,
+          stopwatch.elapsed(TimeUnit.MILLISECONDS)
+      );
     }
-    return false;
-  }
-
-  /**
-   * whether there's a usable RuntimeFilter.
-   *
-   * @return
-   */
-  public boolean containOne() {
-    return aggregated != null;
   }
 
-  private void doCleanup() {
-    running.compareAndSet(true, false);
-    try {
-      aggregatedRFLock.lock();
-      if (containOne()) {
-        aggregated.close();
-        aggregated = null;
+  private void route(RuntimeFilterWritable srcRuntimeFilterWritable)
+  {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+    List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
+    long rfIdentifier = runtimeFilterB.getRfIdentifier();
+    DrillBuf[] data = srcRuntimeFilterWritable.getData();
+    List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probeScanEps.get(joinMajorId);
+    int scanNodeSize = scanNodeEps.size();
+    srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
+    int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
+    for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
+      BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
+      for (String probeField : probeFields) {
+        builder.addProbeFields(probeField);
       }
-    } finally {
-      aggregatedRFLock.unlock();
+      BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId)
+                                                           .setMajorFragmentId(scanNodeMjId)
+                                                           .setMinorFragmentId(minorId)
+                                                           .setToForeman(false)
+                                                           .setRfIdentifier(rfIdentifier)
+                                                           .addAllBloomFilterSizeInBytes(sizeInBytes)
+                                                           .build();
+      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
+      CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
+
+      DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
+      Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>()
+      {
+        @Override
+        public void accept(final RpcException e)
+        {
+          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+        }
+
+        @Override
+        public void interrupt(final InterruptedException e)
+        {
+          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+        }
+      };
+      RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+      AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
+      accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
     }
   }
 
-  @Override
-  public void close() throws Exception {
-    future.cancel(true);
-    doCleanup();
+  public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber)
+  {
+    this.joinMjId2rfNumber = joinMjId2rfNumber;
   }
 
-  private void clearQueued(boolean setToNull) {
-    RuntimeFilterWritable toClear;
-    try {
-      queueLock.lock();
-      while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
-        toClear.close();
-      }
-      rfQueue = (setToNull) ? null : rfQueue;
-    } finally {
-      queueLock.unlock();
-    }
+  public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps)
+  {
+    this.joinMjId2probeScanEps = joinMjId2probeScanEps;
   }
 
-  private class AsyncAggregateWorker implements Runnable {
+  public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId)
+  {
+    this.joinMjId2ScanMjId = joinMjId2ScanMjId;
+  }
+
+  private class AsyncAggregateWorker implements Runnable
+  {
+    private AtomicBoolean over = new AtomicBoolean(false);
 
     @Override
-    public void run() {
-      try {
+    public void run()
+    {
+      while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) {
         RuntimeFilterWritable toAggregate = null;
-        while (running.get()) {
+        synchronized (rfQueue) {
           try {
-            queueLock.lock();
-            toAggregate = (rfQueue != null) ? rfQueue.poll() :  null;
-            if (toAggregate == null) {
-              notEmpty.await();
-              continue;
+            toAggregate = rfQueue.poll();
+            while (toAggregate == null && running.get()) {
+              rfQueue.wait();
+              toAggregate = rfQueue.poll();
             }
-          } finally {
-            queueLock.unlock();
+          } catch (InterruptedException ex) {
+            logger.error("RFW_Aggregator thread being interrupted", ex);
+            continue;
           }
-
-          try {
-            aggregatedRFLock.lock();
-            if (containOne()) {
-              aggregated.aggregate(toAggregate);
-
-              // Release the byteBuf referenced by toAggregate since aggregate will not do it
-              toAggregate.close();
-            } else {
-              aggregated = toAggregate;
-            }
-          } finally {
-            aggregatedRFLock.unlock();
+        }
+        if (toAggregate == null) {
+          continue;
+        }
+        // perform aggregate outside the sync block.
+        try {
+          aggregate(toAggregate);
+        } catch (Exception ex) {
+          logger.error("Failed to aggregate or route the RFW", ex);
+          throw new DrillRuntimeException(ex);
+        } finally {
+          if (toAggregate != null) {
+            toAggregate.close();
           }
-          currentBookId.incrementAndGet();
         }
-      } catch (InterruptedException e) {
-        logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
-        Thread.currentThread().interrupt();
-      } finally {
-        doCleanup();
-        clearQueued(true);
       }
+
+      if (!running.get()) {
+        RuntimeFilterWritable toClose;
+        while ((toClose = rfQueue.poll()) != null) {
+          toClose.close();
+        }
+      }
+      over.set(true);
     }
   }
 }
-
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 9a971e9..f8c2701 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -103,6 +103,27 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
     return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
+  public void retainBuffers(final int increment) {
+    if (increment <= 0) {
+      return;
+    }
+    for (final DrillBuf buf : data) {
+      buf.retain(increment);
+    }
+  }
+  //TODO: Not used currently because of DRILL-6826
+  public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) {
+    int bufNum = data.length;
+    DrillBuf [] newBufs = new DrillBuf[bufNum];
+    int i = 0;
+    for (DrillBuf buf : data) {
+      DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer;
+      newBufs[i] = transferredBuffer;
+      i++;
+    }
+    return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs);
+  }
+
   public String toString() {
     return identifier;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 42b76f2..a379db1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -151,7 +151,7 @@ public class Foreman implements Runnable {
     this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
     this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
     this.profileOption = setProfileOption(queryContext.getOptions());
-    this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY);
+    this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
   }
 
 
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8aa3233..a2d3cdc 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -469,6 +469,8 @@ drill.exec.options: {
     exec.hashjoin.enable.runtime_filter: false,
     exec.hashjoin.bloom_filter.fpp: 0.75,
     exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
+    exec.hashjoin.runtime_filter.waiting.enable: true,
+    exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
     exec.hashagg.mem_limit: 0,
     exec.hashagg.min_batches_per_partition: 2,
     exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index 5eae12e..a5fc5ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -40,9 +40,9 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
   public void testBroadcastHashJoin1Cond() {
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
-    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
+    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
     bloomFilterDefs.add(bloomFilterDef);
-    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
     HashJoinPOP joinConf = new HashJoinPOP(null, null,
       Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER, runtimeFilterDef);
     operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
@@ -71,11 +71,11 @@ public class TestHashJoinJPPD extends PhysicalOpUnitTestBase {
   public void testBroadcastHashJoin2Cond() {
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
-    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
-    BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a");
+    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
+    BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a", "b");
     bloomFilterDefs.add(bloomFilterDef);
     bloomFilterDefs.add(bloomFilterDef1);
-    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
     HashJoinPOP joinConf = new HashJoinPOP(null, null,
       Lists.newArrayList(joinCond("lft", "EQUALS", "rgt"), joinCond("a", "EQUALS", "b")), JoinRelType.INNER, runtimeFilterDef);
     operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
index 2370ffa..ac174d1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
@@ -34,38 +34,6 @@ public class TestHashJoinJPPDPlan extends JoinTestBase {
   }
 
   @Test
-  public void testInnnerHashJoin() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " dfs.`sample-data/nation.parquet` nations\n"
-      + "JOIN\n"
-      + "  dfs.`sample-data/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
-  @Test
-  public void testRightHashJoin() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " dfs.`sample-data/nation.parquet` nations\n"
-      + "RIGHT JOIN\n"
-      + "  dfs.`sample-data/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
-  @Test
   public void testLeftHashJoin() throws Exception {
     String sql = "SELECT nations.N_NAME, count(*)"
       + "FROM\n"
@@ -95,24 +63,4 @@ public class TestHashJoinJPPDPlan extends JoinTestBase {
     String excludedColNames3 =  "\"runtime-filter\"";
     testPlanWithAttributesMatchingPatterns(sql, null, new String[]{excludedColNames1, excludedColNames2, excludedColNames3});
   }
-
-  @Test
-  public void testInnnerHashJoinWithRightDeepTree() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " cp.`tpch/nation.parquet` nations\n"
-      + "JOIN\n"
-      + "  cp.`tpch/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "JOIN cp.`tpch/customer.parquet` customers\n"
-      + " on nations.N_NATIONKEY = customers.C_NATIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index c05cdfd..c1d1576 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -135,7 +135,6 @@ public class BloomFilterTest {
 
   @Test
   public void testNotExist() throws Exception {
-
     Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c));
     bit.run();
     DrillbitContext bitContext = bit.getContext();
@@ -192,6 +191,12 @@ public class BloomFilterTest {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertFalse(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 
 
@@ -254,6 +259,12 @@ public class BloomFilterTest {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertTrue(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 
 
@@ -324,5 +335,11 @@ public class BloomFilterTest {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertTrue(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index a1e7d0d..b41798d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.test;
 
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -26,10 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Function;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import io.netty.buffer.DrillBuf;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -76,12 +71,7 @@ import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -182,7 +172,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
 
     private ExecutorState executorState = new OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
-    private RuntimeFilterSink runtimeFilterSink;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -198,7 +187,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
-      this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
@@ -319,13 +307,18 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
 
     @Override
-    public RuntimeFilterSink getRuntimeFilterSink() {
-      return runtimeFilterSink;
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
     }
 
     @Override
-    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      runtimeFilterSink.aggregate(runtimeFilter);
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+      return null;
+    }
+
+    @Override
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+    {
+      return null;
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 84a7c78..b0820e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -74,6 +73,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class PhysicalOpUnitTestBase extends ExecTest {
   protected MockExecutorFragmentContext fragContext;
@@ -198,12 +198,10 @@ public class PhysicalOpUnitTestBase extends ExecTest {
    * </p>
    */
   protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
-    private RuntimeFilterSink runtimeFilterSink;
 
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
-      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
     }
 
     @Override
@@ -301,12 +299,17 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
     @Override
     public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      this.runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
-    public RuntimeFilterSink getRuntimeFilterSink() {
-      return runtimeFilterSink;
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+      return null;
+    }
+
+    @Override
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+    {
+      return null;
     }
   }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index d7921fc..e43380d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2536,6 +2536,24 @@ public final class BitData {
      * </pre>
      */
     int getHjOpId();
+
+    // optional int64 rf_identifier = 8;
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    boolean hasRfIdentifier();
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    long getRfIdentifier();
   }
   /**
    * Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2650,6 +2668,11 @@ public final class BitData {
               hjOpId_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000020;
+              rfIdentifier_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2867,6 +2890,30 @@ public final class BitData {
       return hjOpId_;
     }
 
+    // optional int64 rf_identifier = 8;
+    public static final int RF_IDENTIFIER_FIELD_NUMBER = 8;
+    private long rfIdentifier_;
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    public boolean hasRfIdentifier() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    public long getRfIdentifier() {
+      return rfIdentifier_;
+    }
+
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
@@ -2875,6 +2922,7 @@ public final class BitData {
       bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
       probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       hjOpId_ = 0;
+      rfIdentifier_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2909,6 +2957,9 @@ public final class BitData {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeInt32(7, hjOpId_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(8, rfIdentifier_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2956,6 +3007,10 @@ public final class BitData {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, hjOpId_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(8, rfIdentifier_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3091,6 +3146,8 @@ public final class BitData {
         bitField0_ = (bitField0_ & ~0x00000020);
         hjOpId_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        rfIdentifier_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -3154,6 +3211,10 @@ public final class BitData {
           to_bitField0_ |= 0x00000010;
         }
         result.hjOpId_ = hjOpId_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.rfIdentifier_ = rfIdentifier_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3205,6 +3266,9 @@ public final class BitData {
         if (other.hasHjOpId()) {
           setHjOpId(other.getHjOpId());
         }
+        if (other.hasRfIdentifier()) {
+          setRfIdentifier(other.getRfIdentifier());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3708,6 +3772,55 @@ public final class BitData {
         return this;
       }
 
+      // optional int64 rf_identifier = 8;
+      private long rfIdentifier_ ;
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public boolean hasRfIdentifier() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public long getRfIdentifier() {
+        return rfIdentifier_;
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public Builder setRfIdentifier(long value) {
+        bitField0_ |= 0x00000080;
+        rfIdentifier_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public Builder clearRfIdentifier() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        rfIdentifier_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
     }
 
@@ -3761,16 +3874,17 @@ public final class BitData {
       " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
       "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
       "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
-      "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
+      "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" +
       "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
       "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
       "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
       "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
-      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
-      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
-      "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
-      "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
-      "l.exec.protoB\007BitDataH\001"
+      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" +
+      "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007",
+      "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" +
+      "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" +
+      "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" +
+      "DataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3800,7 +3914,7 @@ public final class BitData {
           internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", });
           return null;
         }
       };
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 3c88ffc..ecf0f18 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -443,6 +443,8 @@ public final class SchemaBitData
                     output.writeString(6, probeFields, true);
                 if(message.hasHjOpId())
                     output.writeInt32(7, message.getHjOpId(), false);
+                if(message.hasRfIdentifier())
+                    output.writeInt64(8, message.getRfIdentifier(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
             {
@@ -504,6 +506,9 @@ public final class SchemaBitData
                         case 7:
                             builder.setHjOpId(input.readInt32());
                             break;
+                        case 8:
+                            builder.setRfIdentifier(input.readInt64());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -551,6 +556,7 @@ public final class SchemaBitData
                 case 5: return "bloomFilterSizeInBytes";
                 case 6: return "probeFields";
                 case 7: return "hjOpId";
+                case 8: return "rfIdentifier";
                 default: return null;
             }
         }
@@ -569,6 +575,7 @@ public final class SchemaBitData
             fieldMap.put("bloomFilterSizeInBytes", 5);
             fieldMap.put("probeFields", 6);
             fieldMap.put("hjOpId", 7);
+            fieldMap.put("rfIdentifier", 8);
         }
     }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 2d1c2a7..3b2c102 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -56,6 +56,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
     private List<Integer> bloomFilterSizeInBytes;
     private List<String> probeFields;
     private int hjOpId;
+    private long rfIdentifier;
 
     public RuntimeFilterBDef()
     {
@@ -155,6 +156,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         return this;
     }
 
+    // rfIdentifier
+
+    public long getRfIdentifier()
+    {
+        return rfIdentifier;
+    }
+
+    public RuntimeFilterBDef setRfIdentifier(long rfIdentifier)
+    {
+        this.rfIdentifier = rfIdentifier;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -235,6 +249,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
                 case 7:
                     message.hjOpId = input.readInt32();
                     break;
+                case 8:
+                    message.rfIdentifier = input.readInt64();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -277,6 +294,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
 
         if(message.hjOpId != 0)
             output.writeInt32(7, message.hjOpId, false);
+
+        if(message.rfIdentifier != 0)
+            output.writeInt64(8, message.rfIdentifier, false);
     }
 
     public String getFieldName(int number)
@@ -290,6 +310,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
             case 5: return "bloomFilterSizeInBytes";
             case 6: return "probeFields";
             case 7: return "hjOpId";
+            case 8: return "rfIdentifier";
             default: return null;
         }
     }
@@ -310,6 +331,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         __fieldMap.put("bloomFilterSizeInBytes", 5);
         __fieldMap.put("probeFields", 6);
         __fieldMap.put("hjOpId", 7);
+        __fieldMap.put("rfIdentifier", 8);
     }
     
 }
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 15c7230..ae9c4c7 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -48,4 +48,5 @@ message RuntimeFilterBDef{
   repeated int32 bloom_filter_size_in_bytes = 5;
   repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
   optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
+  optional int64 rf_identifier = 8; // the runtime filter identifier
 }


Mime
View raw message