kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: kafka connector for mongodb as a source
Date Mon, 10 Apr 2017 02:20:47 GMT
There is some log noise in there from Reflections, but it does look like
your connector & task are being created:

[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)

And I see the producer configs for the source task's underlying producer
being logged. Then we see the following, suggesting some sort of connection
is being made successfully:

[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)

But then the logs stop. The framework should just be calling poll() on your
source task. Perhaps you could add some logging to your code to give some
hint as to where it is getting stuck? You could also try increasing the log
level for the framework to DEBUG or even TRACE.

-Ewen

On Mon, Mar 27, 2017 at 6:22 AM, VIVEK KUMAR MISHRA 13BIT0066 <
vivekkumar.mishra2013@vit.ac.in> wrote:

> Hi All,
>
> I am creating kafka connector for mongodb as a source .My connector is
> starting and connecting with kafka but it is not committing any offset.
>
> This is output after starting connector.
>
> [root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
> config/connect-standalone.properties config/mongodb.properties
> [2017-03-27 18:32:58,019] INFO StandaloneConfig values:
>         rest.advertised.host.name = null
>         task.shutdown.graceful.timeout.ms = 5000
>         rest.host.name = null
>         rest.advertised.port = null
>         bootstrap.servers = [localhost:9092]
>         offset.flush.timeout.ms = 5000
>         offset.flush.interval.ms = 10000
>         rest.port = 8083
>         internal.key.converter = class
> org.apache.kafka.connect.json.JsonConverter
>         access.control.allow.methods =
>         access.control.allow.origin =
>         offset.storage.file.filename = /tmp/connect.offsets
>         internal.value.converter = class
> org.apache.kafka.connect.json.JsonConverter
>         value.converter = class org.apache.kafka.connect.json.
> JsonConverter
>         key.converter = class org.apache.kafka.connect.json.JsonConverter
>  (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
> [2017-03-27 18:32:58,162] INFO Logging initialized @609ms
> (org.eclipse.jetty.util.log:186)
> [2017-03-27 18:32:58,392] INFO Kafka Connect starting
> (org.apache.kafka.connect.runtime.Connect:52)
> [2017-03-27 18:32:58,392] INFO Herder starting
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
> [2017-03-27 18:32:58,393] INFO Worker starting
> (org.apache.kafka.connect.runtime.Worker:113)
> [2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
> /tmp/connect.offsets
> (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
> [2017-03-27 18:32:58,398] INFO Worker started
> (org.apache.kafka.connect.runtime.Worker:118)
> [2017-03-27 18:32:58,398] INFO Herder started
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
> [2017-03-27 18:32:58,398] INFO Starting REST server
> (org.apache.kafka.connect.runtime.rest.RestServer:98)
> [2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
> (org.eclipse.jetty.server.Server:327)
> [2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final
> (org.hibernate.validator.internal.util.Version:27)
> Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The
> (sub)resource method listConnectors in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method createConnector in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains
> empty
> path annotation.
>
> [2017-03-27 18:33:00,015] INFO Started
> o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
> (org.eclipse.jetty.server.handler.ContextHandler:744)
> [2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
> 0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
> [2017-03-27 18:33:00,043] INFO Started @2492ms
> (org.eclipse.jetty.server.Server:379)
> [2017-03-27 18:33:00,043] INFO REST server listening at
> http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
> (org.apache.kafka.connect.runtime.rest.RestServer:150)
> [2017-03-27 18:33:00,043] INFO Kafka Connect started
> (org.apache.kafka.connect.runtime.Connect:58)
> [2017-03-27 18:33:00,048] INFO ConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:178)
> [2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
> (org.apache.kafka.connect.runtime.Worker:159)
> [2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
> 0.10.0.1 of type class
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
> (org.apache.kafka.connect.runtime.Worker:162)
> [2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
> (org.apache.kafka.connect.runtime.Worker:173)
> [2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
> [2017-03-27 18:33:00,056] INFO Creating task mongodb-0
> (org.apache.kafka.connect.runtime.Worker:252)
> [2017-03-27 18:33:00,056] INFO ConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:178)
> [2017-03-27 18:33:00,057] INFO TaskConfig values:
>         task.class = class
> org.apache.kafka.connect.mongodb.MongodbSourceTask
>  (org.apache.kafka.connect.runtime.TaskConfig:178)
> [2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
> 0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
> (org.apache.kafka.connect.runtime.Worker:264)
> [2017-03-27 18:33:00,066] INFO ProducerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         bootstrap.servers = [localhost:9092]
>         ssl.keystore.type = JKS
>         sasl.mechanism = GSSAPI
>         max.block.ms = 9223372036854775807
>         interceptor.classes = null
>         ssl.truststore.password = null
>         client.id =
>         ssl.endpoint.identification.algorithm = null
>         request.timeout.ms = 2147483647
>         acks = all
>         receive.buffer.bytes = 32768
>         ssl.truststore.type = JKS
>         retries = 2147483647
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         send.buffer.bytes = 131072
>         compression.type = none
>         metadata.fetch.timeout.ms = 60000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         buffer.memory = 33554432
>         timeout.ms = 30000
>         key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.trustmanager.algorithm = PKIX
>         block.on.buffer.full = false
>         ssl.key.password = null
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         max.in.flight.requests.per.connection = 1
>         metrics.num.samples = 2
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         batch.size = 16384
>         ssl.keystore.location = null
>         ssl.cipher.suites = null
>         security.protocol = PLAINTEXT
>         max.request.size = 1048576
>         value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>         linger.ms = 0
>  (org.apache.kafka.clients.producer.ProducerConfig:178)
> [2017-03-27 18:33:00,103] INFO ProducerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         bootstrap.servers = [localhost:9092]
>         ssl.keystore.type = JKS
>         sasl.mechanism = GSSAPI
>         max.block.ms = 9223372036854775807
>         interceptor.classes = null
>         ssl.truststore.password = null
>         client.id = producer-1
>         ssl.endpoint.identification.algorithm = null
>         request.timeout.ms = 2147483647
>         acks = all
>         receive.buffer.bytes = 32768
>         ssl.truststore.type = JKS
>         retries = 2147483647
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         send.buffer.bytes = 131072
>         compression.type = none
>         metadata.fetch.timeout.ms = 60000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         buffer.memory = 33554432
>         timeout.ms = 30000
>         key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.trustmanager.algorithm = PKIX
>         block.on.buffer.full = false
>         ssl.key.password = null
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         max.in.flight.requests.per.connection = 1
>         metrics.num.samples = 2
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         batch.size = 16384
>         ssl.keystore.location = null
>         ssl.cipher.suites = null
>         security.protocol = PLAINTEXT
>         max.request.size = 1048576
>         value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>         linger.ms = 0
>  (org.apache.kafka.clients.producer.ProducerConfig:178)
> [2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1
> (org.apache.kafka.common.utils.AppInfoParser:83)
> [2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5
> (org.apache.kafka.common.utils.AppInfoParser:84)
> [2017-03-27 18:33:00,121] INFO Created connector mongodb
> (org.apache.kafka.connect.cli.ConnectStandalone:93)
> [2017-03-27 18:33:00,319] INFO Cluster created with settings
> {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN,
> serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
> finished initialization and start
> (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
> [2017-03-27 18:33:00,442] INFO No server chosen by
> ReadPreferenceServerSelector{readPreference=primary} from cluster
> description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
> all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
> state=CONNECTING}]}. Waiting for 30000 ms before timing out
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,455] INFO Opened connection
> [connectionId{localValue:1, serverValue:4}] to localhost:27017
> (org.mongodb.driver.connection:71)
> [2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
> server with description ServerDescription{address=localhost:27017,
> type=STANDALONE, state=CONNECTED, ok=true,
> version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
> maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,491] INFO Opened connection
> [connectionId{localValue:2, serverValue:5}] to localhost:27017
> (org.mongodb.driver.connection:71)
> [2017-03-27 18:33:02,162] WARN could not create Dir using directory from
> url
> file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib.
> skipping. (org.reflections.Reflections:104)
> java.lang.NullPointerException
>         at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring
> the exception and continuing (org.reflections.Reflections:208)
> org.reflections.ReflectionsException: could not create Vfs.Dir from url,
> no
> matching UrlType was found
> [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib]
> either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
> static setDefaultURLTypes(final List<UrlType> urlTypes) or
> addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,471] WARN could not create Dir using directory from
> url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib.
> skipping. (org.reflections.Reflections:104)
> java.lang.NullPointerException
>         at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring
> the exception and continuing (org.reflections.Reflections:208)
> org.reflections.ReflectionsException: could not create Vfs.Dir from url,
> no
> matching UrlType was found
> [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib]
> either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
> static setDefaultURLTypes(final List<UrlType> urlTypes) or
> addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls,
> producing 6162 keys and 38674 values  (org.reflections.Reflections:229)
>
>
>
> After that it is not doing anything.
>
> This is my config file.
>
> name=mongodb
> connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
> tasks.max=1
> host=localhost
> port=27017
> batch.size=100
> schema.name=mongodbschema
> topic.prefix=mongo-prefix
> databases=sampledb.hero
>
>
>
> Please do suggest me the error ASAP.
>
> Thanks
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message