flink-issues mailing list archives

From "Greg Hogan (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-2907) Bloom filter for Join
Date Fri, 23 Oct 2015 16:29:27 GMT
Greg Hogan created FLINK-2907:

             Summary: Bloom filter for Join
                 Key: FLINK-2907
                 URL: https://issues.apache.org/jira/browse/FLINK-2907
             Project: Flink
          Issue Type: New Feature
          Components: Java API, Scala API
    Affects Versions: 1.0
            Reporter: Greg Hogan
            Assignee: Greg Hogan

A bloom filter can be applied as a chainable operation on probe-side Join elements. An element
not matched by the bloom filter is dropped before it would be serialized, shipped, deserialized,
or processed.
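
As a rough illustration of the idea (not Flink code), the following Python sketch builds a small bloom filter from hash-side join keys and uses it to drop probe-side elements before they would be shipped. The class `BloomFilter`, the helper `filter_probe_side`, the SHA-256 based double hashing, and the sample data are all assumptions made for this example.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter. Double hashing over SHA-256 is an illustrative choice."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive two 64-bit hashes from one digest, then combine them k times.
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos >> 3] |= 1 << (pos & 7)

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos >> 3] & (1 << (pos & 7))
                   for pos in self._positions(key))


def filter_probe_side(probe_elements, key_of, bloom):
    """Yield only probe-side elements whose join key may exist on the hash side."""
    for element in probe_elements:
        if bloom.might_contain(key_of(element)):
            yield element


# Hypothetical data: three hash-side keys, one thousand probe-side elements.
hash_side = [(1, "a"), (2, "b"), (3, "c")]
probe_side = [(k, "p%d" % k) for k in range(1000)]

bloom = BloomFilter(num_bits=1024, num_hashes=6)
for key, _ in hash_side:
    bloom.add(key)

survivors = list(filter_probe_side(probe_side, lambda e: e[0], bloom))
```

Every probe-side element with a matching key survives the filter (bloom filters have no false negatives); a small number of non-matching elements may also survive and are discarded later by the join itself.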

Generating the bloom filter is a chainable operation over hash-side elements. The bloom filters
created on each TaskManager must be the same size to allow combining by bitwise OR. The most
efficient means to distribute the bloom filter is to assign each TaskManager an equal partition
that it will receive from all other TaskManagers. Each TaskManager broadcasts its partition once
all local elements (by hashing) and remote partitions (by OR) have been merged into that part of
the bloom filter.
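
The partitioned exchange described above can be simulated in a single process. This is only a sketch: `merge_partitioned` is a hypothetical helper, each list entry stands in for one TaskManager's local filter, and the combining step uses bitwise OR, the standard union for bloom filters (XOR would clear a bit that two TaskManagers had both set, producing false negatives).

```python
def merge_partitioned(local_filters):
    """Merge same-size per-worker filters using one owned partition per worker.

    Worker i owns partition i: it ORs together partition i of every worker's
    local filter. Concatenating the merged partitions models the final
    broadcast, after which every worker holds the full combined filter.
    """
    num_workers = len(local_filters)
    size = len(local_filters[0])
    if any(len(f) != size for f in local_filters):
        raise ValueError("all local filters must be the same size")
    if size % num_workers != 0:
        raise ValueError("filter size must divide evenly among workers")
    part = size // num_workers

    merged_parts = []
    for owner in range(num_workers):
        lo, hi = owner * part, (owner + 1) * part
        # Start from the owner's local bits, then fold in remote partitions.
        merged = bytearray(local_filters[owner][lo:hi])
        for sender in range(num_workers):
            if sender == owner:
                continue
            for i, byte in enumerate(local_filters[sender][lo:hi]):
                merged[i] |= byte
        merged_parts.append(bytes(merged))
    return b"".join(merged_parts)


# Two hypothetical workers with 4-byte filters; both set the lowest bit of byte 0.
f0 = bytes([0b0001, 0b0000, 0b1000, 0b0000])
f1 = bytes([0b0001, 0b0010, 0b0000, 0b0000])
combined = merge_partitioned([f0, f1])
```

In this model each worker sends every partition except its own, which is why the per-worker traffic approaches the full filter size as the number of workers grows.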

An example with numbers: triangle listing/counting joins 2B edges against 149B two-paths, resulting
in 21B triangles (this is using the optimal algorithm). At 8 bits per element the bloom filter
will have a false-positive rate of ~2% and occupy 2 GB (stored once and shared
per TaskManager). Each TaskManager both sends and receives data equivalent to the size of
the bloom filter (minus the local partition, the size of which trends towards zero as the
number of TaskManagers increases). The number of matched elements is 21B (true positives) +
~0.02*(149B-21B) (false positives) = 23.5B, a reduction of 84% or 1.5 TB (at 12 bytes per element).
With 4 TaskManagers only 12 GB of bloom filter data would be transmitted, about 0.8% of the 1.5 TB
avoided, for a net savings of 99.2%.
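
The arithmetic above can be checked with a few lines of Python. All input figures come from the example, including the rounded 2% false-positive rate. The traffic model (two phases, partition exchange and broadcast, in each of which every TaskManager sends the filter minus its local partition) is an assumption that happens to reproduce the 12 GB figure; the message does not spell out how that number was derived.

```python
# Inputs taken from the example in the message.
edges = 2e9                 # hash-side elements
two_paths = 149e9           # probe-side elements
triangles = 21e9            # true matches
bits_per_element = 8
false_positive_rate = 0.02  # rounded rate used in the message
bytes_per_element = 12
task_managers = 4

# Filter size: 2B elements at 8 bits each.
filter_bytes = edges * bits_per_element / 8

# Probe-side elements that pass the filter: true positives plus false positives.
matched = triangles + false_positive_rate * (two_paths - triangles)
filtered_out = two_paths - matched
reduction = filtered_out / two_paths
bytes_avoided = filtered_out * bytes_per_element

# Assumed traffic model: two phases, each sending the filter minus the local part.
sent_per_tm_per_phase = filter_bytes * (1 - 1 / task_managers)
filter_traffic = task_managers * 2 * sent_per_tm_per_phase
net_savings = 1 - filter_traffic / bytes_avoided

print("filter size     : %.1f GB" % (filter_bytes / 1e9))
print("matched elements: %.2f B" % (matched / 1e9))
print("reduction       : %.1f %%" % (100 * reduction))
print("bytes avoided   : %.2f TB" % (bytes_avoided / 1e12))
print("filter traffic  : %.1f GB" % (filter_traffic / 1e9))
print("net savings     : %.1f %%" % (100 * net_savings))
```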

Key issues are determining the size of the bloom filter (dependent on the count of hash-side
elements, the available memory segments, and the error rate) and whether this can be integrated
with Join or must be a separate operator. The approach also depends on dynamic memory allocation,
as spilling to disk would perform the serialization, write, read, and deserialization we are
looking to avoid.
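
For the sizing question, the standard bloom filter formulas relate element count, bit count, and error rate. The sketch below is textbook math rather than anything Flink-specific; the function names `bloom_parameters` and `expected_false_positive_rate` are hypothetical.

```python
import math


def bloom_parameters(num_elements, target_rate):
    """Return (num_bits, num_hashes) for a target false-positive rate.

    Uses m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2.
    """
    num_bits = math.ceil(-num_elements * math.log(target_rate) / math.log(2) ** 2)
    num_hashes = max(1, round(num_bits / num_elements * math.log(2)))
    return num_bits, num_hashes


def expected_false_positive_rate(num_elements, num_bits, num_hashes):
    """Approximate rate for a given size: (1 - e^(-k*n/m))^k."""
    return (1 - math.exp(-num_hashes * num_elements / num_bits)) ** num_hashes


# Sizing from a target error rate: 2B hash-side elements at ~2%.
n = 2_000_000_000
bits, hashes = bloom_parameters(n, 0.02)

# Sizing from a memory budget instead: 8 bits per element (2 GB) with 6 hashes.
rate_at_budget = expected_false_positive_rate(n, 8 * n, 6)
```

The first call answers "how much memory for this error rate", the second "what error rate for this memory", which is the direction that matters when the filter must fit in the available memory segments.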

