kafka-users mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Issue facing, Aerospike Connect for Kafka - Inbound
Date Sat, 15 Jun 2019 22:24:16 GMT
Hi,

I sorted this one out.

It had to do with the format of the published prices.
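
The thread does not spell out what the corrected format was. One plausible reading, given the "No mapper for records of type STRING" error quoted below, is that the sink expects a structured value (a map or struct) rather than a bare string such as IBM. A minimal sketch under that assumption: publish each value as a small JSON object whose field names match topic.key_field=key and topic.bins=ticker from the sink configuration. The function name make_price_line and the exact JSON layout are hypothetical and are not taken from the original post.

```shell
#!/bin/sh
# Hypothetical helper (not from the original post): build one
# "key:value" line for kafka-console-producer.sh, where the value is a
# JSON object instead of a bare string.
# Assumption: field names "key" and "ticker" mirror topic.key_field and
# topic.bins in aerospike-sink.properties.
make_price_line() {
    uuid="$1"
    ticker="$2"
    # The part before the first ':' is the Kafka message key; the rest
    # is the JSON value.
    printf '%s:{"key":"%s","ticker":"%s"}\n' "$uuid" "$uuid" "$ticker"
}

# Example with one of the UUIDs shown in the consumer output.
make_price_line "6af48cd4-c4ce-44a0-adb7-8d980b2b79a0" "IBM"
```

Lines built this way could be appended to ${IN_FILE} and sent with the same kafka-console-producer.sh command. For the connector to see a map instead of a string, the worker would presumably also need value.converter=org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=false; this is untested against the Aerospike sink. The sketch also assumes the console producer splits each line at the first ":" only, which is worth checking because the JSON value itself contains colons.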

Regards,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 14 Jun 2019 at 22:52, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> I generate simple pricing data as follows:
>
>  echo "${UUID}:${TICKER}" >> ${IN_FILE}
>
>  UUID is the key and TICKER is the name of the security, such as "IBM".
>
>  This is what I have in my shell script:
>
>  cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
>                        --broker-list rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 \
>                        --topic md \
>                        --property "parse.key=true" \
>                        --property "key.separator=:"
>
> And this is what I get:
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
>     --zookeeper rhes75:2181,rhes564:2181,rhes76:2181 \
>     --topic md \
>     --property print.key=true
>
> 6af48cd4-c4ce-44a0-adb7-8d980b2b79a0    IBM
> 8436ac7f-d2ae-4762-bf4e-98242241290b    IBM
> 96bbb149-b11c-440d-a82c-f78e53edda49    IBM
> 5fa8682b-d81a-466a-abbd-ca80793405b1    IBM
> a998446a-497f-4ac7-a5a7-21a524cfc4da    IBM
> 7920affe-6641-48ef-9e08-5a4b9f66da4d    IBM
>
> Now I want to post this data into Aerospike.
>
> My files are as follows:
>
>
>
> *cat connect-standalone.properties*
>
> # These are defaults. This file just demonstrates how to override some settings.
> bootstrap.servers=rhes75:9092, rhes564:9092,rhes76:9092
>
> # The converters specify the format of data in Kafka and how to translate it
> # into Connect data. Every Connect user will need to configure these based on
> # the format they want their data in when loaded from or stored into Kafka
> #
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> ##value.converter=org.apache.kafka.connect.json.JsonConverter
> #
> # Converter-specific settings can be passed in by prefixing the Converter's
> # setting with the converter we want to apply it to
> key.converter.schemas.enable=true
> value.converter.schemas.enable=false
>
> # The internal converter used for offsets and config data is configurable and
> # must be specified, but most users will always want to use the built-in
> # default. Offset and config data is never visible outside of Kafka Connect in
> # this format.
> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> ##internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> offset.storage.file.filename=/tmp/connect.offsets
>
> # Flush much faster than normal, which is useful for testing/debugging
> offset.flush.interval.ms=10000
>
> # Set to a list of filesystem paths separated by commas (,) to enable class
> # loading isolation for plugins (connectors, converters, transformations). The
> # list should consist of top level directories that include any combination of:
> # a) directories immediately containing jars with plugins and their dependencies
> # b) uber-jars with plugins and their dependencies
> # c) directories immediately containing the package directory structure of
> #    classes of plugins and their dependencies
> # Note: symlinks will be followed to discover dependencies or plugins.
> # Examples:
> plugin.path=/opt/aerospike-kafka-connect-sink/share/kafka
>
>
>
> And my sink properties file is:
>
>
>
> *cat aerospike-sink.properties*
>
> ##
> # Copyright 2016 Aerospike, Inc.
> #
> # Portions may be licensed to Aerospike, Inc. under one or more contributor
> # license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0.
> #
> # Licensed under the Apache License, Version 2.0 (the "License"); you may not
> # use this file except in compliance with the License. You may obtain a copy of
> # the License at http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
> # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
> # License for the specific language governing permissions and limitations under
> # the License.
> ##
> name=aerospike-sink
> connector.class=com.aerospike.kafka.connect.sink.AerospikeSinkConnector
> tasks.max=1
> topics=md
> cluster.hosts=rhes75:3000,rhes76:3000
> policy.record_exists_action=replace
> topic.namespace=trading
> topic.set=MARKETDATAAEROSPIKEBATCH
> feature_key.path=/etc/aerospike/features.conf
> topic.key_field=key
> topic.bins=ticker
> ##topic.bins=ticker:ticker,timeissued:timeissued,price:price
> aerospike.username=trading_user_RW
> aerospike.password=xxxxxxx
>
>
>
> But I get this error when I run:
>
> $KAFKA_HOME/bin/connect-standalone.sh etc/connect-standalone.properties etc/aerospike-sink.properties
>
>
>
> [2019-06-14 20:52:24,585] INFO ConnectorConfig values:
>         aerospike.password = [hidden]
>         aerospike.username = trading_user_RW
>         cluster.hosts = rhes75:3000,rhes76:3000
>         feature_key.path = /etc/aerospike/features.conf
>         policy.record_exists_action = replace
>         topics = [md]
>  (com.aerospike.kafka.connect.sink.ConnectorConfig:279)
> [2019-06-14 20:52:24,586] INFO TopicConfig values:
>         bins = ticker
>         key_field = key
>         namespace = trading
>         set = MARKETDATAAEROSPIKEBATCH
>         set_field = null
>  (com.aerospike.kafka.connect.sink.TopicConfig:279)
> [2019-06-14 20:52:24,723] INFO Connected to Aerospike cluster at rhes75 3000 (com.aerospike.kafka.connect.sink.AsyncWriter:73)
> [2019-06-14 20:52:24,724] INFO WorkerSinkTask{id=aerospike-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
> [2019-06-14 20:52:24,730] INFO Cluster ID: gPy8TvkhSsCijj3A8m2kzw (org.apache.kafka.clients.Metadata:265)
> [2019-06-14 20:52:24,730] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Discovered group coordinator rhes564:9093 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:605)
> [2019-06-14 20:52:24,731] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:411)
> [2019-06-14 20:52:24,732] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
> [2019-06-14 20:52:27,742] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
> [2019-06-14 20:52:27,743] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Setting newly assigned partitions [md-8, md-7, md-6, md-5, md-4, md-3, md-2, md-1, md-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
> [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-4 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-5 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-6 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-7 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-8 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
> [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561)
>
> [2019-06-14 20:52:27,774] ERROR WorkerSinkTask{id=aerospike-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)
> *org.apache.kafka.connect.errors.DataException: No mapper for records of type STRING*
>         at com.aerospike.kafka.connect.data.RecordMapperFactory.createMapper(RecordMapperFactory.java:58)
>         at com.aerospike.kafka.connect.data.RecordMapperFactory.getMapper(RecordMapperFactory.java:45)
>         at com.aerospike.kafka.connect.sink.AerospikeSinkTask.put(AerospikeSinkTask.java:58)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>
>
> I have run out of ideas about what is wrong here. I would appreciate any hints.
>
>
>
> Regards,
>
>
>
> Mich
