pulsar-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Pulsar Slack" <apache.pulsar.sl...@gmail.com>
Subject Slack digest for #general - 2018-12-17
Date Mon, 17 Dec 2018 09:11:04 GMT
2018-12-16 09:54:52 UTC - zero.xu: @zero.xu has joined the channel
----
2018-12-16 15:02:25 UTC - zero.xu: after read the code, I found the msg will be add async
inot ledger in PersistentTopic impl, but how the consumer know the new msg coming? I can't
find any code about this, ashamed about my terrible ability. but in NonPersistentTopic impl,
the msg just go through subscriptions-&gt; subscription -&gt; dispatcher-&gt;
consumer-&gt;channel. who call show me the related code in PersistentTopic?
----
2018-12-16 15:25:38 UTC - jia zhai: Hi @zero.xu Consumer will send a “Flow” command to
broker, broker will handle this command, and push data to Consumer. Please take a look at
`handleFlow` command in `ServerCnx.java`, and `handleMessage` in ClientCnx.java
----
2018-12-16 15:26:55 UTC - zero.xu: thx!
----
2018-12-16 15:28:31 UTC - jia zhai: welcome
----
2018-12-16 15:29:19 UTC - Sijie Guo: @zero.xu: to add on what @jia zhai explained - the consumer
is “reading”/“waiting for” entries, while the producer is writing the entries and
on successfully appending entries, it will add the entries to ManagedLedger entries cache,
which will then notify the consumers waiting for entries.
----
2018-12-16 15:52:14 UTC - zero.xu: I did not found any code aboud the notify action, can u
show related code?
----
2018-12-16 15:53:07 UTC - zero.xu: I review the code: PersistentTopic ManagedLedgerImpl PersistentSubscription,
can not found any notify action
----
2018-12-16 15:53:52 UTC - zero.xu: @Sijie Guo
----
2018-12-16 16:03:51 UTC - Matteo Merli: @zero.xu Take a look at <https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L413>
----
2018-12-16 16:04:21 UTC - Sijie Guo: @zero.xu check ManagedCursorImpl.asyncReadEntriesOrWait
----
2018-12-16 16:05:35 UTC - Matteo Merli: `cursor.asyncReadEntriesOrWait()` is the call that
will register to get the next batch of messages. if the cursor is at the end of topic, it
will asynchronosly wait until messages are available
+1 : zero.xu
----
2018-12-16 16:05:35 UTC - Sijie Guo: if a cursor is caught up, it will be added to the managed
ledger’s `waitingCursors` list. when new entries appened, the cursors will be waken up to
read the actual emtries from the cache.
----
2018-12-16 16:06:07 UTC - Sijie Guo: yeah @Matteo Merli is faster than me
+1 : zero.xu
----
2018-12-16 16:06:12 UTC - Matteo Merli: :slightly_smiling_face:
----
2018-12-17 00:26:41 UTC - jia zhai: :+1:
----
2018-12-17 01:13:01 UTC - zero.xu: @Matteo Merli @Sijie Guo after read the code carefully,
I found the clue: PersistentTopic.publishMessage -&gt; ManagedLedgerImpl.asyncAddEntry
-&gt; OpAddEntry.initiate -&gt; LedgerHandle.asyncAddEntry -&gt; OpAddEntry.addComplete+safeRun
-&gt; ManagedLedgerImpl.notifyCursors -&gt; ManagedCursorImpl.notifyEntriesAvailable
-&gt; ManagedLedgerImpl.asyncReadEntries -&gt; EntryCacheImpl.asyncReadEntry
----
2018-12-17 01:14:12 UTC - Matteo Merli: Yes, there’s is an optimization there to avoid contention
between the threads managing the publishers and consumers
----
2018-12-17 01:14:43 UTC - Matteo Merli: With a retry logic to avoid the notification when
the throughput is high enough
----
2018-12-17 07:07:20 UTC - linxin: @linxin has joined the channel
----
2018-12-17 07:26:17 UTC - linxin: I have read the official documentation on the Schema Registry
and read the source code. I found that the Pulsar Broker only performs the Schema checking
when the Producers and Consumers first connect to the Broker. The actual write process does
not verify. What happens if the schema is deleted after the producer connects to the broker?
----
2018-12-17 07:28:39 UTC - linxin: I have read the official documentation on the Schema Registry
and read the source code. I found that the Pulsar Broker only performs the Schema checking
when the Producers and Consumers first connect to the Broker. The actual write process does
not verify. What happens if the schema is deleted after the producer connects to the broker?
@Sijie Guo
----
2018-12-17 07:42:39 UTC - Matteo Merli: You mean the schema gets created or deleted after
the producer creation?
----
2018-12-17 07:44:12 UTC - linxin: @Matteo Merli Yes
----
2018-12-17 07:51:58 UTC - Matteo Merli: In case of deletion, I think we’re not taking any
action at the moment 
----
2018-12-17 07:53:47 UTC - Matteo Merli: For creation, I believe that the topic should have
been created with type bytes already, therefore it should not let you to set the schema at
this point since it will be for a different schema type
----
2018-12-17 07:54:16 UTC - Matteo Merli: Just going from memory, I don’t have the code in
front 
----
2018-12-17 08:10:23 UTC - linxin: @Matteo Merli I understand what you mean, thanks. And i
have another question, why Pulsar schema is topic dimension, unlike Kafka, each message gets
a schema id?
----
2018-12-17 08:14:46 UTC - linxin: What happens if the schema makes incompatible changes (such
as delete a Topic schema and then re-creating one) and then register a new consumer with new
schema trying to consume the old message with old schema?
----
2018-12-17 08:19:42 UTC - linxin: 1. Topic#setSchema(SchemaA); 2. send a messageA under SchemaA;
3. delete SchemaA and Topic#setSchema(SchemaB); 4. Consumer#subscribeTopicWith(SchemaB); How
the consumer handles the messageA? My English is not very good. Can you understand what I
mean?
----
Mime
View raw message