kafka-users mailing list archives

From Michael Noll <mich...@confluent.io>
Subject Re: enhancing KStream DSL
Date Fri, 09 Sep 2016 12:23:52 GMT
Ara,

you have shared this code snippet:

>    allRecords.branch(
>            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>            (imsi, callRecord) -> true
>    );

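The branch() operation partitions the allRecords KStream into three
disjoint streams: each record is forwarded only to the first branch whose
predicate it matches, so the catch-all third branch never sees VOICE or
DATA records.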
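To make the first-match rule concrete, here is a minimal plain-Java sketch with no Kafka dependency. It models a stream as a list of (imsi, callCommType) pairs. `BranchSketch` and `branch` are hypothetical names, not the Kafka Streams API; only the routing rule (stop at the first matching predicate) is taken from the `KStreamBranchProcessor` code quoted further down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of branch(); NOT the Kafka Streams API.
public class BranchSketch {

    // Routes each record to the FIRST branch whose predicate matches and then stops,
    // like the loop-with-break in KStreamBranchProcessor. Records that match no
    // predicate are dropped.
    @SafeVarargs
    public static <K, V> List<List<Map.Entry<K, V>>> branch(
            List<Map.Entry<K, V>> records, BiPredicate<K, V>... predicates) {
        List<List<Map.Entry<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> entry : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(entry.getKey(), entry.getValue())) {
                    branches.get(i).add(entry);
                    break; // first match wins, so the branches are disjoint
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("imsi-1", "VOICE"),
                Map.entry("imsi-2", "DATA"),
                Map.entry("imsi-3", "SMS"));
        List<List<Map.Entry<String, String>>> branches = branch(records,
                (imsi, type) -> "VOICE".equalsIgnoreCase(type),
                (imsi, type) -> "DATA".equalsIgnoreCase(type),
                (imsi, type) -> true);
        // The catch-all third branch receives only the SMS record.
        System.out.println(branches.get(2)); // [imsi-3=SMS]
    }
}
```

Even with a `true` predicate last, the third branch only ever receives what the first two rejected.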

I'd suggest the following.

First, update the third predicate in your `branch()` step to be "everything
but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
records are removed:


    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            // Remainder: neither VOICE nor DATA (null-safe, like the two predicates above)
            (imsi, callRecord) -> !("VOICE".equalsIgnoreCase(callRecord.getCallCommType())
                || "DATA".equalsIgnoreCase(callRecord.getCallCommType()))
        );

This would give you:

    KStream<String, CallRecord> voiceRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = branches[2];
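A plain-Java check of this split follows. It uses hypothetical names (`RemainderSketch`, `IS_VOICE`, `IS_DATA`, `IS_REMAINDER`, `firstMatch`) and assumes a call record can be reduced to its call-type string. Because branch() stops at the first match, the explicit remainder predicate routes every record exactly as a catch-all `true` would. The explicit form documents what branches[2] holds, and putting the constant on the left (`"VOICE".equalsIgnoreCase(...)`) keeps it safe when the call type is null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: a call record is reduced to its call type (possibly null).
public class RemainderSketch {

    public static final Predicate<String> IS_VOICE = type -> "VOICE".equalsIgnoreCase(type);
    public static final Predicate<String> IS_DATA = type -> "DATA".equalsIgnoreCase(type);
    // "Everything but VOICE and DATA"; null-safe because the constant is the receiver.
    public static final Predicate<String> IS_REMAINDER = IS_VOICE.or(IS_DATA).negate();

    // Index of the first matching predicate, or -1, mirroring branch()'s loop-with-break.
    @SafeVarargs
    public static int firstMatch(String type, Predicate<String>... predicates) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(type)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("VOICE", "data", "SMS", null);
        for (String type : types) {
            int withRemainder = firstMatch(type, IS_VOICE, IS_DATA, IS_REMAINDER);
            int withCatchAll = firstMatch(type, IS_VOICE, IS_DATA, t -> true);
            System.out.println(type + " -> branch " + withRemainder
                    + " (catch-all version: branch " + withCatchAll + ")");
        }
    }
}
```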

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.
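In the DSL used in this thread, that would mean attaching Ara's map(...)/countByKey(...) chain to allRecords rather than to branches[2]. A KStream can feed more than one downstream operation, and each one receives every record. The plain-Java sketch below models that fan-out with hypothetical names (`FanOutSketch`, `countByCallType`, `countAll`). One source list is read once for the per-type split and again for the overall total. Windowing is deliberately left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model; a "stream" is just a list of call types here.
public class FanOutSketch {

    // Per-type counts, a stand-in for counting the VOICE/DATA/other branches separately.
    public static Map<String, Long> countByCallType(List<String> allRecords) {
        Map<String, Long> counts = new TreeMap<>();
        for (String type : allRecords) {
            String bucket = "VOICE".equalsIgnoreCase(type) ? "VOICE"
                    : "DATA".equalsIgnoreCase(type) ? "DATA" : "OTHER";
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    // Mirrors map(... -> new KeyValue<>("", "")) followed by a count by key: every
    // record gets the same key, so the single count is the total over all types.
    public static Map<String, Long> countAll(List<String> allRecords) {
        Map<String, Long> counts = new TreeMap<>();
        for (String ignored : allRecords) {
            counts.merge("", 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> allRecords = List.of("VOICE", "DATA", "SMS", "VOICE");
        // Both consumers read the same source; neither depends on branch().
        System.out.println(countByCallType(allRecords)); // {DATA=1, OTHER=1, VOICE=2}
        System.out.println(countAll(allRecords));        // {=4}
    }
}
```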

-Michael





On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com>
wrote:

> Let’s say I have this:
>
>
> KStream<String, CallRecord>[] branches = allRecords
>     .branch(
>             (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> true
>     );
> KStream<String, CallRecord> callRecords = branches[0];
> KStream<String, CallRecord> dataRecords = branches[1];
> KStream<String, CallRecord> callRecordCounter = branches[2];
>
> callRecordCounter
>         .map((imsi, callRecord) -> new KeyValue<>("", ""))
>         .countByKey(
>                 UnlimitedWindows.of("counter-window"),
>                 stringSerde
>         )
>         .print();
>
> Here I have 3 branches. Branch 0 is triggered if the data is VOICE, branch 1 if
> the data is DATA. Branch 2 is supposed to be triggered for every record,
> regardless of type, so that I can then count records over a time window. BUT
> the problem is that branch is implemented like this:
>
> private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
>     @Override
>     public void process(K key, V value) {
>         for (int i = 0; i < predicates.length; i++) {
>             if (predicates[i].test(key, value)) {
>                 // use forward with childIndex here and then break the loop
>                 // use forward with childIndex here and then break the loop
>                 // so that no record is going to be piped to multiple streams
>                 context().forward(key, value, i);
>                 break;
>             }
>         }
>     }
> }
>
> Note the break: VOICE and DATA records never reach the counter branch. I’d
> like to change the behavior of branch so that all predicates are checked and
> no break happens, say in a branchAll() method. What’s the easiest way to add
> this functionality to the DSL? I tried process(), but it doesn’t return a KStream.
>
> Ara.
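For comparison with the first-match loop quoted above, the requested branchAll() behavior amounts to the same loop without the break: a record is forwarded to every branch whose predicate it satisfies. This is a hypothetical plain-Java model; `BranchAllSketch` and `branchAll` are illustrative names, not part of the Kafka Streams DSL discussed in this thread. With it, a `true` predicate would see every record, but the outputs are no longer disjoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the requested branchAll(): NOT an existing Kafka Streams method.
public class BranchAllSketch {

    // Same loop as the quoted KStreamBranchProcessor, minus the break: every
    // predicate is evaluated and a record is copied to each branch it matches.
    @SafeVarargs
    public static <V> List<List<V>> branchAll(List<V> records, Predicate<V>... predicates) {
        List<List<V>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (V value : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value); // no break: keep checking later predicates
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> callTypes = List.of("VOICE", "DATA", "SMS");
        List<List<String>> branches = branchAll(callTypes,
                type -> "VOICE".equalsIgnoreCase(type),
                type -> "DATA".equalsIgnoreCase(type),
                type -> true);
        System.out.println(branches.get(2)); // [VOICE, DATA, SMS]
    }
}
```

The catch-all branch now duplicates what the other branches already received, which is the same total that reusing allRecords yields without a new operator.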
