pulsar-users mailing list archives

From "Apache Pulsar Slack" <apache.pulsar.sl...@gmail.com>
Subject Slack digest for #general - 2019-04-09
Date Tue, 09 Apr 2019 09:11:03 GMT
2019-04-08 13:24:50 UTC - Mark Marijnissen: How do I enable websockets on the Pulsar Proxy?
2019-04-08 16:00:01 UTC - Gene Fojtik: @Gene Fojtik has joined the channel
2019-04-08 16:11:46 UTC - Devin G. Bost: I'm looking for ideas of how to create graceful continuous
deployments at-scale (with downtime < 300 ms). I'm considering deploying to a new tenant,
renaming the old tenant, and then renaming the new tenant to the old tenant name. However,
I'm not sure if that will allow me to gracefully roll over existing consumers and producers.
Are there any thoughts on this approach or suggestions of how to create a better continuous
deployment process?
2019-04-08 16:16:05 UTC - Devin G. Bost: We also can't have data loss, so that's another issue
that must be considered.
2019-04-08 16:22:49 UTC - Kenan Dalley: Hi.  I've run into an issue with Pulsar Functions
and the Jackson JSON library.  My function is defined as Function<MyModel,MyModel>,
where MyModel has a String id, FruitEnum fruitType (Apple, Orange, etc) and a long stateCounter.
 My function fills in the id & counter and my Producer sends in the fruitType.

When I run with my own Producer, which sends out a MyModel, it runs fine.  But when I attempted
to use "bin/pulsar-client functions produce -m "{fruitType=\"Orange\"}", Jackson blew up the
function with the message "JsonParseException: Unexpected character ('f' (code 102)): was
expecting double-quote to start field name."

I understand what Jackson wants here, but the underlying issue is that, because of this error
and that the offset wasn't committed, my function was pushed into an infinite loop of restarts
on the server because it kept trying to read that message over and over when it was restarted.

3 questions
1. Is there a way to configure Jackson to be looser with its parsing for Functions?
2. Is there a way to manually, or automatically, change the commit the f(n) is looking at
on the fly?
3. Is there a way to intercept the message prior to the Jackson call to do message type validation
so that this can be prevented?

This situation is definitely not good for an Enterprise-level application.
2019-04-08 16:42:08 UTC - David Kjerrumgaard: @Kenan Dalley For #1, you can configure the
Jackson parser to allow unquoted field names. `mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES,
2019-04-08 16:46:51 UTC - David Kjerrumgaard: @Kenan Dalley for #2, you can add the following
property to the function deployment which will cause the messages to be acknowledged automatically
upon processing, rather than requiring you to perform the ack in the code.  `"autoAck": true,`
2019-04-08 17:05:04 UTC - Kenan Dalley: Adding "--auto-ack true" to the function's create/update
statement didn't fix anything.  It's still going into an infinite loop.
2019-04-08 17:12:15 UTC - Matteo Merli: @Mark Marijnissen Currently the websocket service
is not integrated in the Pulsar proxy. You have to start it on its own with `pulsar websocket`
2019-04-08 17:13:15 UTC - Matteo Merli: @Devin G. Bost You mean Pulsar deployments or your
application deployments?
2019-04-08 17:21:44 UTC - Kenan Dalley: @David Kjerrumgaard Where would I go to configure
the mapper for either that function or all functions?  Since the function class itself is
after the conversion, that would be too late and I don't know anywhere else that makes sense.
2019-04-08 17:22:38 UTC - Mandi Goddard: @Jon Bock I am trying to find some way to communicate
with students who don't read their emails. Any suggestions?
2019-04-08 17:34:17 UTC - Devin G. Bost: @Matteo Merli
Great question. Thanks for asking. I mean application deployments (e.g. a set of Pulsar functions,
sinks, sources, topics, and namespaces).
2019-04-08 17:37:29 UTC - Guy Feldman: @Mandi Goddard sounds like more of a job for an email
sending service like Mailjet or easysendy
2019-04-08 17:37:31 UTC - Matteo Merli: Ok, in any case (even if it were for a Pulsar deployment)
there would be no downtime or data loss.

For functions, you will issue an “update” of a running function and that will trigger
a rolling restart of the instances. Across multiple instances there will still be some instance

Regarding data loss, functions use a consumer which has a subscription associated. If the
function doesn’t process the messages, the messages won’t be acknowledged and therefore
the broker will keep them.
2019-04-08 17:40:14 UTC - Devin G. Bost: Thanks for the guidance.
2019-04-08 17:43:16 UTC - Sanjeev Kulkarni: @Kenan Dalley There is a maxMessageRetries parameter
in the FunctionConfig. By default it's -1 (which is forever), but you can send in a finite value,
which should make the framework attempt to send the message at most that many times before giving
2019-04-08 17:43:40 UTC - Devin G. Bost: What would be involved if I wanted to create something
like an "upsert" operation that would create a component (if it doesn't exist) or update a
component (if it does)?
2019-04-08 17:48:35 UTC - Kenan Dalley: @Sanjeev Kulkarni I tried that too (set to 1) and
I tried the dead-letter queue setup as well and nothing has worked.  It continues to fail
the function, no ack and restarts the function an infinite number of times.  It's as though
all of these other config settings are activated after the failure occurs and are made irrelevant.
2019-04-08 17:50:01 UTC - Sanjeev Kulkarni: when/where is the error occurring?
2019-04-08 17:50:04 UTC - Sanjeev Kulkarni: is there a stack trace?
2019-04-08 17:56:36 UTC - Matteo Merli: Not sure :slightly_smiling_face:
2019-04-08 17:57:05 UTC - Matteo Merli: You can try “functions create” and if it fails
fallback to update.. I guess it’s not ideal..
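A minimal sketch of the create-then-update fallback suggested above, driven from a deployment script. It assumes `bin/pulsar-admin` is the CLI path and that your version accepts `--function-config-file` for both `functions create` and `functions update`; the `upsert_function` and `run_cli` names are hypothetical helpers, not part of Pulsar. Note that `create` can fail for reasons other than "already exists", so the fallback may mask unrelated errors.

```python
import subprocess
from typing import Callable, Sequence

# A runner executes a command line and returns its exit code.
Runner = Callable[[Sequence[str]], int]


def run_cli(cmd: Sequence[str]) -> int:
    """Run the command for real and return its exit code."""
    return subprocess.run(cmd).returncode


def upsert_function(config_file: str,
                    run: Runner = run_cli,
                    admin: str = "bin/pulsar-admin") -> str:
    """Try `functions create`; if that fails, fall back to `functions update`."""
    base = [admin, "functions"]
    args = ["--function-config-file", config_file]  # assumed to be accepted by both subcommands
    if run(base + ["create"] + args) == 0:
        return "created"
    if run(base + ["update"] + args) == 0:
        return "updated"
    raise RuntimeError(f"create and update both failed for {config_file}")
```

Passing the runner in as a parameter keeps the fallback logic testable without a running cluster.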
2019-04-08 17:58:33 UTC - Jerry Peng: i see
2019-04-08 17:58:50 UTC - Kenan Dalley: Not easy on my phone. :slightly_smiling_face:

Here's what the pulsar function log has:

17:01:23.343 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum
- SSE4.2 CRC32C provider initialized
17:01:23.413 [public/default/FnFruit-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable
- [public/default/FnFruit:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException:
Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
        at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:84) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:233) ~[java-instance.jar:2.3.0]
        at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74)
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:458)
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:243)
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException:
Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:1988)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1639)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:725)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
        at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
        at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:82) ~[java-instance.jar:2.3.0]
        ... 5 more
17:01:23.420 [public/default/FnFruit-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable
- Closing instance
2019-04-08 17:58:56 UTC - Jerry Peng: any interesting logs printed in the broker log related
to that topic?
2019-04-08 17:59:22 UTC - Jerry Peng: partitioned topic or not partitioned topic?
2019-04-08 18:19:28 UTC - Sanjeev Kulkarni: so this is Pulsar's schema trying to interpret
the JSON and failing. For now, can you interpret the messages as byte[] and do the conversion
2019-04-08 18:19:53 UTC - Sanjeev Kulkarni: essentially your function signature will change
to <byte[], MyModel>
2019-04-08 18:20:15 UTC - Sanjeev Kulkarni: and inside your function, you can then try to
interpret loosely the bytes to json object and then do the manipulation
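The idea of taking raw bytes and doing the conversion inside the function can be sketched as follows. This is plain Python rather than the Java function from the thread, and it does not use the Pulsar Functions API; `decode_model`, `process`, and the id scheme are hypothetical. The point it illustrates is that a malformed payload is turned into a skipped message (`None`) instead of an uncaught exception.

```python
import json
from typing import Optional


def decode_model(raw: bytes) -> Optional[dict]:
    """Parse the payload; return None if it is not a usable JSON object."""
    try:
        obj = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not UTF-8 or not valid JSON
    if not isinstance(obj, dict) or not isinstance(obj.get("fruitType"), str):
        return None  # valid JSON, but not the shape we expect
    return obj


def process(raw: bytes, counter: int) -> Optional[dict]:
    """Fill in id and stateCounter, or skip the message if it cannot be decoded."""
    model = decode_model(raw)
    if model is None:
        return None  # skip bad input instead of raising
    model["id"] = f"{model['fruitType']}-{counter}"  # hypothetical id scheme
    model["stateCounter"] = counter
    return model
```

With this shape, the payload from the log (`{fruitType="Pear"}`) is dropped, while `{"fruitType": "Pear"}` is processed normally.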
2019-04-08 18:24:57 UTC - Kenan Dalley: That seems like a valid approach, but definitely not
ideal.   I'll try it out.
2019-04-08 18:43:31 UTC - Sanjeev Kulkarni: @Kenan Dalley agreed. <https://github.com/apache/pulsar/pull/4004>
should fix the behavior for next release
2019-04-08 18:49:24 UTC - Devin G. Bost: I'm adding an `upsert` method to Pulsar-Admin to
enable conditional creation/update (where it creates a component if it doesn't exist and updates
the component if it does exist). This method will make rolling deployments easier because
we can just call this single method on every component in our project tree during our deployments.

In my fork, I added `upsertFunction` to
and added upsert methods to:
`pulsar-broker/src/main/java/org.apache.pulsar/broker/admin/impl/FunctionsBase.java`, `SinkBase.java`,
and `SourceBase.java`.

Adding upsert to NamespacesBase and TenantsBase looks more involved, so I haven't done that

Are there any other points that I need to include before I submit a PR with these changes?
2019-04-08 18:50:02 UTC - Devin G. Bost: At some point, I’d also like to implement a bulk-upsert
method where I can pass in a manifest file and trigger upserts in parallel for everything
in the manifest file.
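A rough sketch of what a parallel bulk upsert could look like on the client side, assuming some per-component `upsert` callable already exists (it is hypothetical here, as is the `bulk_upsert` name). Failures are captured per component so one bad entry does not abort the rest.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable


def bulk_upsert(names: Iterable[str],
                upsert: Callable[[str], str],
                max_workers: int = 4) -> Dict[str, str]:
    """Run `upsert` for every component in parallel and collect a result per name."""

    def attempt(name: str) -> str:
        try:
            return upsert(name)
        except Exception as exc:  # record the failure and keep going
            return f"failed: {exc}"

    ordered = list(names)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with names
        return dict(zip(ordered, pool.map(attempt, ordered)))
```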
2019-04-08 18:50:16 UTC - Devin G. Bost: The purpose of these changes is to simplify (and
speed up) continuous deployment.
+1 : David Kjerrumgaard, Karthik Ramasamy
2019-04-08 19:08:06 UTC - John Crawford: non-partitioned
2019-04-08 19:08:10 UTC - John Crawford: seeing this log again:
2019-04-08 19:08:36 UTC - John Crawford: ```
19:02:46.150 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
- [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets]-4372
read entry timeout for 0-0 after 120 sec
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN  org.apache.bookkeeper.mledger.impl.OpReadEntry
- [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets][etl-production]
read failed from ledger at position:4372:0 : Bookie operation timeout
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
- [<persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets>
/ etl-production] Error reading entries at 4372:0 : Bookie operation timeout, Read Type Normal
- Retrying to read in 58.832 seconds
```
2019-04-08 19:08:37 UTC - Devin G. Bost: Alternatively, if there are objections to the `upsert`
approach, we could still build a tree of components from a manifest file with a `bulkDeploy`
method, compute differences between the manifest tree and the tree of components currently
existing in Pulsar, and then conditionally call create, update, and/or delete methods to bring
Pulsar in-sync with the manifest file.

Feedback on preferences would be helpful.
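The manifest-diff idea described above can be sketched in a few lines. This assumes both the manifest and the current cluster state can be represented as a mapping from component name to its config; `plan_deploy` and that representation are hypothetical, and fetching the existing components from Pulsar is left out.

```python
from typing import Dict, List


def plan_deploy(manifest: Dict[str, dict],
                existing: Dict[str, dict]) -> Dict[str, List[str]]:
    """Compute which components to create, update, or delete to match the manifest."""
    create = sorted(manifest.keys() - existing.keys())
    delete = sorted(existing.keys() - manifest.keys())
    # Only update components whose config actually changed.
    update = sorted(name for name in manifest.keys() & existing.keys()
                    if manifest[name] != existing[name])
    return {"create": create, "update": update, "delete": delete}
```

A deployment script would then walk the three lists and call the matching create, update, or delete operation for each name.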
2019-04-08 19:09:59 UTC - John Crawford: then further down the log (58 seconds later):
2019-04-08 19:10:07 UTC - John Crawford: ```
19:03:44.983 [pulsar-io-23-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
- [<persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets>
/ etl-production] Retrying read operation
19:03:44.983 [pulsar-io-23-3] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected
throwable caught 
java.lang.ArrayIndexOutOfBoundsException: null
```
2019-04-08 19:10:34 UTC - John Crawford: don’t see anything but GC logs in bookkeeper logs
2019-04-08 19:14:04 UTC - Grant Wu: @Kenan Dalley Why can’t you quote your field names?
2019-04-08 19:14:24 UTC - Grant Wu: Where is the input coming from that is so blatantly non-compliant
with the spec
2019-04-08 19:14:46 UTC - Grant Wu: I was considering using the JSON schema in the future,
I would prefer for this to either be tunable or not changed
2019-04-08 19:16:04 UTC - Grant Wu: if we allow non-quoted field names, we immediately break
the Python JSON parser as well:
>>> import json
>>> json.loads('{foo: "bar"}')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py",
line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py",
line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py",
line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column
2 (char 1)
2019-04-08 19:22:13 UTC - Grant Wu: To clarify my objection: if we allow this, then consumers
assuming that the input they get is valid JSON will all now need to be able to handle this
non-compliant JSON.  So, for example, the standard JSON parsing library in Python would no
longer suffice; we would need to pull in a third party dependency.  <https://stackoverflow.com/questions/39491420/python-jsonexpecting-property-name-enclosed-in-double-quotes>
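On the producer side, one way to avoid the problem is to build the payload with a JSON library instead of typing it by hand. A small sketch using only Python's standard `json` module; `make_payload` and `is_valid_json` are hypothetical helper names.

```python
import json


def make_payload(fruit_type: str) -> str:
    """Serialize the message with the JSON library so field names are always quoted."""
    return json.dumps({"fruitType": fruit_type})


def is_valid_json(text: str) -> bool:
    """Return True if the standard parser accepts the text."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False
```

For example, `make_payload("Orange")` yields `{"fruitType": "Orange"}`, which the strict parser accepts, while `is_valid_json('{fruitType="Orange"}')` is `False`.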
2019-04-08 19:23:42 UTC - Kenan Dalley: @Grant Wu I'd prefer it to be tunable because  I cannot
guarantee that the data that comes in will be 100% compliant with the spec.

Also, I personally don't agree that field names not being surrounded by double quotes is something
that should completely fail the object mapper since it's a very widely used pattern.  Otherwise,
it wouldn't be an option to override.
2019-04-08 19:28:58 UTC - Grant Wu: We should probably move this discussion to the PR
+1 : Devin G. Bost
2019-04-08 19:30:06 UTC - Ryan Samo: Hey guys,
When using Pulsar SQL (Presto) I do not see an obvious way to pass in the admin certificates.
Since we are using TLS, do you have any idea which config the tlsCert, tlsKey, and rootCA would
reside in for Presto?
2019-04-08 19:30:36 UTC - Ryan Samo: We get a 401 Unauthorized if we don’t provide the certs
2019-04-08 19:32:40 UTC - Grant Wu: I put a comment there
2019-04-08 19:44:05 UTC - Grant Wu: I’m not sure what the best approach is here, but here
are my thoughts:
1. This would be welcome, my deployment script for Pulsar Functions currently is quite ugly,
it greps the output of pulsar-admin functions get.
2. A manifest approach kind of feels somewhat heavyweight, and it might overlap with native
k8s tooling (for those of us running Pulsar in k8s).  Pulsar Functions are the exception here
because they’re not materialized as k8s resources anywhere (possible future avenue of work)
3. Possible alternative name to “upsert” - “put” - better matches the HTTP verbs that
pulsar-admin already uses
2019-04-08 19:44:16 UTC - Kenan Dalley: Dittos.   :blush:
2019-04-08 20:46:50 UTC - Devin G. Bost: Thanks for the feedback.
2019-04-08 20:51:34 UTC - Devin G. Bost: If I were to create a PR, how long do you think it
would take before it would end up in a release?
2019-04-08 20:52:47 UTC - Grant Wu: I don’t know exactly - not a maintainer - but <https://github.com/apache/pulsar/releases>
can give you some sense of how frequently releases happen
2019-04-08 21:50:39 UTC - Devin G. Bost: Is there going to be a Pulsar conference?
2019-04-08 21:52:26 UTC - Sanjeev Kulkarni: Hopefully soon
+1 : Devin G. Bost, Ali Ahmed, Ezequiel Lovelle, Karthik Ramasamy, Yuvaraj Loganathan, Shivji
Kumar Jha
bananadance : Yuvaraj Loganathan
2019-04-08 23:34:53 UTC - Devin G. Bost: Regarding the upsert function, I made the changes
in my fork and got the build to succeed. I think I updated all of the touch-points correctly
(though I may be missing something). When I try to call it from the admin API, I get null
output with:

`Reason: javax.ws.rs.ProcessingException: Java heap space`

I tried updating my pulsar_env.sh line to:
`PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx8g -XX:MaxDirectMemorySize=12g"}`
but I got the same issue. Any ideas?
2019-04-08 23:45:59 UTC - Ali Ahmed: @Devin G. Bost you have to change pulsar_tools_env.sh
2019-04-08 23:46:18 UTC - Devin G. Bost: Oh, thanks! I'll try that!
2019-04-08 23:52:38 UTC - Sijie Guo: @young sorry that I forgot to reply to you yesterday.
from the exception, it seems that you are using both pulsar-client and pulsar-client-admin?
what dependencies do you include for your java program?
2019-04-08 23:54:24 UTC - Devin G. Bost: It worked! Thanks!
2019-04-09 02:00:31 UTC - young: @Sijie Guo thanks for reply. I only import org.apache.pulsar.client.api.Producer
 and org.apache.pulsar.client.api.PulsarClientException , no pulsar-client-admin.
2019-04-09 02:07:19 UTC - Sijie Guo: are you using pulsar-client or pulsar-client-original?
what version are you using?
2019-04-09 02:08:04 UTC - young: Furthermore, when I run a Pulsar cluster in standalone mode on
my own machine, the Java code passes the test, but the same code running on the server machine
reports an exception.
2019-04-09 02:10:45 UTC - young: @Sijie Guo the pulsar-client, version is pulsar-client-api-2.3.0.jar.
2019-04-09 02:11:40 UTC - Sijie Guo: are you using maven or gradle to build the java program?
2019-04-09 02:14:31 UTC - young: both not yet, just a simple test.
2019-04-09 02:17:03 UTC - Sijie Guo: it seems that you are invoking the test from eclipse
and the exception seems to show it failed to load some default configuration from async http
2019-04-09 02:17:19 UTC - Sijie Guo: I am not sure how your test is running in eclipse.
2019-04-09 02:17:29 UTC - Sijie Guo: can you try to use pulsar-client-original dependency?
2019-04-09 02:22:15 UTC - young: Running the test in eclipse  connect to my own machine that
deploy the standalone pulsar is ok, but I package to a jar file and exec  the command" java
-jar  MessageProducer.jar " by command line  on the server machine is failed.
2019-04-09 02:23:27 UTC - Sijie Guo: > but I package to a jar file and exec  the command”
java -jar  MessageProducer.jar ” by command line  on the server machine is failed.

how do you package the jar file?
2019-04-09 02:25:59 UTC - young: eclipse file menu ---export ---runnable jar file---Package
required libraries into generated JAR...
2019-04-09 02:27:44 UTC - Sijie Guo: okay I am not sure how eclipse do that. but asynchttpclient
has some resources files which should be packaged into the JAR, otherwise the asynchttpclient
will fail to load those resources files.
2019-04-09 02:33:44 UTC - young: Oh, and then, is there a standard way of deploying Java code
to a server machine that you would recommend?
2019-04-09 02:36:30 UTC - Sijie Guo: can you try using pulsar-client-original?
2019-04-09 02:41:01 UTC - young: thanks, I will try it.
2019-04-09 07:02:32 UTC - Mark Marijnissen: Thanks for the info. :slightly_smiling_face:
2019-04-09 07:04:56 UTC - Mark Marijnissen: Do Pulsar clients need to be able to connect to
a specific broker, or will any broker do? The docs say they are stateless, so I expect any
broker to work.
(Follow up question: If any broker will do, why is the Pulsar proxy enabled in a Kubernetes

(Context: I want to use nodejs to send messages. The node client with the c++ lib doesn't
compile, so I want to use the websocket API as it's a low-volume use case. But the websocket
API is not supported on the proxy)
2019-04-09 07:10:59 UTC - Ali Ahmed: @Mark Marijnissen connecting to any broker is fine. The
proxy is a transparent TCP proxy; it’s there for convenience in certain kinds of deployments.
The nodejs client should start to stabilize; it’s in the early stages of development. You can
open issues if you are unable to compile.
The websocket is supported by the proxy. If it’s having problems, please open an issue in GitHub.
2019-04-09 07:23:55 UTC - Matteo Merli: > Do Pulsar clients need to be able to connect
to a specific broker, or will any broker do? The docs say they are stateless, so I expect
any broker to work.

A client can connect initially to any broker, though it needs to be able to connect directly
to all the brokers.

If direct connection is not possible (or desirable), then the proxy is a possible solution.
2019-04-09 08:50:33 UTC - Mark Marijnissen: Ah, that is useful to know.

In a kubernetes deployment, will the client receive an IP address of a pod? Or will it receive
the service-IP address, thus obscuring the actual pod (i.e. broker) that is there?