kafka-users mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Issue facing, Aerospike Connect for Kafka - Inbound
Date Fri, 14 Jun 2019 21:52:14 GMT
I generate simple pricing data as follows:

 echo "${UUID}:${TICKER}" >> ${IN_FILE}

 UUID is the key and TICKER is the name of the security, e.g. "IBM".
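
 For illustration, the generation step could be sketched as follows. The price_line helper, the use of uuidgen, and the loop are assumptions made for this sketch; only the echo line above comes from the actual script.

```shell
#!/usr/bin/env bash
# Hypothetical helper (illustration only): build one "UUID:TICKER" line
# in the same shape as the echo statement above.
price_line() {
  local uuid="$1" ticker="$2"
  printf '%s:%s\n' "$uuid" "$ticker"
}

# Assumed usage: uuidgen and the loop count are placeholders.
# for i in 1 2 3; do
#   price_line "$(uuidgen)" "IBM" >> "${IN_FILE}"
# done
price_line "6af48cd4-c4ce-44a0-adb7-8d980b2b79a0" "IBM"
```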

 This is what I have in my shell script:

 cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
                        --broker-list rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 \
                        --topic md \
                        --property "parse.key=true" \
                        --property "key.separator=:"
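
 As I understand it, with parse.key=true the console producer splits each input line at the first occurrence of key.separator: the text before it becomes the record key and the rest becomes the value. A minimal shell sketch of that split is below. split_record is a hypothetical helper written for illustration and is not part of Kafka; how it reports a missing separator is also an assumption of the sketch.

```shell
#!/usr/bin/env bash
# Hypothetical helper, for illustration only: mimic how a "key:value"
# line is split at the FIRST separator into a record key and a value.
split_record() {
  local line="$1" sep="${2:-:}"
  # Without a separator there is no key to parse.
  case "$line" in
    *"$sep"*) ;;
    *) echo "no key separator in: $line" >&2; return 1 ;;
  esac
  local key="${line%%"$sep"*}"   # text before the first separator
  local value="${line#*"$sep"}"  # text after the first separator
  printf 'key=%s value=%s\n' "$key" "$value"
}

split_record "6af48cd4-c4ce-44a0-adb7-8d980b2b79a0:IBM"
```

 Both the key and the value come out as plain strings here, which matches the key/value pairs printed by the console consumer.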

And this is what I get from the console consumer:



${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes75:2181,rhes564:2181,rhes76:2181 --topic md --property print.key=true

6af48cd4-c4ce-44a0-adb7-8d980b2b79a0    IBM
8436ac7f-d2ae-4762-bf4e-98242241290b    IBM
96bbb149-b11c-440d-a82c-f78e53edda49    IBM
5fa8682b-d81a-466a-abbd-ca80793405b1    IBM
a998446a-497f-4ac7-a5a7-21a524cfc4da    IBM
7920affe-6641-48ef-9e08-5a4b9f66da4d    IBM

Now I want to write this data into Aerospike.

My files are as follows:



*cat connect-standalone.properties*

# These are defaults. This file just demonstrates how to override some settings.

bootstrap.servers=rhes75:9092, rhes564:9092,rhes76:9092

# The converters specify the format of data in Kafka and how to translate it

# into Connect data. Every Connect user will need to configure these based on

# the format they want their data in when loaded from or stored into Kafka

#

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=org.apache.kafka.connect.storage.StringConverter

##alue.converter=org.apache.kafka.connect.json.JsonConverter

#

# Converter-specific settings can be passed in by prefixing the Converter's

# setting with the converter we want to apply it to

key.converter.schemas.enable=true

value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and

# must be specified, but most users will always want to use the built-in

# default. Offset and config data is never visible outside of Kafka Connect in

# this format.

##internal.key.converter=org.apache.kafka.connect.json.JsonConverter

##internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter=org.apache.kafka.connect.storage.StringConverter

internal.value.converter=org.apache.kafka.connect.storage.StringConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging

offset.flush.interval.ms=10000



# Set to a list of filesystem paths separated by commas (,) to enable class

# loading isolation for plugins (connectors, converters, transformations). The

# list should consist of top level directories that include any combination of:

# a) directories immediately containing jars with plugins and their dependencies

# b) uber-jars with plugins and their dependencies

# c) directories immediately containing the package directory structure of

# classes of plugins and their dependencies Note: symlinks will be followed to

# discover dependencies or plugins.

# Examples:

plugin.path=/opt/aerospike-kafka-connect-sink/share/kafka



And my sink properties file is:



*cat aerospike-sink.properties*

##

# Copyright 2016 Aerospike, Inc.

#

# Portions may be licensed to Aerospike, Inc. under one or more contributor

# license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may not

# use this file except in compliance with the License. You may obtain a copy of

# the License at http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations under

# the License.

##

name=aerospike-sink

connector.class=com.aerospike.kafka.connect.sink.AerospikeSinkConnector

tasks.max=1

topics=md

cluster.hosts=rhes75:3000,rhes76:3000

policy.record_exists_action=replace

topic.namespace=trading

topic.set=MARKETDATAAEROSPIKEBATCH

feature_key.path=/etc/aerospike/features.conf

topic.key_field=key

topic.bins=ticker

##topic.bins=ticker:ticker,timeissued:timeissued,price:price

aerospike.username=trading_user_RW

aerospike.password=xxxxxxx



But I get this error when I run:



$KAFKA_HOME/bin/connect-standalone.sh etc/connect-standalone.properties etc/aerospike-sink.properties



[2019-06-14 20:52:24,585] INFO ConnectorConfig values:

        aerospike.password = [hidden]

        aerospike.username = trading_user_RW

        cluster.hosts = rhes75:3000,rhes76:3000

        feature_key.path = /etc/aerospike/features.conf

        policy.record_exists_action = replace

        topics = [md]

(com.aerospike.kafka.connect.sink.ConnectorConfig:279)

[2019-06-14 20:52:24,586] INFO TopicConfig values:

        bins = ticker

        key_field = key

        namespace = trading

        set = MARKETDATAAEROSPIKEBATCH

        set_field = null

(com.aerospike.kafka.connect.sink.TopicConfig:279)

[2019-06-14 20:52:24,723] INFO Connected to Aerospike cluster at rhes75 3000 (com.aerospike.kafka.connect.sink.AsyncWriter:73)

[2019-06-14 20:52:24,724] INFO WorkerSinkTask{id=aerospike-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:282)

[2019-06-14 20:52:24,730] INFO Cluster ID: gPy8TvkhSsCijj3A8m2kzw (org.apache.kafka.clients.Metadata:265)

[2019-06-14 20:52:24,730] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Discovered group coordinator rhes564:9093 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:605)

[2019-06-14 20:52:24,731] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:411)

[2019-06-14 20:52:24,732] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)

[2019-06-14 20:52:27,742] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)

[2019-06-14 20:52:27,743] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Setting newly assigned partitions [md-8, md-7, md-6, md-5, md-4, md-3, md-2, md-1, md-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)

[2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-4 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-5 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-6 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-7 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-8 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)

[2019-06-14 20:52:27,774] ERROR WorkerSinkTask{id=aerospike-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)

*org.apache.kafka.connect.errors.DataException: No mapper for records of type STRING*
        at com.aerospike.kafka.connect.data.RecordMapperFactory.createMapper(RecordMapperFactory.java:58)
        at com.aerospike.kafka.connect.data.RecordMapperFactory.getMapper(RecordMapperFactory.java:45)
        at com.aerospike.kafka.connect.sink.AerospikeSinkTask.put(AerospikeSinkTask.java:58)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)



I have run out of ideas as to what is wrong here. I would appreciate any hints.



Regards,



Mich



