kafka-users mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: how to ingest a database with a Kafka Connect cluster in parallel?
Date Tue, 03 Jan 2017 21:42:55 GMT
The one-task-per-table limit is an implementation detail of the JDBC
connector, not of Kafka Connect itself. You could potentially write a
connector that parallelizes within a table, but you would lose other
potentially useful properties (e.g. ordering). To split at this level, each
task would have to be responsible for a subset of rowids in the database.
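
To make the rowid idea concrete, here is a minimal sketch in Python. It is
not connector code; a real connector would do this in Java when it builds its
task configurations. It assumes the table has a dense integer key, such as the
`aid` column of `pgbench_accounts` from the original question, and that the
minimum and maximum key values are known up front. The names
`split_id_range` and `task_queries` are hypothetical and are not part of the
JDBC connector or Kafka Connect.

```python
from typing import List, Tuple


def split_id_range(min_id: int, max_id: int, num_tasks: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [min_id, max_id] into at most num_tasks
    contiguous, non-overlapping inclusive sub-ranges of near-equal size."""
    if num_tasks < 1:
        raise ValueError("num_tasks must be at least 1")
    if max_id < min_id:
        return []  # empty table: nothing to assign
    total = max_id - min_id + 1
    groups = min(num_tasks, total)  # never create more ranges than ids
    base, extra = divmod(total, groups)
    ranges = []
    start = min_id
    for i in range(groups):
        # The first `extra` ranges take one additional id each.
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges


def task_queries(table: str, id_column: str, ranges: List[Tuple[int, int]]) -> List[str]:
    """Build one query per task (hypothetical helper); each task reads only
    its own id range, ordered by the id column."""
    return [
        f"SELECT * FROM {table} WHERE {id_column} BETWEEN {lo} AND {hi} "
        f"ORDER BY {id_column}"
        for lo, hi in ranges
    ]


if __name__ == "__main__":
    for query in task_queries("pgbench_accounts", "aid", split_id_range(1, 10, 3)):
        print(query)
```

With ids 1 to 10 and three tasks, the ranges cover aids 1-4, 5-7 and 8-10.
Each task preserves order only within its own range, which is the ordering
trade-off mentioned above, and rows inserted after the ranges are computed
would need separate handling.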

-Ewen

On Tue, Jan 3, 2017 at 1:24 PM, Yuanzhe Yang <yyz1989@gmail.com> wrote:

> Hi Ewen,
>
> Thanks a lot for your reply. So it means we cannot parallelize ingestion of
> one table with multiple processes. Is it because of Kafka Connect or the
> JDBC connector?
>
> Have a nice day.
>
> Best regards,
> Yang
>
>
> 2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <ewen@confluent.io>:
>
> > The unit of parallelism in Connect is a task. The status output lists only
> > one task, so only one process is copying data. The connector can consume
> > data from within a single *database* in parallel, but each *table* must be
> > handled by a single task. Since your table whitelist includes only a single
> > table, the connector generates only a single task. If you add more tables
> > to the whitelist, you'll see more tasks in the status API output.
> >
> > -Ewen
> >
> > On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I am trying to run a Kafka Connect cluster to ingest data from a
> > > relational database with the JDBC connector.
> > >
> > > Before trying Kafka Connect I investigated several other solutions,
> > > including Spark, Flink and Flume, but none of them can be used to ingest
> > > a relational database in a clustered way. By "clustered" I mean ingesting
> > > one database with several distributed processes in parallel, rather than
> > > each process in the cluster ingesting a different database. Kafka Connect
> > > is the option I am currently investigating. The documentation does not
> > > clearly state whether my use case is supported, so I ran a test to find
> > > out.
> > >
> > > I created a cluster with the following docker container configuration:
> > >
> > > ---
> > > version: '2'
> > > services:
> > >   zookeeper:
> > >    image: confluentinc/cp-zookeeper
> > >    hostname: zookeeper
> > >    ports:
> > >      - "2181"
> > >    environment:
> > >      ZOOKEEPER_CLIENT_PORT: 2181
> > >      ZOOKEEPER_TICK_TIME: 2000
> > >
> > >   broker1:
> > >    image: confluentinc/cp-kafka
> > >    hostname: broker1
> > >    depends_on:
> > >      - zookeeper
> > >    ports:
> > >      - '9092'
> > >    environment:
> > >      KAFKA_BROKER_ID: 1
> > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
> > >
> > >   broker2:
> > >    image: confluentinc/cp-kafka
> > >    hostname: broker2
> > >    depends_on:
> > >      - zookeeper
> > >    ports:
> > >      - '9092'
> > >    environment:
> > >      KAFKA_BROKER_ID: 2
> > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
> > >
> > >   broker3:
> > >    image: confluentinc/cp-kafka
> > >    hostname: broker3
> > >    depends_on:
> > >      - zookeeper
> > >    ports:
> > >      - '9092'
> > >    environment:
> > >      KAFKA_BROKER_ID: 3
> > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
> > >
> > >   schema_registry:
> > >    image: confluentinc/cp-schema-registry
> > >    hostname: schema_registry
> > >    depends_on:
> > >      - zookeeper
> > >      - broker1
> > >      - broker2
> > >      - broker3
> > >    ports:
> > >      - '8081'
> > >    environment:
> > >      SCHEMA_REGISTRY_HOST_NAME: schema_registry
> > >      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
> > >
> > >   connect1:
> > >    image: confluentinc/cp-kafka-connect
> > >    hostname: connect1
> > >    depends_on:
> > >      - zookeeper
> > >      - broker1
> > >      - broker2
> > >      - broker3
> > >      - schema_registry
> > >    ports:
> > >      - "8083"
> > >    environment:
> > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
> > >      CONNECT_REST_PORT: 8083
> > >      CONNECT_GROUP_ID: compose-connect-group
> > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >
> > >   connect2:
> > >    image: confluentinc/cp-kafka-connect
> > >    hostname: connect2
> > >    depends_on:
> > >      - zookeeper
> > >      - broker1
> > >      - broker2
> > >      - broker3
> > >      - schema_registry
> > >    ports:
> > >      - "8083"
> > >    environment:
> > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
> > >      CONNECT_REST_PORT: 8083
> > >      CONNECT_GROUP_ID: compose-connect-group
> > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >
> > >   connect3:
> > >    image: confluentinc/cp-kafka-connect
> > >    hostname: connect3
> > >    depends_on:
> > >      - zookeeper
> > >      - broker1
> > >      - broker2
> > >      - broker3
> > >      - schema_registry
> > >    ports:
> > >      - "8083"
> > >    environment:
> > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
> > >      CONNECT_REST_PORT: 8083
> > >      CONNECT_GROUP_ID: compose-connect-group
> > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >
> > >   control-center:
> > >    image: confluentinc/cp-enterprise-control-center
> > >    depends_on:
> > >      - zookeeper
> > >      - broker1
> > >      - broker2
> > >      - broker3
> > >      - schema_registry
> > >      - connect1
> > >      - connect2
> > >      - connect3
> > >    ports:
> > >      - "9021:9021"
> > >    environment:
> > >      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > >      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > >      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
> > >      CONTROL_CENTER_REPLICATION_FACTOR: 1
> > >      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> > >      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> > >      PORT: 9021
> > >
> > >   postgres:
> > >    image: postgres
> > >    hostname: postgres
> > >    ports:
> > >      - "5432"
> > >    environment:
> > >      POSTGRES_PASSWORD: postgres
> > >
> > > The Kafka cluster is running properly, but I don't know how to verify
> > > if the Kafka Connect cluster is running properly. I prepared some test
> > > data in the database, and created a source connector with the following
> > > configuration:
> > >
> > > {
> > >  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> > >  "name": "test",
> > >  "tasks.max": 3,
> > >  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> > >  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> > >  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
> > >  "table.whitelist": "pgbench_accounts",
> > >  "batch.max.rows": 1,
> > >  "topic.prefix": "test",
> > >  "mode": "incrementing",
> > >  "incrementing.column.name": "aid"
> > > }
> > >
> > > The ingestion process is correct and I can consume the produced
> > > messages. But I still have no way to figure out if the ingestion is
> > > parallelized. I called the status API and received the following result:
> > >
> > > {
> > >    "name":"test",
> > >    "connector":{
> > >       "state":"RUNNING",
> > >       "worker_id":"connect2:8083"
> > >    },
> > >    "tasks":[
> > >       {
> > >          "state":"RUNNING",
> > >          "id":0,
> > >          "worker_id":"connect3:8083"
> > >       }
> > >    ]
> > > }
> > >
> > > This result is the same for all instances. Does it mean the ingestion
> > > tasks are not parallelized? Is there anything important I am missing, or
> > > is this type of clustering simply not supported?
> > >
> > > Any comments and suggestions are highly appreciated. Have a nice day!
> > >
> > > Best regards,
> > > Yang
> > >
> >
>
