samza-dev mailing list archives

From "Chris Riccomini" <criccom...@apache.org>
Subject Re: Review Request 13725: SAMZA-2
Date Wed, 02 Oct 2013 21:16:28 GMT


> On Oct. 2, 2013, 6:01 p.m., Sriram Subramanian wrote:
> > streams.md
> > 
> > 1.1 Need to remove this line. "Samza also provides a helper class called PriorityChooser
which lets you choose which message to process next based on a priority score."
> > 
> > MessageChooser.java
> > 
> > 1.1 comment fix - "lived systems"?
> > 1.2 comment fix - "checkpointd"
> > 
> > WrappedChooserConfig.java
> > 
> > 1.1 I think these names should not have anything to do with the chooser.
> >     "task.chooser.bootstrap.%s.%s", "task.chooser.priorities.%s.%s", "task.chooser.batch.size"
> > 
> > 1.2 I really think Wrapped is not a great name here. The name should give an idea
what the chooser does.
> > 
> > BatchingChooser.java
> > 
> > 1.1 comment fix - "patching"
> > 1.2 if you decide to keep the main choosers name as Wrapped, the context of wrapped
chooser in the comments is confusing
> > 
> > BootstrappingChooser.java
> > 
> > 1.1 comment fix - "at least on envelope for every"
> > 1.2 Why do we need to wait for every stream in latestMessageOffsets to get at least
one event? We just need to wait till at least one stream in latestMessageOffsets gets an event.
No?
> > 1.3 Same comment here about the naming of wrapped. Confusing name with the main
class that orchestrates things.
> > 
> > WrappedChooser.java
> > 
> > 1.1 We currently override the priority of bootstrap streams with the explicit priority.
This is actually confusing. I think we should just fail with config error when user overrides
the bootstrap streams priority. We should not start anything without bootstrapping all the
streams.
> > 
> > KafkaSystemAdminCommand.java
> > 
> > 1.1 Why are you not using getTopicInfo to get the metadata info for the topics?

Regarding WrappedChooser.java:1.1, and BootstrappingChooser.java:1.2, they're actually related.
By default, all bootstrap streams are prioritized the same. When that's the case, then we
don't need to wait for all SystemStreams to have one message (1.2), since it's just as valid
to process any bootstrap stream. 

If, however, a non-random strategy is required when choosing between bootstrap streams, then
you need to provide a message from every SystemStream in order for the wrapped chooser to
make the correct decision. Two examples of a non-random choosing strategy would be to prioritize
streams based on time (keeping the bootstrap streams time aligned), or hard-coding a priority
for the bootstrap streams (e.g. bootstrap stream A = 2, bootstrap stream B = 1).
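
To make the "message from every SystemStream" point concrete, here is a minimal sketch of the gating idea. It is not the code in the patch: the class name PriorityGate, the String-typed streams and messages, and the update/choose method shapes are all assumptions for illustration. It also ignores the case where a stream has finished bootstrapping and should stop blocking the others.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Samza's MessageChooser API.
final class PriorityGate {
    // stream name -> priority; a higher number wins
    private final Map<String, Integer> priorities;
    // stream name -> messages buffered from that stream, oldest first
    private final Map<String, Deque<String>> buffered = new HashMap<>();

    PriorityGate(Map<String, Integer> priorities) {
        this.priorities = priorities;
        for (String stream : priorities.keySet()) {
            buffered.put(stream, new ArrayDeque<>());
        }
    }

    // Buffer a message from a stream that was registered in the constructor.
    void update(String stream, String message) {
        buffered.get(stream).addLast(message);
    }

    // Returns null until every stream has at least one buffered message,
    // then returns the oldest message of the highest-priority stream.
    String choose() {
        String best = null;
        for (Map.Entry<String, Deque<String>> e : buffered.entrySet()) {
            if (e.getValue().isEmpty()) {
                return null; // cannot rank a stream we have not heard from yet
            }
            if (best == null || priorities.get(e.getKey()) > priorities.get(best)) {
                best = e.getKey();
            }
        }
        return best == null ? null : buffered.get(best).pollFirst();
    }
}
```

With bootstrap stream A = 2 and bootstrap stream B = 1, a message from B alone is not enough for choose() to return anything; once A also has a message, A's message is returned first.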

So, if we take (1.2) as a requirement to provide generic bootstrapping with non-random strategies,
the next question is, does it make sense to allow non-random picking between bootstrap streams?
I think it does. One example of this is bootstrap streams with differing priorities (mentioned
above). By prioritizing bootstrap stream A over bootstrap stream B, you bootstrap all of stream
A before bootstrapping any of stream B. This behavior might be desirable in cases where you're
doing a join from stream B to stream A and creating a materialized view of the join. Rather
than bootstrapping stream A and stream B simultaneously, and then doing the join after both
bootstraps are complete, you could bootstrap only stream A, then as messages from stream B
come in, join to the stream A data, and produce the materialized view on the fly. For example,
you might have a profiles bootstrap stream and a companies bootstrap stream, and you might
want to produce a table with a user's name and the company they work for. If you set profiles
to priority 2, and companies to priority 1, you can
load profiles into a KV store, then take each company event, join it with the profiles KV
store (to get the user's name), and then write directly to the "name and company" KV store.
If you don't allow prioritized bootstrap streams, then you might get company events before
the profile, and you have to use two KV stores to do the join. It can still be done on the
fly, but it's more cumbersome.
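
A rough sketch of the profiles/companies join, under some loud assumptions: plain HashMaps stand in for the KV stores, company events are assumed to carry a user id, and the class and method names (NameAndCompanyJoin, onProfile, onCompany) are made up for this example rather than taken from the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: in-memory maps stand in for Samza KV stores.
final class NameAndCompanyJoin {
    private final Map<String, String> profiles = new HashMap<>();       // userId -> name
    private final Map<String, String> nameAndCompany = new HashMap<>(); // name -> company

    // Profiles have priority 2, so all of these are processed first.
    void onProfile(String userId, String name) {
        profiles.put(userId, name);
    }

    // Companies have priority 1; each event is joined against the loaded profiles
    // and written straight to the "name and company" store.
    void onCompany(String userId, String company) {
        String name = profiles.get(userId);
        if (name != null) {
            nameAndCompany.put(name, company);
        }
        // A null name means the profile has not arrived yet. Without prioritized
        // bootstrap streams, this is where a second store would be needed to
        // hold the company event until the profile shows up.
    }

    String companyFor(String name) {
        return nameAndCompany.get(name);
    }
}
```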

The time-aligned case is similar. If you want to keep your bootstrap streams (roughly) time-aligned
with each other, you can do a much better job of this if you look at the next message from
each bootstrap stream, rather than taking the first one that comes in.
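
For the time-aligned case, the decision could look roughly like the sketch below once the next message from each stream is in hand. The TimeAligned class, the map of stream name to the timestamp of its next buffered message, and the idea that envelopes carry a usable timestamp at all are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not part of the patch.
final class TimeAligned {
    // heads maps each stream to the timestamp of its next buffered message.
    // Returns the stream whose next message is oldest, or empty if heads is empty.
    static Optional<String> oldestStream(Map<String, Long> heads) {
        String oldest = null;
        for (Map.Entry<String, Long> e : heads.entrySet()) {
            if (oldest == null || e.getValue() < heads.get(oldest)) {
                oldest = e.getKey();
            }
        }
        return Optional.ofNullable(oldest);
    }
}
```

Always draining the stream that is furthest behind keeps the streams roughly in step, which only works if the map has an entry for every bootstrap stream.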

The trade-off of this style is more latency during bootstrap in the common case: when all
bootstrap streams have the same priority, any envelope is valid to process, yet the chooser
still waits for a message from every stream before picking one.

It's unclear to me whether these two examples are going to be edge cases, or more common styles
of processing. It's also unclear if there are more use cases for non-random bootstrap prioritization.
Given these unknowns, I'd rather be more general and impact latency a bit than improve
latency and disable prioritized bootstrap streams outright.


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13725/#review26614
-----------------------------------------------------------


On Oct. 2, 2013, 12:49 a.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13725/
> -----------------------------------------------------------
> 
> (Updated Oct. 2, 2013, 12:49 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> adding a first pass at KafkaSystemAdmin.getLastOffsets
> 
> 
> use the systemAdmins from samza container in wrapped chooser, rather than doing wiring
internally.
> 
> 
> changing ordering of wrapped chooser from bootstrap -> priority -> batching to
bootstrap -> batching -> priority.
> 
> 
> switch default chooser to WrappedChooser. Always use wrapped chooser. Remove task.chooser.wrapped.class.
> 
> 
> wrote default chooser test that bootstraps, prioritizes, and batches.
> 
> 
> turning on priority tests for default chooser. refactor default chooser a bit to make
it easier to test.
> 
> 
> test default chooser with just round robin.
> 
> 
> bug fix to use round robin chooser factory in default chooser
> 
> 
> change wiring for default chooser to make it more testable. add start/stop/register tests
to all choosers.
> 
> 
> add one more test in priority chooser.
> 
> 
> add unit test for tiered priority chooser
> 
> 
> add unit test for bootstrapping chooser.
> 
> 
> turn default chooser on in samza container. add license to test stateful task. add more
docs to default chooser. clean up batching unit test.
> 
> 
> add unit test for batching chooser.
> 
> 
> only wire in wrapping message choosers when we need them. add docs to the default chooser
factory.
> 
> 
> rename class to BootstrappingChooser.
> 
> 
> refactor to move into org.apache.samza.system.chooser.
> 
> 
> build latest message offset map.
> 
> 
> add a streams-behind-chooser that guarantees one message from each SystemStream before
choose is called.
> 
> 
> add start/stop/register back. all tests pass.
> 
> 
> fix bug -- should allow manual override if Int.MaxInt for bootstrap streams.
> 
> 
> minor bug in default chooser. was re-using same chooser everywhere.
> 
> 
> adding wiring in default chooser.
> 
> 
> initial pass adding composed message choosers.
> 
> 
> rebase to master, which includes SAMZA-25 metrics. fix several tests that were broken
after removing start/stop/register.
> 
> 
> adding more docs for round robin
> 
> 
> remove start/stop/register
> 
> 
> cleanup some wiki markdown in MessageChooser javadoc.
> 
> 
> add more javadocs to the message chooser.
> 
> 
> Merge branch 'SAMZA-2_fine-grain-control-over-stream-consumption' of github.com:criccomini/incubator-samza
into SAMZA-2_fine-grain-control-over-stream-consumption
> 
> 
> added start, stop, and register to message chooser.
> 
> 
> adding docs for message chooser. switching round robin chooser back to a queue.
> 
> 
> missed license in message chooser factory
> 
> 
> add apache licensing
> 
> 
> samza container was using message chooser, not message chooser factory. fixed.
> 
> 
> add stream chooser test. update stream chooser to invert priority due to bug.
> 
> 
> add round robin test. fix compile error in round robin chooser.
> 
> 
> add priority chooser test. fix bug in priority chooser that was reversing ordering.
> 
> 
> adding stream chooser. adding message chooser factory.
> 
> 
> adding priority chooser. moving default chooser to round robin chooser. adding config
for chooser
> 
> 
> Diffs
> -----
> 
>   docs/learn/documentation/0.7.0/container/streams.md b15c34d3f68e32d56e0da8af91d78e47a5110f67
>   samza-api/src/main/java/org/apache/samza/system/MessageChooser.java 306b2902303c72f3d7a3eb313f55d7e88d21e00d
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java c902d414484e05ae75c9ca58ad9629cb01120f62
>   samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionSystemAdmin.java e4ed30bf363142d82a3c40909e160b5825fe60fd
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 0c742d83c2f60d2448a79376677713a1ff0b11ec
>   samza-core/src/main/scala/org/apache/samza/config/WrappedChooserConfig.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 62bd243115e71612e00784124baa972b33e56cb7
>   samza-core/src/main/scala/org/apache/samza/system/DefaultChooser.scala 5a72e7a3bfba0f06a5a98c6ba26865800d7780b9
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala d24671ec64a42ede6f779effe9c845e1cbbc5e51
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BatchingChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/RoundRobinChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/TieredPriorityChooser.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/system/chooser/WrappedChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/MockMessageChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/system/chooser/TestWrappedChooser.scala PRE-CREATION
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 183c6ccce39dedaef9dba56d5b61ffdedfc9d08a
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ba08af85892f0793d01e3118fa7ad6d569a0f238
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala 7d4e20a8bdc7a45b0a1b464a6f4b868d1d03eab0
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 68050528cd5f8acfe3a1f7563b4e7fe6c7473be5
> 
> Diff: https://reviews.apache.org/r/13725/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>

