kafka-users mailing list archives

From Ara Ebrahimi <ara.ebrah...@argyledata.com>
Subject enhancing KStream DSL
Date Thu, 08 Sep 2016 20:20:48 GMT
Let’s say I have this:

KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> true
        );
KStream<String, CallRecord> callRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];
KStream<String, CallRecord> callRecordCounter = branches[2];

callRecordCounter
        .map((imsi, callRecord) -> new KeyValue<>("", ""))

Here I have 3 branches. Branch 0 is triggered if the record type is VOICE, branch 1 if it is
DATA. Branch 2 is supposed to be triggered regardless of type, so that I can then count records
over a time window. BUT the problem is that branch is implemented like this:

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
    public void process(K key, V value) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(key, value)) {
                // use forward with childIndex here and then break the loop
                // so that no record is going to be piped to multiple streams
                context().forward(key, value, i);
                break;
            }
        }
    }
}

Note the break. Because a VOICE or DATA record matches an earlier predicate first, it never
reaches the counter branch. I’d like to change the behavior of branch so that all predicates
are checked and no break happens, in, say, a branchAll() method. What’s the easiest way to add
this functionality to the DSL? I tried process(), but it doesn’t return a KStream.
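
To make the difference concrete, here is a minimal plain-Java sketch of the two routing behaviors. It does not use any Kafka Streams classes: BranchRouting, firstMatch, allMatches and examplePredicates are hypothetical names chosen for illustration, and the call type is passed as a plain String instead of a CallRecord.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Plain-Java model of the two routing strategies; no Kafka classes are used.
class BranchRouting {

    // Models the current branch() loop: stops at the first matching predicate,
    // so the result holds at most one child index.
    static <K, V> List<Integer> firstMatch(List<BiPredicate<K, V>> predicates, K key, V value) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                targets.add(i);
                break; // the break that hides later branches
            }
        }
        return targets;
    }

    // Models the hypothetical branchAll(): every matching predicate receives the record.
    static <K, V> List<Integer> allMatches(List<BiPredicate<K, V>> predicates, K key, V value) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                targets.add(i); // no break: keep checking the remaining predicates
            }
        }
        return targets;
    }

    // The three predicates from the example above, applied to the call type directly.
    static List<BiPredicate<String, String>> examplePredicates() {
        List<BiPredicate<String, String>> predicates = new ArrayList<>();
        predicates.add((imsi, type) -> "VOICE".equalsIgnoreCase(type));
        predicates.add((imsi, type) -> "DATA".equalsIgnoreCase(type));
        predicates.add((imsi, type) -> true);
        return predicates;
    }

    public static void main(String[] args) {
        List<BiPredicate<String, String>> predicates = examplePredicates();
        System.out.println(firstMatch(predicates, "imsi-1", "VOICE")); // prints [0]
        System.out.println(allMatches(predicates, "imsi-1", "VOICE")); // prints [0, 2]
    }
}
```

With first-match routing a VOICE record only reaches branch 0, while all-match routing also delivers it to the counter branch. Within the existing DSL, one possible way to approximate the all-match behavior (an assumption, not something confirmed in this thread) is to call filter() on allRecords once per predicate and use allRecords itself as the counter stream, since each operator attached to the same stream sees every record.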


