samza-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
Date Wed, 20 Feb 2019 01:03:47 GMT
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor
for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258295838
 
 

 ##########
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##########
 @@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
 
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
 
 Review comment:
   2A. I agree that it is odd for each specific `SystemConsumer` implementation to compare
offsets. That seems outside the scope of this PR, though. Can we move the comparator logic
refactoring to a separate PR so that the changes are cleanly separated? That PR could be
committed before this one.
   2B. I looked at some other code which uses `register` (e.g. `ContainerStorageManager`,
`CoordinatorStreamStore`, and `CoordinatorStreamSystemConsumer`). Those don't seem to do
any comparison to make sure they are registering the lowest offset. How can we tell that
all of the upper layers already pass the lowest offset? I just want to double-check that there
isn't a bug in an upper layer that happened to be prevented by having the offset comparator
logic directly in the `SystemConsumer` impls.
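
   To make the concern in 2B concrete, here is a minimal sketch of the kind of "keep the lowest
offset" guard being discussed. It is not the actual Samza code: `LowestOffsetRegistry`, its method
names, and the assumption that offsets are numeric strings are all hypothetical and used only for
illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Samza.
// Assumes offsets are decimal strings that fit in a long.
final class LowestOffsetRegistry {
  // Maps a partition key (e.g. "topic-0") to the lowest offset registered so far.
  private final Map<String, String> offsets = new HashMap<>();

  // Records the offset, keeping the existing one unless the incoming one is strictly lower.
  void register(String partition, String offset) {
    offsets.merge(partition, offset, (existing, incoming) ->
        Long.parseLong(incoming) < Long.parseLong(existing) ? incoming : existing);
  }

  // Returns the lowest offset registered for the partition, or null if none was registered.
  String startingOffset(String partition) {
    return offsets.get(partition);
  }
}
```

   With a guard like this, registering "42", then "7", then "100" for the same partition leaves
"7" as the starting offset. If the guard were removed, a caller that registers the same partition
more than once would need to pass the lowest offset itself, which is the behavior I would like
to confirm for the upper layers.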

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
