metron-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ali Nazemian <alinazem...@gmail.com>
Subject Re: Heterogeneous indexing batch size for different Metron feeds
Date Mon, 11 Dec 2017 23:46:08 GMT
Hi Michael,

I think you misunderstood my concern. We don't have any problem regarding
the tuning of the indexing topology at this moment. However, based on our
observations, it looks like there is some kind of bug in the indexing
topology about the way it is handling the batch size, so I was trying to
find an explanation for our observations. Our observations are as follows.

- As soon as we start using different indexing batch sizes for different
Metron feeds the rate of failure on indexing topology increases
significantly. For example, if we use 1000 Elasticsearch batch size for all
the feeds throughput is reasonable. If we use 500 Elasticsearch batch size
for all the feeds, still throughput is reasonable and not bad. However, if
we use 1000 batch size for one feed and 500 batch size for another one,
throughput drops and error rate increases significantly. I don't think it
is something produced by design. It feels like a bug to me.

- If we change the ES indexing batch size, it doesn't apply spontaneously
if there are some messages still in Kafka indexing topics waiting the get
processed. I am not sure the time it takes to accept new batch size is
completely related to the required time to process all the old messages in
indexing topic, or it is lazy by design to update the batch size.

Regards,
Ali


On Mon, Dec 11, 2017 at 5:15 AM, Michael Miklavcic <
michael.miklavcic@gmail.com> wrote:

