kafka-users mailing list archives

From Davide Icardi <davide.ica...@gmail.com>
Subject Event Sourcing with Kafka Streams and processing order of a re-entrant pipeline
Date Sun, 31 Jan 2021 10:37:24 GMT
I'm working on a project where I want to use Kafka Streams for Event
Sourcing.

The general idea is that I have a "commands" topic/KStream, an "events"
topic/KStream, and a "snapshots" topic/KTable.
Snapshots contain the current state of the entities. Commands are
validated against the "snapshots" and transformed into "events".

    1. Group the EVENTS stream by key and aggregate it into the SNAPSHOTS table.
    2. Left-join the COMMANDS stream with the SNAPSHOTS table and output new EVENTS.

For example, to apply this pattern to a simple bank-account scenario I can
have:
- an operations stream as "commands" (requests to deposit or withdraw an
amount of money, e.g. "deposit $10" => Operation(+10) )
- a movements stream as "events" (actual deposit or withdrawal events, e.g. "$10
deposited" => Movement(+10) )
- an account table as "snapshots" (the account balance, e.g. "$20 in account
balance" => Account(20) )
- the account id is used as the key for all topics and tables

The topology can be written like:

    case class Operation(amount: Int)
    case class Movement(amount: Int, error: String = "")
    case class Account(balance: Int)

    // events
    val movementsStream = streamBuilder.stream[String, Movement](Config.topicMovements)

    // snapshots: fold movements into the current account balance
    val accountTable = movementsStream
      .groupByKey
      .aggregate(Account(0)){ (_, movement, account) =>
        account.copy(balance = account.balance + movement.amount)
      }
    accountTable.toStream.to(Config.topicAccounts)

    // commands: validate each operation against the current snapshot
    val operationsStream = streamBuilder.stream[String, Operation](Config.topicOperations)
    operationsStream
      .leftJoin(accountTable) { (operation, accountOrNull) =>
        val account = Option(accountOrNull).getOrElse(Account(0))
        // a withdrawal (negative amount) is only allowed if covered by the balance
        if (account.balance >= -operation.amount) {
          Movement(operation.amount)
        } else {
          Movement(0, error = "insufficient funds")
        }
      }
      .to(Config.topicMovements)

(see full code here:
https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
)

Now let's imagine a scenario where I deposit $10, then deposit another $10,
and then withdraw $20:
        inOperations.pipeInput("alice", Operation(10))
        inOperations.pipeInput("alice", Operation(10))
        inOperations.pipeInput("alice", Operation(-20))

Can I assume that, when the third message (withdraw $20) is processed, the
account table has already processed the previous two movements (10+10)?
In other words, can I assume that:
- if the operations are valid, I never receive an "insufficient funds" error
event?
- in the above topology, the account KTable is always updated before the
next operation from the KStream is processed?

From my tests it seems to work, but I would like some advice on whether
this is a safe assumption.
(see test here:
https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala
)
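
For reference, this is the behavior I am assuming: each command only joins
against a snapshot that already reflects all previous movements. Assuming
that strict per-key ordering, the expected outcome can be modeled as a plain
Scala fold, independent of Kafka, reusing the same decide/aggregate logic as
the topology (OrderingSketch is just an illustrative name):

```scala
object OrderingSketch {
  case class Operation(amount: Int)
  case class Movement(amount: Int, error: String = "")
  case class Account(balance: Int)

  // Same decision logic as the leftJoin in the topology.
  def decide(op: Operation, account: Account): Movement =
    if (account.balance >= -op.amount) Movement(op.amount)
    else Movement(0, error = "insufficient funds")

  // Same aggregation logic as the KTable aggregate.
  def evolve(account: Account, movement: Movement): Account =
    account.copy(balance = account.balance + movement.amount)

  // Process operations strictly in order: each command sees the state
  // produced by all previous movements.
  def run(ops: List[Operation]): (Account, List[Movement]) =
    ops.foldLeft((Account(0), List.empty[Movement])) {
      case ((account, movements), op) =>
        val movement = decide(op, account)
        (evolve(account, movement), movements :+ movement)
    }

  def main(args: Array[String]): Unit = {
    val (account, movements) =
      run(List(Operation(10), Operation(10), Operation(-20)))
    println(account)                // Account(0)
    println(movements.map(_.error)) // all empty: no errors when processed in order
  }
}
```

If the Kafka Streams topology does not guarantee this ordering, the third
operation could join against a stale snapshot and produce the error movement
instead.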

thanks
Davide Icardi
