kafka-users mailing list archives

From Paolo Patierno <ppatie...@live.com>
Subject RE: About AMQP connector and Kafka Connect framework
Date Fri, 01 Apr 2016 07:23:29 GMT
Hi Ewen,

thanks for your reply. 

My objective here is to access Kafka through the AMQP protocol (I'm now writing a bridge from
scratch, without using Kafka Connect).

Consider the following scenario ...

Producer side :

An AMQP client connects to the SourceConnector, which listens on the AMQP port (5672) and
accepts connections. Once the connection is open, the client executes an AMQP "attach"
performative on a specific address: this address corresponds exactly to the Kafka topic
(e.g. /my_topic).
From then on, the AMQP client sends messages ("transfer" frames in AMQP jargon), which are
translated into the SourceRecords returned by poll() (inside my SourceTask): besides the
body, the AMQP message can specify the partition, or the key to use for determining the
destination partition.
Now ... since the connector is a sort of bridge between AMQP and Kafka, it has to give feedback
to the client in AMQP fashion: that means sending a "disposition" frame that acknowledges
the message injected into Kafka. The problem with a connector is that after returning a
SourceRecord (from the poll() method) we get no feedback from the Connect framework on
whether that record was written to Kafka correctly (e.g. wrong partition and so on): it
means we can't send the correct "disposition" to the client.
It would be useful to have the same kind of callback as the KafkaProducer client, where you
know the result of each message sent to Kafka.
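
To make the feedback loop described above concrete, here is a minimal sketch in plain Python.
It uses no Kafka or AMQP library: Transfer, FakeProducer, bridge_transfer and
send_disposition are hypothetical names introduced only for illustration, and the fake
producer merely imitates the idea of a per-record completion callback (the Java KafkaProducer
reports the outcome through a callback passed to send()). It shows how each outcome could be
turned into an "accepted" or "rejected" disposition.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Transfer:
    """Hypothetical stand-in for an AMQP "transfer" frame."""
    delivery_id: int
    address: str                      # e.g. "/my_topic"
    body: bytes
    partition: Optional[int] = None   # optional explicit partition
    key: Optional[bytes] = None       # optional key


class FakeProducer:
    """Toy producer (assumption, not the Kafka API): reports each result via a callback."""

    def __init__(self, partitions_per_topic: Dict[str, int]):
        self.partitions_per_topic = partitions_per_topic
        self.log = []  # records that were "written"

    def send(self, topic, value, partition=None, key=None, callback=None):
        error = None
        count = self.partitions_per_topic.get(topic)
        if count is None:
            error = f"unknown topic {topic}"
        elif partition is not None and not 0 <= partition < count:
            error = f"partition {partition} does not exist for {topic}"
        else:
            self.log.append((topic, partition, key, value))
        if callback is not None:
            callback(error)  # None means the record was written


def bridge_transfer(producer: FakeProducer, transfer: Transfer,
                    send_disposition: Callable[[int, str, Optional[str]], None]) -> None:
    """Forward one transfer and answer the AMQP client once the outcome is known."""
    topic = transfer.address.lstrip("/")  # the AMQP address maps to the topic name

    def on_completion(error: Optional[str]) -> None:
        state = "accepted" if error is None else "rejected"
        send_disposition(transfer.delivery_id, state, error)

    producer.send(topic, transfer.body, transfer.partition, transfer.key, on_completion)
```

With a source connector, the step that on_completion performs here is the missing piece: the
task returns the record from poll() and never learns whether the write succeeded.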

Consumer side :

An AMQP client connects to the SinkConnector, which listens on the AMQP port (5672) and
accepts connections. Once the connection is open, the client executes an AMQP "attach"
performative on a specific address: this address corresponds exactly to the Kafka topic
(e.g. /my_topic).
This isn't possible at runtime with Connect, because the SinkConnector must be configured
statically through the configuration file, which specifies the list of topics to read.
With AMQP, clients connect and disconnect dynamically, so we only know at runtime which
topic to read from.
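
The runtime behaviour described above can be sketched as a small registry that tracks which
topics currently have at least one attached receiver. This is plain Python with hypothetical
names (TopicRegistry, link ids as strings); it is only a model of the bookkeeping, not part of
any Kafka or AMQP library. In a standalone bridge, the returned topic list would presumably be
handed to the consumer's subscribe call each time it changes.

```python
class TopicRegistry:
    """Tracks the topics that have at least one attached AMQP receiver link."""

    def __init__(self):
        self._links = {}  # topic name -> set of link ids attached to it

    def attach(self, link_id, address):
        """Register a receiver link; the AMQP address maps to the topic name."""
        topic = address.lstrip("/")
        self._links.setdefault(topic, set()).add(link_id)
        return self.topics()

    def detach(self, link_id, address):
        """Remove a receiver link; drop the topic once no link needs it."""
        topic = address.lstrip("/")
        links = self._links.get(topic, set())
        links.discard(link_id)
        if not links:
            self._links.pop(topic, None)
        return self.topics()

    def topics(self):
        """Current subscription list, sorted for stable output."""
        return sorted(self._links)
```

The topic set changes on every attach and detach, which is exactly what a static list of
topics in a connector configuration file cannot express.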

After starting to use Connect and running into the problems above, I stopped using it and
decided to write the bridge from scratch.
I don't know whether there are other problems with adopting Connect for AMQP.
Of course, with Connect I see the need for two AMQP servers, one on each side: source and
sink connector.
With my current implementation I have only one, and I then use the KafkaConsumer and
KafkaProducer client libraries.


Paolo Patierno
Senior Software Engineer @Red Hat
Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience

> Date: Thu, 31 Mar 2016 21:20:15 -0700
> Subject: Re: About AMQP connector and Kafka Connect framework
> From: ewen@confluent.io
> To: users@kafka.apache.org
> On Thu, Mar 31, 2016 at 12:02 AM, Paolo Patierno <ppatierno@live.com> wrote:
> > Hi all,
> >
> > after the following Twitter conversation ...
> >
> > https://twitter.com/jfield/status/715299287479877632
> >
> > I'd like to explain better my concerns about using Kafka Connect for an
> > AMQP connector.
> > I started to develop it almost one month ago (only on source side,
> > https://github.com/ppatierno/kafka-connect-amqp) but then switched to
> > develop a bridge by myself without using Kafka Connect due to some AMQP
> > needs not well supported by Connect framework.
> > Following the first problems I noticed ...
> >
> > In my implementation, the so called "source connector" creates a
> > ProtonServer to accept AMQP connection from sender client and gives the
> > possibility to the related "source task" to receive message on an AMQP
> > connection, extract address, partition (that should be Kafka Topic and
> > related partition) and body and finally return a structured data injected
> > into Kafka.
> > The first big problem is that if the injected message has a problem (for
> > example the specified partition doesn't exist for the topic), my
> > application logic doesn't receive any exception or callback that would
> > let it handle the failure. This could work fine for pre-settled messages
> > (At Most Once) but not for At Least Once delivery. I need to return a
> > "disposition" message to the client with a delivery state of accepted
> > or rejected.
> >
> This is a really good point and something that is not handled well today.
> Most connectors wouldn't include partition info in SourceRecords -- they
> would simply specify a key (or have no keys at all) and not worry about the
> partitioning in Kafka. It sounds like the way you are mapping things
> actually requires matching partitions. When producing data, you could
> potentially have two types of errors. The first I'd refer to as something
> like "functional" errors -- if Kafka is down, brokers fail, networks
> between connect and brokers fail, etc, you simply can't get the data into
> Kafka. The second we might refer to as semantic or mapping errors. The
> example you're giving (mismatched number of partitions) is one case.
> Another case I can think of is when the Converter you are using simply
> can't support the format of data provided (which we try to avoid by only
> supporting common formats, but inevitably, some serialization format won't
> work).
> You are right that feedback about either type of error is not well
> supported today. Functional errors we actually just try to address in the
> framework and don't really want the connector to be aware of. For the most
> part, retries and pausing consumption of new data will address this; you
> want it exposed via metrics so someone can discover the problem, but the
> connector probably can't do anything about it anyway, so providing that
> info to the connector is not helpful.
> For the semantic errors, we may need some more support. I'm curious what
> type of feedback you'd like and at what granularity (per record? per set of
> poll() records? something else?) and immediacy (sync? async?). If you
> encounter one of these errors, what would your connector do in response?
> Something more than logging an error? Adding something to Connect might
> make sense here, but it's worth thinking through the entire subsequent
> sequence of events to understand exactly what type of feedback is needed.
> >
> > If I want to use the same framework to receive from Kafka but on AMQP
> > connection as well, I have a bigger problem.
> > My current implementation doesn't have the receiver side yet, but it should
> > work as follows: the "sink connector" has to create a ProtonServer to accept
> > AMQP connections and start reading messages from a Kafka topic, which should
> > be the AMQP address (node) specified by the remote AMQP receiver.
> > The big problem is that in the Kafka Connect framework architecture, the
> > topic to read from MUST be defined statically in a configuration file used
> > by the framework itself on startup. In my scenario, I need to start reading
> > from a Kafka topic defined "dynamically", because it depends on the
> > address/node specified by the remote AMQP receiver. The receiver attaches to
> > a link on an address that should be the Kafka topic to consume; the address
> > is defined at runtime, and the pattern proposal
> > (https://issues.apache.org/jira/browse/KAFKA-3073) would not be enough.
> >
> I think this is the piece that I was confused by in the Twitter discussion,
> but admittedly 140 chars isn't great for explaining intent. Can we maybe
> take a step back from Kafka Connect and just describe the mapping you're
> looking for between Kafka and AMQP? How do you expect messages to be
> routed? What is the translation that you expect to happen? I think if we
> get rid of any assumptions Connect currently makes, we can get at what
> you're really trying to achieve and then figure out whether that's
> something Connect can currently support or if it's a gap in the current
> functionality that we should try to address.
> Thanks,
> Ewen
> >
> > Paolo
> >
> > Paolo Patierno
> > Senior Software Engineer
> > Windows Embedded & IoT
> > Microsoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperience
> > Blog : Embedded101
> >