kafka-users mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: About AMQP connector and Kafka Connect framework
Date Fri, 01 Apr 2016 04:20:15 GMT
On Thu, Mar 31, 2016 at 12:02 AM, Paolo Patierno <ppatierno@live.com> wrote:

> Hi all,
> after the following Twitter conversation ...
> https://twitter.com/jfield/status/715299287479877632
> I'd like to explain better my concerns about using Kafka Connect for an
> AMQP connector.
> I started to develop it almost one month ago (only on source side,
> https://github.com/ppatierno/kafka-connect-amqp) but then switched to
> develop a bridge by myself without using Kafka Connect due to some AMQP
> needs not well supported by Connect framework.
> Here are the first problems I noticed ...
> In my implementation, the so called "source connector" creates a
> ProtonServer to accept AMQP connection from sender client and gives the
> possibility to the related "source task" to receive message on an AMQP
> connection, extract address, partition (that should be Kafka Topic and
> related partition) and body and finally return a structured data injected
> into Kafka.
> The first big problem is that if the injected message has a problem (for
> example the specified partition doesn't exist for the topic), my
> application logic doesn't receive any exception or any called callback
> to have such information for handling. This could work fine for
> pre-settled messages (At Most Once) but not for At Least Once delivery. I
> need to return a "disposition" message to the client with the delivery
> state of accepted or rejected.

This is a really good point and something that is not handled well today.
Most connectors wouldn't include partition info in SourceRecords -- they
would simply specify a key (or have no keys at all) and not worry about the
partitioning in Kafka. It sounds like the way you are mapping things
actually requires matching partitions. When producing data, you could
potentially have two types of errors. The first I'd refer to as something
like "functional" errors -- if Kafka is down, brokers fail, networks
between connect and brokers fail, etc, you simply can't get the data into
Kafka. The second we might refer to as semantic or mapping errors. The
example you're giving (mismatched number of partitions) is one case.
Another case I can think of is when the Converter you are using simply
can't support the format of data provided (which we try to avoid by only
supporting common formats, but inevitably, some serialization format won't

You are right that feedback about either type of error is not well
supported today. Functional errors we actually just try to address in the
framework and don't really want the connector to be aware of. For the most
part, retries and pausing consumption of new data will address this; you
want it exposed via metrics so someone can discover the problem, but the
connector probably can't do anything about it anyway, so providing that
info to the connector is not helpful.
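
To make the distinction between the two error types concrete, here is a
minimal sketch in plain Python. It is not Kafka Connect code: the names
FunctionalError, SemanticError and send_with_retries are hypothetical and
exist only for illustration. It assumes functional errors are retried with
exponential backoff by the framework, while semantic errors are not retried
and propagate to the caller right away.

```python
import time


class FunctionalError(Exception):
    """Transient infrastructure failure: broker down, network failure, etc."""


class SemanticError(Exception):
    """The record itself cannot be written, e.g. a nonexistent partition."""


def send_with_retries(send, record, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Retry functional errors with exponential backoff.

    Semantic errors are not caught here, so they propagate on the first attempt.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return send(record)
        except FunctionalError:
            if attempt == max_attempts:
                raise  # give up; a real framework would also report this via metrics
            sleep(base_delay * 2 ** (attempt - 1))


# Demo 1: a send function that fails twice with a functional error, then succeeds.
attempts = []
delays = []


def flaky_send(record):
    attempts.append(record)
    if len(attempts) < 3:
        raise FunctionalError("broker unavailable")
    return "written"


result = send_with_retries(flaky_send, "record-1", base_delay=1.0, sleep=delays.append)

# Demo 2: a semantic error is surfaced immediately, without any retry.
semantic_attempts = []


def bad_partition_send(record):
    semantic_attempts.append(record)
    raise SemanticError("partition 7 does not exist for topic 'orders'")


try:
    send_with_retries(bad_partition_send, "record-2", sleep=delays.append)
    semantic_outcome = "written"
except SemanticError:
    semantic_outcome = "rejected"
```

In this sketch the connector never sees the functional failures. Only the
semantic one reaches it, and that is the case where the connector could do
something useful in response.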

For the semantic errors, we may need some more support. I'm curious what
type of feedback you'd like and at what granularity (per record? per set of
poll() records? something else?) and immediacy (sync? async?). If you
encounter one of these errors, what would your connector do in response?
Something more than logging an error? Adding something to Connect might
make sense here, but it's worth thinking through the entire subsequent
sequence of events to understand exactly what type of feedback is needed.
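
As one possible answer to the granularity question, the sketch below shows
per-record feedback delivered through a callback, which the connector then
turns into an AMQP disposition of accepted or rejected. Everything here is an
assumption made for illustration: Record, InMemoryProducer and
disposition_for are hypothetical names, not part of Connect, and the
in-memory producer merely stands in for whatever would actually write to
Kafka.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    topic: str
    partition: Optional[int]  # None means "let Kafka choose the partition"
    body: bytes


class InMemoryProducer:
    """Hypothetical stand-in for a producer that knows each topic's partition count."""

    def __init__(self, partitions_by_topic):
        self.partitions_by_topic = partitions_by_topic
        self.log = []

    def send(self, record, callback):
        """Validate the record and report the outcome for that record only."""
        count = self.partitions_by_topic.get(record.topic)
        if count is None:
            callback(record, ValueError(f"unknown topic {record.topic!r}"))
        elif record.partition is not None and not 0 <= record.partition < count:
            callback(record, ValueError(f"no partition {record.partition} in {record.topic!r}"))
        else:
            self.log.append(record)
            callback(record, None)


def disposition_for(error):
    """Map a per-record outcome to the delivery state returned to the AMQP sender."""
    return "accepted" if error is None else "rejected"


# Demo: three records, one valid, one with a bad partition, one with a bad topic.
producer = InMemoryProducer({"orders": 3})
dispositions = []


def on_result(record, error):
    dispositions.append((record.topic, record.partition, disposition_for(error)))


for rec in [
    Record("orders", 0, b"a"),
    Record("orders", 9, b"b"),
    Record("missing", None, b"c"),
]:
    producer.send(rec, on_result)
```

The callback here runs synchronously to keep the example short. Whether such
feedback should be sync or async, and per record or per poll() batch, is
exactly the open question above.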

> If I want to use the same framework to receive from Kafka but on AMQP
> connection as well, I have a bigger problem.
> My current implementation doesn't have the receiver side yet, but it should
> work as follows: the "sink connector" has to create a ProtonServer to accept AMQP
> connection and start reading messages from a Kafka topic that should be the
> AMQP address (node) specified by the remote AMQP connected receiver.
> The big problem is that in the Kafka Connect framework architecture, the
> topic from which to read MUST be defined
> statically in a configuration file used by the framework itself on
> startup. In my scenario, I need to start reading from a Kafka topic
> defined "dynamically" because it depends on the address/node specified by
> the remote AMQP receiver. The receiver attaches to a link on an address that
> should be the Kafka topic to consume; the address is defined at runtime and
> the pattern proposal (https://issues.apache.org/jira/browse/KAFKA-3073)
> can't be enough.

I think this is the piece that I was confused by in the Twitter discussion,
but admittedly 140 chars isn't great for explaining intent. Can we maybe
take a step back from Kafka Connect and just describe the mapping you're
looking for between Kafka and AMQP? How do you expect messages to be
routed? What is the translation that you expect to happen? I think if we
get rid of any assumptions Connect currently makes, we can get at what
you're really trying to achieve and then figure out whether that's
something Connect can currently support or if it's a gap in the current
functionality that we should try to address.
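
For discussion, here is one reading of the mapping described in the quoted
message, written as a small Python sketch with no Connect or AMQP
dependencies. It assumes the AMQP address is used verbatim as the Kafka topic
name and that a topic becomes active only while at least one receiver is
attached to it. DynamicSubscriptions and its methods are hypothetical names
introduced only for this example.

```python
class DynamicSubscriptions:
    """Hypothetical registry of topics that become active when receivers attach."""

    def __init__(self, known_topics):
        self.known_topics = set(known_topics)
        self.receivers_by_topic = {}

    def attach(self, receiver_id, address):
        """Called when a remote AMQP receiver attaches a link on `address`."""
        topic = address  # assumption: the address is the topic name, unchanged
        if topic not in self.known_topics:
            raise LookupError(f"no Kafka topic for AMQP address {address!r}")
        self.receivers_by_topic.setdefault(topic, set()).add(receiver_id)
        return topic

    def detach(self, receiver_id, address):
        """Called when the link is closed; drop the topic once nobody reads it."""
        receivers = self.receivers_by_topic.get(address, set())
        receivers.discard(receiver_id)
        if not receivers:
            self.receivers_by_topic.pop(address, None)

    def active_topics(self):
        """Topics that must be consumed right now, decided at runtime."""
        return sorted(self.receivers_by_topic)


# Demo: the set of consumed topics changes as receivers come and go.
subs = DynamicSubscriptions(known_topics=["orders", "payments"])
subs.attach("receiver-1", "orders")
subs.attach("receiver-2", "payments")
after_attach = subs.active_topics()

subs.detach("receiver-1", "orders")
after_detach = subs.active_topics()

try:
    subs.attach("receiver-3", "unknown-address")
    unknown_outcome = "attached"
except LookupError:
    unknown_outcome = "refused"
```

The contrast with a static topic list is that active_topics() is driven by
link attach and detach events rather than by configuration read at startup.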


> Paolo
> Paolo Patierno
> Senior Software Engineer
> Windows Embedded & IoT
> Microsoft Azure Advisor
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
> Blog : Embedded101
