samza-dev mailing list archives

From Navina Ramesh <nram...@linkedin.com>
Subject Re: Review Request 44405: SAMZA-882 - Detect partition count changes in input streams
Date Tue, 15 Mar 2016 18:49:06 GMT


> On March 14, 2016, 11:46 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala, line 78
> > <https://reviews.apache.org/r/44405/diff/1/?file=1281324#file1281324line78>
> >
> >     Won't this be redundant w/ the declaration here?
> >     {code}
> >     35  private var thread: Thread = getMonitorThread()
> >     {code}
> >     
> >     This will create a completely new thread object here.

Oh yes. This is redundant during initialization. I can fix that. 

I want to make sure that we start and stop the thread each time the JobCoordinator starts
and stops. If I call start() on a Thread object that has already run and stopped, I will get an
IllegalThreadStateException. So the only clean way to restart the monitor is to create a new
Thread object each time. Why do you think this will be an issue? It will still be the same
instance of StreamPartitionCountMonitor.
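A minimal sketch of that start/stop pattern, assuming a simplified monitor class: `RestartableMonitor`, `newMonitorThread` and the `check` callback are hypothetical names, not the actual StreamPartitionCountMonitor code. No thread is created at construction time (which avoids the redundant initial instance), and every start() builds a fresh Thread, because a Thread that has already been started cannot be started again.

```scala
// Hypothetical sketch, not the StreamPartitionCountMonitor implementation.
class RestartableMonitor(intervalMs: Long, check: () => Unit) {
  // No Thread is allocated at construction; start() creates one on demand.
  @volatile private var thread: Thread = null

  private def newMonitorThread(): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (!Thread.currentThread().isInterrupted) {
            check()                 // e.g. compare current vs. initial partition counts
            Thread.sleep(intervalMs)
          }
        } catch {
          case _: InterruptedException => // stop() interrupted the sleep; exit the loop
        }
      }
    }, "partition-count-monitor")
    t.setDaemon(true)
    t
  }

  def start(): Unit = synchronized {
    if (thread == null || !thread.isAlive) {
      thread = newMonitorThread() // a terminated Thread cannot be start()-ed again
      thread.start()
    }
  }

  def stop(): Unit = synchronized {
    if (thread != null) {
      thread.interrupt()
      thread.join()               // wait for the loop to exit before clearing the reference
      thread = null
    }
  }
}
```

Calling start() and stop() repeatedly on the same monitor instance is safe here because each start() uses a new Thread; it is calling start() twice on one Thread object that throws IllegalThreadStateException.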


> On March 14, 2016, 11:46 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala, line 101
> > <https://reviews.apache.org/r/44405/diff/1/?file=1281325#file1281325line101>
> >
> >     I am a bit confused here. I thought that the final goal is: replace the cache entry w/ updatedMetadata? If partitionsMetadataOnly == true, what's the purpose of this section of code within if?

Yes, I am doing the replacement. The issue with the "partitionsMetadataOnly" call is that the
SystemStreamPartitionMetadata it creates carries no offset info. So if I overwrite the cache
blindly, I will drop the partition offsets that may have been fetched earlier. This block just
makes sure that those previously known offsets are also written into the new cache entry.
Line 101 is where the cache entry is actually overwritten.
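A minimal sketch of that merge, assuming simplified stand-ins for Samza's metadata classes: `PartitionMeta`, `StreamMeta` and `MetadataMerge.merge` are hypothetical and use an Int partition id for brevity, whereas the real cache works with SystemStreamMetadata and SystemStreamPartitionMetadata. When only partition metadata was fetched, each existing partition keeps its previously cached offsets, new partitions get the offset-less entry, and the merged result is what replaces the cache entry.

```scala
// Hypothetical, simplified stand-ins for SystemStreamPartitionMetadata / SystemStreamMetadata.
final case class PartitionMeta(oldest: Option[String], newest: Option[String], upcoming: Option[String])
final case class StreamMeta(stream: String, partitions: Map[Int, PartitionMeta])

object MetadataMerge {
  /** Builds the entry that overwrites the cache, keeping offsets a partitions-only fetch lacks. */
  def merge(cached: Option[StreamMeta], updated: StreamMeta, partitionsMetadataOnly: Boolean): StreamMeta =
    if (!partitionsMetadataOnly) {
      updated // full fetch: offsets are fresh, so replace the entry as-is
    } else {
      val previous = cached.map(_.partitions).getOrElse(Map.empty[Int, PartitionMeta])
      val merged = updated.partitions.map { case (partition, fresh) =>
        // Existing partition: reuse the offsets fetched earlier.
        // New partition: keep the offset-less metadata from the partitions-only call.
        partition -> previous.getOrElse(partition, fresh)
      }
      updated.copy(partitions = merged)
    }
}
```

Partitions that no longer appear in the updated metadata are dropped, since the merged entry follows the partition set of the latest fetch.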


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44405/#review123527
-----------------------------------------------------------


On March 4, 2016, 8:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44405/
> -----------------------------------------------------------
> 
> (Updated March 4, 2016, 8:14 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Jake Maes, Jagadish Venkatraman, Xinyu Liu, and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-882
>     https://issues.apache.org/jira/browse/SAMZA-882
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Adding loop.done in stream metadata cache
> 
> 
> Adding comments in Test
> 
> 
> Adding configuration to the docs
> 
> 
> Fixing the swallowed exception from Scala immutable map
> 
> 
> Updating config docs
> 
> 
> Fixed some comments and javadoc
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 2745a22daf3626db56da2bedad07690751a34a27 
>   samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 4f3e9a297ce2c0df0f5f25e0aad62f7bed774cd6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06 
>   samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala 155c3d16d33d9bb9cd5410d786004c1bf2a57ed3 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala bd0fe5fc8128c59fa6d08941ad88eed66dda622b 
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 9ab1dd516871b1755ef64fa25cea47491ad781e2 
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 9dc436a40afd7190626a8be0d716c70e0fe83c7a 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java e2b45d7577dea5a4a71af22521c93a7fd75eaefc 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java 269d82479650b3bc2890d250da0391d34104b1eb 
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java 88d9f24d16fc3d9842b387cfc22edaf1dfa6fd06 
> 
> Diff: https://reviews.apache.org/r/44405/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> Tested with a simple Samza job using hello-samza -> Verified that the metrics gauge is getting updated and published correctly.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>

