samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yan Fang" <yanfang...@gmail.com>
Subject Re: Review Request 32155: SAMZA-458: Close in KafkaSystemProducer should flush all source buffers
Date Wed, 18 Mar 2015 23:44:41 GMT


> On March 18, 2015, 9:01 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
line 49
> > <https://reviews.apache.org/r/32155/diff/1/?file=897641#file897641line49>
> >
> >     Can you explain the purpose of "failedSources"?

It is used to record failed sources. Because maybe there are more than one source failed,
we want to know all of them and only stop the producer after flushing all the sources. Does
that make sense?


> On March 18, 2015, 9:01 p.m., Navina Ramesh wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
line 148
> > <https://reviews.apache.org/r/32155/diff/1/?file=897641#file897641line148>
> >
> >     Why can't you do the failedSources empty check here instead of Line 153 ?

This is because, if there are more than one sources, such as source1, source2. If source1
fails and source2 succeed, and we flush each of them. We do not close the producer after flushing
source1, instead, we close the producer after flushing source2. When flushing source2, the
sendFailed.get() is not run. That's why the stop is outside of the conditions.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32155/#review76944
-----------------------------------------------------------


On March 17, 2015, 9:37 a.m., Yan Fang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32155/
> -----------------------------------------------------------
> 
> (Updated March 17, 2015, 9:37 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-458
>     https://issues.apache.org/jira/browse/SAMZA-458
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> add flush in close method
> only throw exceptions after flushing all sources
> add unit test
> 
> 
> Diffs
> -----
> 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
83668dd 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
ca10ea5 
> 
> Diff: https://reviews.apache.org/r/32155/diff/
> 
> 
> Testing
> -------
> 
> ran unit tests and integration test
> 
> 
> Thanks,
> 
> Yan Fang
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message