> I know there are a number of questions in this thread, but I have some
> specific comments on dealing with your failed tuples in the indexing
> topology. For starters, I can almost guarantee that your problem is going
> to be with tuning ES and not HDFS. Not to say you won't need to tune HDFS,
> but look at the # emitted tuples, capacity, and # acked in Storm for both
> the hdfsIndexingBolt and the elasticsearchIndexingBolt. See which one is
> falling behind and start there. Turn off HDFS or ES and see what the
> difference is. That at least helps with narrowing focus in the short term.
> Having been through this a couple times now, I'd like to tell you there's a
> fast and easy silver bullet, but there isn't one. There are a lot of moving
> parts and you need to be careful of making small incremental changes
> towards a stable stream.
>
> If you're not seeing exceptions in the logs, e.g. OOM issues, but you're
> seeing a lot of failed tuples it sounds like your kafka spout speed might
> be ok. So let's focus on the ES indexing portion for now. Check out the
> workflow I've outlined below and you'll see my thoughts on one potential
> ordering of steps to perf tuning the indexing topology.
>
>    - Look at the # emitted tuples, capacity, and # acked in Storm for both
>    the hdfsIndexingBolt and the elasticsearchIndexingBolt. Either of these
> way
>    ahead or way behind?
>    - Try disabling HDFS indexing (enabled: false in the indexing configs).
>    HDFS is probably not your bottleneck, but if it is, consider disabling
> ES
>    across all sensors for indexing instead. Push the config changes and
>    restart your topologies. I don't think you technically need to restart
> the
>    topologies for ZK config changes here, but it makes debugging a lot
>    simpler. If you find changing it on the fly works well, then I'm sure
>    myself and the community will be happy to hear the results!
>    - Adjust your # of workers up along with # ackers. When we were testing
>    just HDFS (no ES) we had 4 workers and 24 ackers for a million
>    records/second. Restart the topology. Any difference?
>    - Now increasing ES indexing bolt parallelism. Restart topology.
>    - Increase batch sizes for ES - try 5, then 50, then 100. Restart after
>    each test.
>    - Once you've increased parallelism and removed variables like HDFS from
>    the equation, you can narrow in on the real culprit. One method I found
>    extremely effective for getting stability is to adjust the
>    *indexing_topology_max_spout_pending* in Ambari, under Indexing. By
>    default, the Kafka spout will suck data as fast as the spout can take
> it.
>    This in turn can cause timeouts when ES is not tuned/able to keep up.
>    Basically, the indexing topology will get further and further behind,
>    regardless of how much parallelism (Workers and Executors) you throw at
> it
>    because it's unable to get data into ES at the rate matching what's
> coming
>    from your spouts. You might even think to up the number of partitions in
>    your indexing Kafka topic, which helps overall consumption rate, but
> that
>    will only firehouse the downstream bottleneck, eventually resulting in
> the
>    failed tuples you're seeing. You need to first find a stable rate that
> ES
>    can handle given its current tuning configuration and then work up from
>    there. I just went through this the other day and I started by lowering
> the
>    setting to a conservative 1000.
>    - From this point, you can work on tuning ES or HDFS, whichever the
>    culprit is. Realistically, you're going to need to do a few tuning
> cycles
>    to tune that end point, increase the upstream throughput, tune again,
> and
>    repeat until you get the rate of throughput necessary to handle your
> live
>    feeds.
>
> The other recommendation I have that goes hand in hand with the approach
> above is to look at the lag offsets for your indexing topic [1]. This is
> invaluable, and it's really, really easy to use from the CLI. There are 2
> ways to approach this problem:
>
>    1. live sensor data streaming with the indexing topology keeping pace
>    e2e. e.g. bro sensor to bro topic to enrichment topic to indexing topic.
>    You would expect to see your kafka partition consumer offsets
> maintaining a
>    fairly consistent lag after a few minutes of activity and stabilizing.
>    2. reading a large quantity of data already landed in the indexing topic
>    (start indexing topology with Kafka offset = EARLIEST). The goal would
> be
>    to see your offset lag catch up eventually. This allows you to stress
> test
>    your indexing topology different from a normal streaming experience.
> One,
>    it simplifies the moving parts (no parsers, enrichments are live at this
>    point - you're replaying data already in the topic). But two, you can
>    easily tune the levers outlined above, iterate with each change in rapid
>    succession, and record your results.
>
> 1.
> https://github.com/apache/metron/blob/master/metron-
> platform/Performance-tuning-guide.md
>
> Sample command without Kerberos enabled (see link [1] for more detail with
> Kerberos):
>
> watch -n 10 -d ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
>     --describe \
>     --group indexing \
>     --bootstrap-server $BROKERLIST \
>     --new-consumer
>
> Hope this helps.
>
> Cheers,
> Michael Miklavcic
>
>
> On Sun, Dec 10, 2017 at 5:38 AM, Ali Nazemian <alinazemian@gmail.com>
> wrote:
>
> > This seems not the same as our observations. Whenever there are some
> > messages in the indexing or enrichments backlog, the new configurations
> (at
> > least related to the batch size) won't be applied to the new messages. It
> > will remain as the previous state until it processes all the old
> messages.
> > This scenario can be produced very easily.
> >
> > Create a feed with an inefficient batch size to create a backlog on
> > indexing topic. Then change the batch size to an effective value and wait
> > to see how long it will take to process the backlog. Based on our
> > observations, it takes a while to process messages in a back-log even if
> > you fix the batch size. It feels batch size changes are not synchronised
> > instantly.
> >
> > On Thu, Dec 7, 2017 at 11:45 PM, Otto Fowler <ottobackwards@gmail.com>
> > wrote:
> >
> > > We use TreeCache
> > > <https://curator.apache.org/apidocs/org/apache/curator/
> > framework/recipes/cache/TreeCache.html>
> > > .
> > >
> > > When the configuration is updated in zookeeper, the configuration
> object
> > > in the bolt is updated. This configuration is read on each message, so
> I
> > > think from what I see new configurations should get picked up for the
> > next
> > > message.
> > >
> > > I could be wrong though.
> > >
> > >
> > >
> > >
> > > On December 7, 2017 at 06:47:15, Ali Nazemian (alinazemian@gmail.com)
> > > wrote:
> > >
> > > Thank you very much. Unfortunately, reproducing all the situations are
> > > very costly for us at this moment. We are kind of avoiding to hit that
> > > issue by using the same batch size for all the feeds. Hopefully, with
> the
> > > new PR Casey provided for the segregation of ES and HDFS, it will be
> very
> > > much clear to tune them.
> > >
> > > Do you know how the synchronization of indexing config will happen with
> > > the topology? Does the topology gets synchronised by pulling the last
> > > configs from ZK based on some background mechanism or it is based on an
> > > update trigger? As I mentioned, based on our observation it looks like
> > the
> > > synchronization doesn't work until all the old messages in Kafka queue
> > get
> > > processed based on the old indexing configs.
> > >
> > > Regards,
> > > Ali
> > >
> > > On Thu, Dec 7, 2017 at 12:33 AM, Otto Fowler <ottobackwards@gmail.com>
> > > wrote:
> > >
> > >> Sorry,
> > >> We flush for timeouts on every storm ‘tick’ message, not on every
> > message.
> > >>
> > >>
> > >>
> > >> On December 6, 2017 at 08:29:51, Otto Fowler (ottobackwards@gmail.com
> )
> > >> wrote:
> > >>
> > >> I have looked at it.
> > >>
> > >> We maintain batch lists for each sensor which gather messages to
> index.
> > >> When we get a message that puts it over the batch size the messages
> are
> > >> flushed and written to the target.
> > >> There is also a timeout component, where the batch would be flushed
> > based
> > >> on timeout.
> > >>
> > >> While batch size checking occurs on a per sensor-message receipt
> basis,
> > >> each message, regardless of sensor will trigger a check of the batch
> > >> timeout for all the lists.
> > >>
> > >> At least that is what I think I see.
> > >>
> > >> Without understanding what the failures are for it is hard to see what
> > >> the issue is.
> > >>
> > >> Do we have timing issues where all the lists are timing out all the
> time
> > >> causing some kind of cascading failure for example?
> > >> Does the number of sensors matter?  For example if only one sensor
> > >> topology is running with batch setup X, is everything fine?  Do
> failures
> > >> start after adding Nth additional sensor?
> > >>
> > >> Hopefully someone else on the list may have an idea.
> > >> That code does not have any logging to speak of… well debug / trace
> > >> logging that would help here either.
> > >>
> > >>
> > >>
> > >> On December 6, 2017 at 08:18:01, Ali Nazemian (alinazemian@gmail.com)
> > >> wrote:
> > >>
> > >> Everything looks normal except the high number of failed tuples. Do
> you
> > >> know how the indexing batch size works? Based on our observations it
> > seems
> > >> it doesn't update the messages that are in enrichments and indexing
> > topics.
> > >>
> > >> On Thu, Dec 7, 2017 at 12:13 AM, Otto Fowler <ottobackwards@gmail.com
> >
> > >> wrote:
> > >>
> > >>> What do you see in the storm ui for the indexing topology?
> > >>>
> > >>>
> > >>> On December 6, 2017 at 07:10:17, Ali Nazemian (alinazemian@gmail.com
> )
> > >>> wrote:
> > >>>
> > >>> Both hdfs and Elasticsearch batch sizes. There is no error in the
> logs.
> > >>> It mpacts topology error rate and cause almost 90% error rate on
> > indexing
> > >>> tuples.
> > >>>
> > >>> On 6 Dec. 2017 00:20, "Otto Fowler" <ottobackwards@gmail.com>
wrote:
> > >>>
> > >>> Where are you seeing the errors?  Screenshot?
> > >>>
> > >>>
> > >>> On December 5, 2017 at 08:03:46, Otto Fowler (
> ottobackwards@gmail.com)
> > >>> wrote:
> > >>>
> > >>> Which of the indexing options are you changing the batch size for?
> > >>> HDFS?  Elasticsearch?  Both?
> > >>>
> > >>> Can you give an example?
> > >>>
> > >>>
> > >>>
> > >>> On December 5, 2017 at 02:09:29, Ali Nazemian (alinazemian@gmail.com
> )
> > >>> wrote:
> > >>>
> > >>> No specific error in the logs. I haven't enabled debug/trace, though.
> > >>>
> > >>> On Tue, Dec 5, 2017 at 11:54 AM, Otto Fowler <
> ottobackwards@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> My first thought is what are the errors when you get a high error
> > rate?
> > >>>>
> > >>>>
> > >>>> On December 4, 2017 at 19:34:29, Ali Nazemian (
> alinazemian@gmail.com)
> > >>>> wrote:
> > >>>>
> > >>>> Any thoughts?
> > >>>>
> > >>>> On Sun, Dec 3, 2017 at 11:27 PM, Ali Nazemian <
> alinazemian@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> > Hi,
> > >>>> >
> > >>>> > We have noticed recently that no matter what batch size we
use for
> > >>>> Metron
> > >>>> > indexing feeds, as long as we start using different batch
size for
> > >>>> > different Metron feeds, indexing topology throughput will
start
> > >>>> dropping
> > >>>> > due to the high error rate! So I was wondering whether based
on
> the
> > >>>> current
> > >>>> > indexing topology design, we have to choose the same batch
size
> for
> > >>>> all the
> > >>>> > feeds or not. Otherwise, throughout will be dropped. I assume
> since
> > >>>> it is
> > >>>> > acceptable to use different batch sizes for different feeds,
it is
> > not
> > >>>> > expected by design.
> > >>>> >
> > >>>> > Moreover, I have noticed in practice that even if we change
the
> > batch
> > >>>> > size, it will not affect the messages that are already in
> > enrichments
> > >>>> or
> > >>>> > indexing topics, and it will only affect the new messages
that are
> > >>>> coming
> > >>>> > to the parser. Therefore, we need to let all the messages
pass the
> > >>>> indexing
> > >>>> > topology so that we can change the batch size!
> > >>>> >
> > >>>> > It would be great if we can have more details regarding the
design
> > of
> > >>>> this
> > >>>> > section so we can understand our observations are based on
the
> > design
> > >>>> or
> > >>>> > some kind of bug.
> > >>>> >
> > >>>> > Regards,
> > >>>> > Ali
> > >>>> >
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> A.Nazemian
> > >>>>
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> A.Nazemian
> > >>>
> > >>>
> > >>>
> > >>
> > >>
> > >> --
> > >> A.Nazemian
> > >>
> > >>
> > >
> > >
> > > --
> > > A.Nazemian
> > >
> > >
> >
> >
> > --
> > A.Nazemian
> >
>



-- 
A.Nazemian

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message