kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Running cluster of stream processing application
Date Mon, 12 Dec 2016 10:54:04 GMT
In the scenario you mention above about max.poll.interval.ms: yes, if the
timeout was reached then there would be a rebalance and one of the standby
tasks would take over. However, the original task may still be processing
the data when the rebalance occurs and would throw an exception when it
tries to commit the offsets (as it would no longer be the owner of the
partitions). In this case the StreamThread would terminate, so you would
want to have set up an UncaughtExceptionHandler such that you can be
alerted and take any necessary actions, e.g., shut down the app so it can be
auto-restarted.
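
A minimal sketch of such a handler, assuming the 0.10.1-era API where
KafkaStreams#setUncaughtExceptionHandler takes a Thread.UncaughtExceptionHandler;
the log-and-exit policy (so an external supervisor restarts the process) and the
config values are illustrative, not prescribed by the thread:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    // ... topology definition elided ...

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    KafkaStreams streams = new KafkaStreams(builder, props);
    // If a StreamThread dies (e.g. the offset commit fails after a rebalance),
    // log and exit the whole process so a supervisor can restart the app.
    streams.setUncaughtExceptionHandler((thread, throwable) -> {
        System.err.println("Uncaught exception in " + thread.getName() + ": " + throwable);
        System.exit(1);
    });
    streams.start();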

On Mon, 12 Dec 2016 at 10:27 Sachin Mittal <sjmittal@gmail.com> wrote:

> Understood.
>
> Also the line
> Thanks for the application. It is not clear that clustering depends on how
> source topics are partitioned.
>
> Should be read as
> Thanks for the explanation. It is now clear that clustering depends on how
> source topics are partitioned.
>
> Apologies for auto-correct.
>
> One thing I want to know is: say the streams application's consumer has the
> following defaults set
> max.poll.interval.ms 300000
> max.poll.records 500
>
> So it is picking 500 records in one batch, pushing them downstream, and it
> has up to 300 sec before it must fetch the next set of records. It may be
> possible that processing that batch of 500 takes more than 300 sec.
>
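In case a batch genuinely needs longer, both settings can be overridden in the
streams configuration, which passes consumer properties through to the embedded
consumer. A minimal sketch with illustrative values (assuming the 0.10.1-era
client property names):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    // Allow up to 10 minutes between polls before the coordinator assumes
    // this member has failed (illustrative value)...
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // ...and/or fetch smaller batches so each one fits inside the interval.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
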
> So the broker will consider that the consumer has failed and will rebalance
> that partition onto another consumer in the application.
>
> Would this mean that my current streams application would go into standby
> mode and one of the standbys would be woken up and become the main
> application?
>
> Also I suppose, if that is true, it would still process the complete batch
> downstream before becoming standby.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 12, 2016 at 3:19 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > The KafkaStreams StreamsPartitionAssignor will take care of assigning the
> > Standby Tasks to the other instances of your Kafka Streams application. The
> > state store updates are all handled by reading from the change-logs and
> > updating local copies; there is no communication required between the
> > individual application instances to do this, as all data is flowing through
> > Kafka, i.e., they are just reading from the topic that backs the state
> > store.
> >
> > The State Dir is independent on each machine, but that path must exist.
> > Kafka Streams doesn't try to keep the directories themselves in sync;
> > rather, it will update local copies of the State Stores that happen to be
> > under that directory path. The idea being that if the active task fails, it
> > can quickly fail over to one of the standby tasks and not have to spend too
> > much time catching up to the head of the change-log.
> >
> > Yes, you should keep the num.standby.replicas value the same on all
> > instances.
> >
> > Yes, if you deploy three instances and one crashes, then one of the others
> > will take over the tasks.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > > Hi,
> > > Thanks for the application. It is not clear that clustering depends on how
> > > source topics are partitioned.
> > > In our case I guess the num.standby.replicas setting is best suited.
> > >
> > > If say I set this to 2 and run two more instances of the same application
> > > in two different machines, how would my original instance know in which
> > > two machines it needs to keep the state store up to date?
> > >
> > > My application has these settings:
> > > props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> > >
> > > So when you say it keeps the state store up to date, does that mean it will
> > > keep the dirs /data01/kafka-streams/test in sync on all three machines?
> > >
> > > Also, in the other two standby applications do I need to keep the same
> > > value (2) for num.standby.replicas?
> > >
> > > So basically I need to deploy three instances of the same application, and
> > > if one fails and crashes, one of the standbys takes over.
> > >
> > > Please let me know if my understanding is correct so far.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <damian.guy@gmail.com> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > What you have suggested will never happen. If there is only 1 partition
> > > > there will only ever be one consumer of that partition. So if you had 2
> > > > instances of your streams application, and only a single input partition,
> > > > only 1 instance would be processing the data.
> > > > If you are running like this, then you might want to set
> > > > StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> > > > State Store that is generated by the aggregation is kept up to date on the
> > > > instance that is not processing the data. So in the event that the active
> > > > instance fails, the standby instance should be able to continue without too
> > > > much of a gap in processing time.
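
A sketch of the setting Damian refers to; per the rest of this thread the value
should be the same on every instance (1 here, matching the two-instance case;
the application id is illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    // Keep one warm standby copy of each state store on another instance,
    // maintained by replaying the store's changelog topic.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);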
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sjmittal@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > I followed the document and I have a few questions.
> > > > > Say I have a single-partition input topic and say I run 2 streams
> > > > > applications, from machine1 and machine2.
> > > > > Both applications have the same application id and have identical code.
> > > > > Say topic1 has messages like
> > > > > (k1, v11)
> > > > > (k1, v12)
> > > > > (k1, v13)
> > > > > (k2, v21)
> > > > > (k2, v22)
> > > > > (k2, v23)
> > > > > When I was running single application I was getting results like
> > > > > (k1, agg(v11, v12, v13))
> > > > > (k2, agg(v21, v22, v23))
> > > > >
> > > > > Now when 2 applications are run, say the messages are read in round
> > > > > robin fashion:
> > > > > v11 v13 v22 - machine 1
> > > > > v12 v21 v23 - machine 2
> > > > >
> > > > > The aggregation at machine 1 would be
> > > > > (k1, agg(v11, v13))
> > > > > (k2, agg(v22))
> > > > >
> > > > > The aggregation at machine 2 would be
> > > > > (k1, agg(v12))
> > > > > (k2, agg(v21, v23))
> > > > >
> > > > > So now where do I join the independent results of these 2 aggregations
> > > > > to get the final result as expected when a single instance was running?
> > > > >
> > > > > Note my high level DSL is something like
> > > > > srcTopic.aggregate(...).foreach((key, aggregation) -> {
> > > > >     // process aggregated value and push it to some external storage
> > > > > });
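
For reference, a runnable version of that fragment might look like the sketch
below, against the 0.10.1 DSL; the topic name, String serdes, concatenating
aggregator, and store name are all illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> src =
        builder.stream(Serdes.String(), Serdes.String(), "topic1");

    src.groupByKey()
       // Illustrative aggregator: concatenate all values seen for a key.
       .aggregate(() -> "", (key, value, agg) -> agg + value,
                  Serdes.String(), "agg-store")
       // Invoked on every update to the aggregate, not once per "final" value.
       .foreach((key, agg) -> {
           // process aggregated value and push it to external storage here
       });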
> > > > >
> > > > > So I want this foreach to run against the final set of aggregated
> > > > > values. Do I need to add another step before foreach to make sure the
> > > > > different results from the 2 machines are joined to get the final one as
> > > > > expected? If yes, what would that step be?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > > > > mathieu.fenniak@replicon.com> wrote:
> > > > >
> > > > > > Hi Sachin,
> > > > > >
> > > > > > Some quick answers, and a link to some documentation to read more:
> > > > > >
> > > > > > - If you restart the application, it will start from the point it
> > > > > > crashed (possibly reprocessing a small window of records).
> > > > > >
> > > > > > - You can run more than one instance of the application.  They'll
> > > > > > coordinate by virtue of being part of a Kafka consumer group; if one
> > > > > > crashes, the partitions that it was reading from will be picked up by
> > > > > > other instances.
> > > > > >
> > > > > > - When running more than one instance, the tasks will be distributed
> > > > > > between the instances.
> > > > > >
> > > > > > Confluent's docs on the Kafka Streams architecture go into a lot more
> > > > > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
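
Concretely, "more than one instance" needs nothing cluster-specific in the
code: each machine runs the same program with the same application.id, which
doubles as the consumer group id. A minimal sketch (broker address illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    // Run this same jar on machine1 and machine2. The shared application.id
    // places both instances in one consumer group, so input partitions (and
    // the tasks that own them) are split across instances and reassigned to
    // the survivors when an instance crashes.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    KStreamBuilder builder = new KStreamBuilder();
    // ... identical topology defined on every instance ...

    new KafkaStreams(builder, props).start();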
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > > We were able to run a stream processing application against a
> > > > > > > fairly decent load of messages in a production environment.
> > > > > > >
> > > > > > > To make the system robust, say the stream processing application
> > > > > > > crashes, is there a way to make it auto-start from the point where
> > > > > > > it crashed?
> > > > > > >
> > > > > > > Also, is there any concept like running the same application in a
> > > > > > > cluster, where if one fails, another takes over, until we bring back
> > > > > > > up the failed node of the streams application?
> > > > > > >
> > > > > > > If yes, are there any guidelines or some knowledge base we can look
> > > > > > > at to understand how this would work?
> > > > > > >
> > > > > > > Is there a way, like in Spark, where the driver program distributes
> > > > > > > the tasks across various nodes in a cluster - is there something
> > > > > > > similar in Kafka Streams too?
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
