flink-issues mailing list archives

From "Greg Hogan (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-2907) Bloom filter for Join
Date Fri, 14 Oct 2016 13:01:21 GMT

     [ https://issues.apache.org/jira/browse/FLINK-2907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan resolved FLINK-2907.
    Resolution: Later

Closing this issue with current thoughts:
- this would only be applicable to batch, since the input must be fully materialized in order to generate and distribute the bloom filter
- without slot affinity, the bloom filters may not be readily shareable on a TaskManager
- even though the bloom filter is a compressed representation of the DataSet, for a large quantity of data the partitioned DataSet may fit in memory whereas the bloom filter may not
- without a priori knowledge of the size of the DataSet, either a large number of memory segments must be reserved or multiple bloom filters must be created with increasing size
- the tradeoff with a bloom filter is additional lookups in memory but less shuffling of data; it is not clear that these lookups (which for even a modest data set will not be in CPU cache) are more efficient than (de)serializing elements to/from network buffers (multiple bloom filters exacerbate this problem)

I still think this would be very interesting to research but there are many other performance
improvements I would like to prioritize.
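To illustrate the "multiple bloom filters created with increasing size" idea and its lookup cost: a minimal sketch of a chain of filters that doubles in size as elements arrive, so no up-front size estimate is needed, but every membership test must probe every filter in the chain. All names, sizes, and the hashing scheme here are illustrative, not anything from Flink.

```python
import hashlib

def _positions(item, num_bits, num_hashes):
    # Derive bit positions from one digest (illustrative hashing only).
    digest = hashlib.sha256(item.encode()).digest()
    return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % num_bits
            for i in range(num_hashes)]

class ScalableBloom:
    """Chain of bloom filters of doubling size. Inserts go to the newest
    filter; lookups must consult every filter, which is exactly the
    extra in-memory probe cost the comment above is concerned with."""

    def __init__(self, initial_bits=1024, num_hashes=5, fill_limit=128):
        self.num_hashes = num_hashes
        self.fill_limit = fill_limit          # inserts allowed per filter
        self.filters = [bytearray(initial_bits // 8)]
        self.count = 0

    def add(self, item):
        if self.count >= self.fill_limit * len(self.filters):
            # Grow: append a filter twice the size of the last one.
            self.filters.append(bytearray(len(self.filters[-1]) * 2))
        bits = self.filters[-1]
        for pos in _positions(item, len(bits) * 8, self.num_hashes):
            bits[pos // 8] |= 1 << (pos % 8)
        self.count += 1

    def might_contain(self, item):
        # One lookup per filter in the chain: the probe cost multiplies
        # with every growth step.
        for bits in self.filters:
            pos = _positions(item, len(bits) * 8, self.num_hashes)
            if all(bits[p // 8] & (1 << (p % 8)) for p in pos):
                return True
        return False
```

A single right-sized filter answers a lookup with one round of probes; the chain trades that for not having to reserve memory segments in advance.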

> Bloom filter for Join
> ---------------------
>                 Key: FLINK-2907
>                 URL: https://issues.apache.org/jira/browse/FLINK-2907
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>              Labels: requires-design-doc
> A bloom filter can be a chainable operation over probe-side Join elements. An element
not matched by the bloom filter will not be serialized, shipped, deserialized, or processed.
> Generating the bloom filter is a chainable operation over hash-side elements. The bloom
filter created on each TaskManager must be the same size to allow combining by bitwise OR. The
most efficient means to distribute the bloom filter is to assign each TaskManager an equal
partition that it will receive from all other TaskManagers. Each partition is broadcast once
all local elements (by hashing) and remote partitions (by bitwise OR) have been processed into
that part of the bloom filter.
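The partition exchange described above can be sketched as follows. For illustration the per-TaskManager filters are gathered in one list (in reality each lives on a different machine and only the slices move over the network); the function and its names are hypothetical, not Flink API.

```python
def merge_partition(local_filters, worker):
    """Worker `worker` receives slice `worker` of every peer's
    same-sized filter and combines them by bitwise OR, producing the
    merged slice it would then broadcast to all peers."""
    num_workers = len(local_filters)
    size = len(local_filters[0])
    assert all(len(f) == size for f in local_filters), "filters must match in size"
    slice_len = size // num_workers
    start, stop = worker * slice_len, (worker + 1) * slice_len
    merged = bytearray(slice_len)
    for f in local_filters:  # the local slice plus one slice per remote peer
        for i, b in enumerate(f[start:stop]):
            merged[i] |= b
    return merged
```

OR (rather than XOR) is required: if two TaskManagers hash different elements onto the same bit, XOR would clear it and introduce false negatives, which a bloom filter must never produce.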
> An example with numbers: triangle listing/counting joining 2B edges on 149B two-paths
resulting in 21B triangles (this is using the optimal algorithm). At 8 bits per element the
bloom filter will have a false-positive rate of ~2% and require a 2 GB bloom filter (stored
once and shared per TaskManager). Each TaskManager both sends and receives data equivalent
to the size of the bloom filter (minus the local partition, the size of which trends towards
zero as the number of TaskManagers increases). The number of matched elements is 21B (true
positives) + ~0.02 * (149B - 21B) ≈ 23.5B, a reduction of 84%, or 1.5 TB at 12 bytes per element.
With 4 TaskManagers only 12 GB of bloom filter would be transmitted, a savings of 99.2%.
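The figures above can be checked with the standard bloom-filter false-positive estimate fpr ≈ (1 - e^(-kn/m))^k; a small worked sketch (the variable names are mine, the numbers are from the example):

```python
import math

n = 2e9             # hash-side elements (edges)
probes = 149e9      # probe-side elements (two-paths)
true_pos = 21e9     # actual matches (triangles)
bits_per_elem = 8

m = n * bits_per_elem                    # 16e9 bits = 2 GB filter
k = round(bits_per_elem * math.log(2))   # near-optimal hash count, 6
fpr = (1 - math.exp(-k * n / m)) ** k    # ~0.022, i.e. roughly 2%

matched = true_pos + fpr * (probes - true_pos)   # ~23.8B elements
reduction = 1 - matched / probes                 # ~84% fewer shipped
saved_bytes = (probes - matched) * 12            # ~1.5 TB at 12 B/element
```

This reproduces the quoted numbers to within the rounding in the issue text (a flat 2% false-positive rate gives 23.5B matched elements).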
> Key issues are determining the size of the bloom filter (dependent on the count of hash
side elements, the available memory segments, and the error rate) and whether this can be
integrated with Join or must be a separate operator. This also depends on dynamic memory allocation
as spilling to disk would perform the serialization, write, read, and deserialization we are
looking to avoid.
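The sizing question raised above has a closed form for the ideal case: for n elements and target false-positive rate p, the textbook formulas give m = -n·ln(p)/(ln 2)² bits and k = (m/n)·ln 2 hash functions. A minimal sketch (textbook math, nothing Flink-specific; the remaining difficulty in the issue is that n is unknown up front):

```python
import math

def bloom_size(n, p):
    """Standard bloom-filter sizing: bits m and hash count k for n
    elements at target false-positive rate p."""
    m = math.ceil(-n * math.log(p) / (math.log(2) ** 2))
    k = max(1, round(m / n * math.log(2)))
    return m, k
```

For the example's 2B hash-side elements at a 2% error rate this yields about 8.1 bits per element, consistent with the 8 bits per element assumed above.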

This message was sent by Atlassian JIRA
