flink-issues mailing list archives

From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request: Out-of-core state backend for JDBC databases
Date Tue, 03 Nov 2015 08:44:57 GMT
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153289042
  
    Cool stuff, really! This is very much in line with what I had in mind for a SQL backend.
    
    Let me check whether I understood everything correctly (and see where my understanding is wrong),
because I think we should be able to build an "exactly once" version of this based on that mechanism.
I am basically rephrasing what you describe in a different model.
    
    ### Basic Mode
    
    What this effectively implements is a batched, asynchronous version of distributed 2-phase
commit transactions. The phases look basically like this:
    
      - **Adding data**: Pipe all modifications into the database, but do not commit the transaction.
They are tagged with the timestamp of the upcoming checkpoint (or any coordinated, increasing
version counter). This can happen in a background thread, as long as the in-operator
cache holds all edits that are not in the database yet.
    
      - **Pre-commit**: This is when the checkpoint is triggered. All pending edits are written
into the database and then the transaction is committed. The state handle only includes the
timestamp used on the elements. In classical 2-phase commit, after a task acknowledges the
pre-commit, it must be able to recover to that state, which is the case here. The checkpoint
is not immediately valid for recovery, though, so recovery has to either apply a filter
or issue a query that deletes all records with timestamps larger than the version
restored during recovery. After the pre-commit, the timestamp is incremented locally and work
can continue.
    
      - **Full commit**: This happens implicitly when the checkpoint coordinator marks the
checkpoint as complete.
    
      - **Recovery**: The timestamp (or version counter) of the last successful checkpoint
is restored, records that were committed to the database but whose checkpoint did not
succeed as a whole are deleted, and then records are fetched lazily.
    
    So far, this should give exactly once guarantees, or am I overlooking something?
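    The phases above can be sketched as a small in-memory Java model. This is a hypothetical illustration, not the API of the pull request: `VersionedTable` stands in for the database table of (key, timestamp, value) rows, `VersionedBackend` plays the operator, and `put`, `preCommit`, `recover`, and `get` are names chosen for this sketch. As a simplification, all pending edits are written at pre-commit time instead of being streamed to the database by a background thread beforehand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory stand-in for the database table of (key, timestamp, value) rows. */
final class VersionedTable {
    static final class Row {
        final String key;
        final long timestamp;
        final String value;

        Row(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final List<Row> committed = new ArrayList<>();

    /** Commits a batch of rows (the transaction commit at pre-commit time). */
    void commit(List<Row> rows) {
        committed.addAll(rows);
    }

    /** Recovery clean-up: drop rows tagged with a version newer than the restored one. */
    void deleteNewerThan(long version) {
        committed.removeIf(r -> r.timestamp > version);
    }

    /** Lazy fetch: the newest committed value for a key, if any. */
    Optional<String> latest(String key) {
        Row best = null;
        for (Row r : committed) {
            if (r.key.equals(key) && (best == null || r.timestamp > best.timestamp)) {
                best = r;
            }
        }
        return best == null ? Optional.empty() : Optional.of(best.value);
    }

    int size() {
        return committed.size();
    }
}

/** Hypothetical operator-side backend following the add / pre-commit / recovery phases. */
final class VersionedBackend {
    private final VersionedTable db;
    private final Map<String, String> pending = new HashMap<>(); // edits not yet committed
    private long currentVersion; // version the upcoming checkpoint will carry

    VersionedBackend(VersionedTable db, long firstVersion) {
        this.db = db;
        this.currentVersion = firstVersion;
    }

    /** Adding data: buffer the edit; it will be tagged with currentVersion. */
    void put(String key, String value) {
        pending.put(key, value);
    }

    /** Pre-commit: write and commit all pending edits; the version is the state handle. */
    long preCommit() {
        List<VersionedTable.Row> rows = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            rows.add(new VersionedTable.Row(e.getKey(), currentVersion, e.getValue()));
        }
        db.commit(rows);
        pending.clear();
        long handle = currentVersion;
        currentVersion++; // work continues under the next version
        return handle;
    }

    /** Recovery: restore the last completed version and delete everything newer. */
    void recover(long lastCompletedVersion) {
        pending.clear();
        db.deleteNewerThan(lastCompletedVersion);
        currentVersion = lastCompletedVersion + 1;
    }

    /** Reads see buffered edits first, then fall back to the database. */
    Optional<String> get(String key) {
        String buffered = pending.get(key);
        return buffered != null ? Optional.of(buffered) : db.latest(key);
    }
}
```

    For example, pre-committing versions 1 and 2 and then recovering to version 1 (because checkpoint 2 never completed) deletes the version-2 rows, so `get("a")` returns the version-1 value again.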
    
    ### Compacting
    
    Whenever the "checkpoint complete" notification arrives (or every so many changes), you trigger
a clean-up query in the background. Assuming the SQL database's query planner is not terrible,
this statement should be reasonably efficient (a single semi-join):
    ```sql
    DELETE FROM "table name" t1
    WHERE EXISTS
      (SELECT *
         FROM "table name" t2
        WHERE t2.handle_id = t1.handle_id
          AND t2.timestamp > t1.timestamp  -- a newer version exists for the same handle
          AND t2.timestamp <= ?            -- ? binds GLOBAL_VERSION: the newer version is globally committed
      )
    ```
    The good thing is that, by virtue of the incrementing global versions, we can run the query
at the "read uncommitted" isolation level. Its reads take no locks, and the rows it deletes are
already-superseded versions below the global version, which no ongoing modification writes to,
so it does not compete with any other ongoing modifications.
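    To make the semantics of the semi-join concrete, here is a small in-memory Java model of the DELETE above. It is a sketch only: `CompactionSketch`, `Row`, `compact`, and `globalVersion` are hypothetical names, and a real backend would run the SQL against the database instead. It also illustrates why the weak isolation level is plausible: every row it removes carries a timestamp below `globalVersion`, while ongoing writes only add rows tagged with newer versions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the compaction query; names are illustrative only. */
final class CompactionSketch {
    static final class Row {
        final String handleId;
        final long timestamp;

        Row(String handleId, long timestamp) {
            this.handleId = handleId;
            this.timestamp = timestamp;
        }
    }

    /**
     * Keeps every row except those for which a newer row with the same handleId exists
     * whose timestamp is at most globalVersion (the WHERE EXISTS semi-join).
     */
    static List<Row> compact(List<Row> table, long globalVersion) {
        List<Row> kept = new ArrayList<>();
        for (Row t1 : table) {
            boolean superseded = false;
            for (Row t2 : table) {
                if (t2.handleId.equals(t1.handleId)
                        && t2.timestamp > t1.timestamp      // a newer version exists
                        && t2.timestamp <= globalVersion) { // and it is globally committed
                    superseded = true;
                    break;
                }
            }
            if (!superseded) {
                kept.add(t1);
            }
        }
        return kept;
    }
}
```

    With `globalVersion = 2`, rows for handle `"k"` at versions 1, 2, and 3 compact to versions 2 and 3: version 1 is superseded by the committed version 2, while version 3 belongs to a checkpoint that has not completed yet, so it cannot supersede anything and version 2 stays available for recovery.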



