kafka-users mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: how to ingest a database with a Kafka Connect cluster in parallel?
Date Tue, 03 Jan 2017 19:55:24 GMT
The unit of parallelism in Connect is a task. The status output lists only
one task, so you only have one process copying data. The JDBC connector can
consume data from within a single *database* in parallel, but each *table*
must be handled by a single task. Since your table whitelist includes only a
single table, the connector generates only a single task, regardless of
tasks.max. If you add more tables to the whitelist, you'll see more tasks in
the status API output.
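
To illustrate the rule above, here is a minimal Python sketch of how the
number of tasks follows from the whitelist and tasks.max. It is a model of
the behaviour described here, not the connector's code: plan_tasks is a
hypothetical helper, the round-robin assignment is an illustrative choice,
and the extra table names assume the other standard pgbench tables exist in
the database.

```python
def plan_tasks(tables, tasks_max):
    """Split tables into at most tasks_max groups, one group per task.

    Hypothetical model: every table lands in exactly one group, so a
    single-table whitelist can never yield more than one task.
    """
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    num_groups = min(len(tables), tasks_max)
    groups = [[] for _ in range(num_groups)]
    for i, table in enumerate(tables):
        # Round-robin is an assumption made for illustration only.
        groups[i % num_groups].append(table)
    return groups


# Whitelist from the original post: one table, tasks.max = 3 -> one task.
print(plan_tasks(["pgbench_accounts"], 3))

# Assumed larger whitelist: four tables, tasks.max = 3 -> three tasks.
print(plan_tasks(
    ["pgbench_accounts", "pgbench_branches", "pgbench_tellers", "pgbench_history"],
    3,
))
```

With the single-table whitelist the sketch returns one group, which matches
the single entry under "tasks" in the status output.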

-Ewen

On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1989@gmail.com> wrote:

> Hi all,
>
> I am trying to run a Kafka Connect cluster to ingest data from a relational
> database with the JDBC connector.
>
> I investigated several other solutions, including Spark, Flink and Flume,
> before trying Kafka Connect, but none of them can ingest a relational
> database in a clustered way. By "cluster" I mean ingesting one database
> with several distributed processes in parallel, instead of each process in
> the cluster ingesting a different database. Kafka Connect is the option I
> am investigating currently. After reading the documentation, I have not
> found any clear statement about whether my use case is supported, so I had
> to run a test to figure it out.
>
> I created a cluster with the following docker container configuration:
>
> ---
> version: '2'
> services:
>   zookeeper:
>    image: confluentinc/cp-zookeeper
>    hostname: zookeeper
>    ports:
>      - "2181"
>    environment:
>      ZOOKEEPER_CLIENT_PORT: 2181
>      ZOOKEEPER_TICK_TIME: 2000
>
>   broker1:
>    image: confluentinc/cp-kafka
>    hostname: broker1
>    depends_on:
>      - zookeeper
>    ports:
>      - '9092'
>    environment:
>      KAFKA_BROKER_ID: 1
>      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
>
>   broker2:
>    image: confluentinc/cp-kafka
>    hostname: broker2
>    depends_on:
>      - zookeeper
>    ports:
>      - '9092'
>    environment:
>      KAFKA_BROKER_ID: 2
>      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
>
>   broker3:
>    image: confluentinc/cp-kafka
>    hostname: broker3
>    depends_on:
>      - zookeeper
>    ports:
>      - '9092'
>    environment:
>      KAFKA_BROKER_ID: 3
>      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
>
>   schema_registry:
>    image: confluentinc/cp-schema-registry
>    hostname: schema_registry
>    depends_on:
>      - zookeeper
>      - broker1
>      - broker2
>      - broker3
>    ports:
>      - '8081'
>    environment:
>      SCHEMA_REGISTRY_HOST_NAME: schema_registry
>      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
>
>   connect1:
>    image: confluentinc/cp-kafka-connect
>    hostname: connect1
>    depends_on:
>      - zookeeper
>      - broker1
>      - broker2
>      - broker3
>      - schema_registry
>    ports:
>      - "8083"
>    environment:
>      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
>      CONNECT_REST_PORT: 8083
>      CONNECT_GROUP_ID: compose-connect-group
>      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   connect2:
>    image: confluentinc/cp-kafka-connect
>    hostname: connect2
>    depends_on:
>      - zookeeper
>      - broker1
>      - broker2
>      - broker3
>      - schema_registry
>    ports:
>      - "8083"
>    environment:
>      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
>      CONNECT_REST_PORT: 8083
>      CONNECT_GROUP_ID: compose-connect-group
>      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   connect3:
>    image: confluentinc/cp-kafka-connect
>    hostname: connect3
>    depends_on:
>      - zookeeper
>      - broker1
>      - broker2
>      - broker3
>      - schema_registry
>    ports:
>      - "8083"
>    environment:
>      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
>      CONNECT_REST_PORT: 8083
>      CONNECT_GROUP_ID: compose-connect-group
>      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   control-center:
>    image: confluentinc/cp-enterprise-control-center
>    depends_on:
>      - zookeeper
>      - broker1
>      - broker2
>      - broker3
>      - schema_registry
>      - connect1
>      - connect2
>      - connect3
>    ports:
>      - "9021:9021"
>    environment:
>      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
>      CONTROL_CENTER_REPLICATION_FACTOR: 1
>      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>      PORT: 9021
>
>   postgres:
>    image: postgres
>    hostname: postgres
>    ports:
>      - "5432"
>    environment:
>      POSTGRES_PASSWORD: postgres
>
> The Kafka cluster is running properly, but I don't know how to verify
> whether the Kafka Connect cluster is running properly. I prepared some test
> data in the database and created a source connector with the following
> configuration:
>
> {
>  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
>  "name": "test",
>  "tasks.max": 3,
>  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
>  "table.whitelist": "pgbench_accounts",
>  "batch.max.rows": 1,
>  "topic.prefix": "test",
>  "mode": "incrementing",
>  "incrementing.column.name": "aid"
> }
>
> The ingestion process is correct and I can consume the produced messages.
> But I still have no way to figure out if the ingestion is parallelized. I
> called the status API and received the following result:
>
> {
>    "name":"test",
>    "connector":{
>       "state":"RUNNING",
>       "worker_id":"connect2:8083"
>    },
>    "tasks":[
>       {
>          "state":"RUNNING",
>          "id":0,
>          "worker_id":"connect3:8083"
>       }
>    ]
> }
>
> This result is the same for all instances. Does it mean the ingestion tasks
> are not parallelized? Is there anything important I am missing, or is this
> type of clustering simply not supported?
>
> Any comments and suggestions are highly appreciated. Have a nice day!
>
> Best regards,
> Yang
>
