kafka-users mailing list archives

From Erwin Joost BOLWIDT <erwinbolw...@dbs.com>
Subject Bug report: Topologies affecting each other through store: Partition XXX is not assigned to any tasks: {}
Date Tue, 17 Apr 2018 02:10:03 GMT
Hi,

It took me a long time to figure this problem out. I think it may be a bug in kafka-streams,
or at least a case where the logged warning should point to the actual cause.

I’m using the high-level API. I get warnings of the form “Partition XXX is not assigned
to any tasks: {}”, even though my topology consumes that topic. My original code uses
KTables, but I managed to reproduce the problem in the code below with simple KStream processing.

I have two seemingly independent subtopologies. The only thing they have in common is a shared
Store. (In my original code, one topology reads a topic with update messages that get
applied to the store, while the other topology uses the data in the store to classify input
data.)

Before I added the second subtopology, the first subtopology worked fine. After adding the
second subtopology, I started getting the dreaded “Partition XXX is not assigned to any
tasks: {}” for the topic of the first subtopology.
As the second subtopology was a new feature (I had already populated the store in another
way), I had not yet created the input topic for the second subtopology on my integration environment.
It turns out that this is the reason that the first subtopology fails. Because they share
the same store, somehow the subtopologies interact.

Below is a snippet from my log file, produced by running the bug-report code that follows it.
To reproduce the problem with that code, ensure that the topics “number-in”
and “number-out” exist, but “unknown-in” doesn’t exist.

Once the problem is clear, it is very easy to fix it: create the “unknown-in” topic, and
the code runs fine. But the reported warnings make no reference to the real underlying problem.
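
A minimal sketch of a pre-start check that would name the missing topic directly. It assumes the set of existing topic names has already been fetched from the cluster (for example with AdminClient.listTopics().names().get() from kafka-clients). TopicPreflight and missingTopics are hypothetical names for illustration, not part of kafka-streams:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of kafka-streams.
final class TopicPreflight {
    private TopicPreflight() {
    }

    // Returns the required topics that are absent from the set of existing topics,
    // in the order in which they appear in `required`.
    static List<String> missingTopics(Collection<String> required, Set<String> existing) {
        List<String> missing = new ArrayList<>();
        for (String topic : required) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Stand-in for the topic names reported by the cluster (assumption:
        // in a real application these would come from the broker).
        Set<String> existing = new HashSet<>(Arrays.asList("number-in", "number-out"));
        List<String> missing = missingTopics(Arrays.asList("number-in", "unknown-in"), existing);
        if (!missing.isEmpty()) {
            System.out.println("Missing input topics: " + missing);
        }
    }
}
```

Running such a check on every source topic before calling streams.start() would report “unknown-in” by name instead of a warning about “number-in”.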

I’ve tested the below code with kafka-streams version 1.0.1 on JDK8 1.8.0_132.

Best Regards.
Erwin Bolwidt

09:56:46.245 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer] No partitions found for topic unknown-in
09:56:46.246 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer] Partition number-in-0 is not assigned to any tasks: {}
09:56:46.247 INFO  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer] Assigned tasks to clients as {1eab35bd-5198-4d24-8ce4-9f1ac9687764=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 1_0]) capacity: 1]}.
09:56:46.248 WARN  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer, groupId=FooBar] The following subscribed topics are not assigned to any members: [number-in]


Code that reproduces the problem:

package bugreport;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class BugReportTwoTopologiesInteractionThroughStore {
    private static final String STORE_NAME = "store";

    static class TransformerAdapter<K, V, R> implements Transformer<K, V, R> {
        @Override
        public void init(ProcessorContext context) {
        }

        @Override
        public void close() {
        }

        @Override
        public R punctuate(long timestamp) {
            return null;
        }

        @Override
        public R transform(K key, V value) {
            return null;
        }
    }

    public static Map<String, Object> kafkaStreamsConfiguration() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.91.132.98:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "FooBar");
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    public static void constructTopology(StreamsBuilder builder, String topicIn, String topicOut, int base) {
        KStream<Long, Long> stream = builder.stream(topicIn, Consumed.with(Serdes.Long(), Serdes.Long()));
        stream = stream.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
            @Override
            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new TransformerAdapter<Long, Long, KeyValue<Long, Long>>() {
                    @Override
                    public KeyValue<Long, Long> transform(Long key, Long value) {
                        return new KeyValue<Long, Long>(key, base - value);
                    }
                };
            }
        }, STORE_NAME);
        stream.to(topicOut, Produced.with(Serdes.Long(), Serdes.Long()));
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(STORE_NAME);
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier,
                Serdes.String(), Serdes.String());
        builder.addStateStore(storeBuilder);
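        // First subtopology: both "number-in" and "number-out" exist.
        constructTopology(builder, "number-in", "number-out", 10);
        // Second subtopology: "unknown-in" must NOT exist to reproduce the problem.
        constructTopology(builder, "unknown-in", "unknown-out", 20);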

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(kafkaStreamsConfiguration()));
        streams.start();
    }
}
