flink-user mailing list archives

From Chet Masterson <chet.master...@yandex.com>
Subject Re: Queryable State
Date Thu, 04 May 2017 18:05:29 GMT
<div>I found the issue. With parallelism = 3, my test data set was skewed such that data
was only going to two of the three task managers (Kafka partitions = 3, Flink nodes
= 3, parallelism = 3). As soon as I created a test data set with enough keys to spread across
all three task managers, queryable state started working as expected. That is why only two
KvStates were registered with the job manager instead of three.</div><div> </div><div>My
FINAL :-) question: should I be getting org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation
when only N-1 of N task managers have data (parallelism = N)?</div><div> </div><div>Thanks
for all the help!</div><div> </div><div> </div><div>04.05.2017,
11:24, "Ufuk Celebi" &lt;uce@apache.org&gt;:</div><blockquote type="cite"><p>Could
you try adding the trace prints to KvStateRegistry#registerKvState, please?<br /><br />In the JM logs you
should see something about the number of connected<br />task managers and in the task
manager logs that each one connects to a<br />JM.<br /><br />– Ufuk<br
/><br /><br />On Tue, May 2, 2017 at 2:53 PM, Chet Masterson<br />&lt;<a
href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt; wrote:</p><blockquote> Can
do. Any advice on where the trace prints should go in the task manager<br /> source
code?<br /><br /> BTW, how do I know I have a correctly configured cluster?
Is there a set of<br /> messages in the job / task manager logs that indicate all required<br
/> connectivity is present? I use the UI to make sure all the task<br /> managers
are present, and that the job is running on all of them, but is<br /> there some verbiage
in the logs that indicates the job manager can talk to<br /> all the task managers,
and vice versa?<br /><br /> Thanks!<br /><br /><br /> 02.05.2017,
06:03, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br /> Hey Chet! I'm wondering why you are only seeing 2 registration<br /> messages
for 3 task managers. Unfortunately, there is no log message<br /> at the task managers
when they send out the notification. Is it<br /> possible for you to run a remote debugger
with the task managers or<br /> build a custom Flink version with the appropriate log
messages on the<br /> task manager side?<br /> – Ufuk<br /><br /><br
/> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson<br /> &lt;<a href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt;
wrote:<br /><br /><br /><br />  Any insight here? I've got a situation
where a key value state on a task<br />  manager is being registered with the job
manager, but when I try to query<br />  it, the job manager responds that it doesn't know
the location of the key value<br />  state...<br /><br /><br />  26.04.2017,
12:11, "Chet Masterson" &lt;<a href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt;:<br
/><br />  After setting the logging to DEBUG on the job manager, I learned four<br
/>  things:<br /><br />  (Regarding the message formatting below: I format the Flink
logs as JSON so I can import them into Kibana.)<br /><br
/>  1. The appropriate key value state is registered in both the parallelism = 1 and<br
/>  parallelism = 3 environments. With parallelism = 1, I saw one registration<br />  message
in the log; with parallelism = 3, I saw two registration<br />  messages:<br />  {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",<br
/>  "msg":"Key value state registered for job &lt;job id&gt; under name &lt;statename&gt;"}<br
/><br />  2. When I issued the query in both the parallelism = 1 and parallelism = 3<br
/>  environments, I saw "Lookup key-value state for job &lt;job id&gt; with<br
/>  registration name &lt;statename&gt;". With parallelism = 1, I saw
1 log message; with<br />  parallelism = 3, I saw two identical messages.<br /><br
/>  3. I saw no other messages in the job manager log that seemed relevant.<br /><br
/>  4. When issuing the query with parallelism = 3, I continued to get the error<br
/>  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message<br
/>  of null.<br /><br />  Thanks!<br /><br /><br /><br
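/>  Points 1 and 4 fit together if, as I understand the Flink 1.2-era client, the job manager keeps one location entry per key group and the client fails with UnknownKvStateKeyGroupLocation (created without a message, which would explain the null) when the entry for the queried key's key group is still empty. The Java code below is a minimal sketch of that idea, not Flink's implementation: all class and method names are hypothetical, the contiguous range formula keyGroup * parallelism / maxParallelism is assumed to match Flink's assignment, the murmur hash Flink applies to hashCode() is left out, and maxParallelism = 128 is only an example value.

```java
// Hypothetical, simplified model of a queryable-state location lookup.
// Not Flink code: every name here is made up for illustration.
public class KeyGroupLookupSketch {

    // Stand-in for UnknownKvStateKeyGroupLocation: thrown without a message,
    // so getMessage() returns null, like the error reported in the thread.
    static class MissingKeyGroupLocation extends RuntimeException {
    }

    // One entry per key group: the address of the subtask owning that key group,
    // or null if that subtask never registered its state.
    static String[] buildLocationTable(int maxParallelism, int parallelism, String[] registeredBySubtask) {
        String[] table = new String[maxParallelism];
        for (int keyGroup = 0; keyGroup != maxParallelism; keyGroup++) {
            // Assumed contiguous range assignment: key groups split evenly across subtasks.
            int subtask = keyGroup * parallelism / maxParallelism;
            table[keyGroup] = registeredBySubtask[subtask];
        }
        return table;
    }

    // Simplified key-group assignment: Flink also murmur-hashes hashCode(), this sketch does not.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Resolves a key to an address, mimicking the client-side failure seen in the thread.
    static String lookup(String[] table, Object key) {
        String address = table[keyGroupFor(key, table.length)];
        if (address == null) {
            throw new MissingKeyGroupLocation();
        }
        return address;
    }

    public static void main(String[] args) {
        // parallelism = 3 with maxParallelism = 128, but only subtasks 0 and 1 registered.
        String[] table = buildLocationTable(128, 3, new String[] {"tm-1", "tm-2", null});
        System.out.println("key 10  -> " + lookup(table, 10));
        try {
            lookup(table, 100);
        } catch (MissingKeyGroupLocation e) {
            System.out.println("key 100 -> " + e.getClass().getSimpleName() + ", message: " + e.getMessage());
        }
    }
}
```

Under these assumptions, only keys whose key group belongs to a subtask that never registered its state fail, which is consistent with the skewed test data described at the top of the thread: if only N-1 of N subtasks register, queries for keys owned by the missing one would be expected to fail.<br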
/><br /><br />  26.04.2017, 09:52, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br />  Thanks! Your config looks good to me.<br /><br />  Could
you please set the log level for org.apache.flink.runtime.jobmanager to<br />  DEBUG?<br
/><br />  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG<br /><br
/>  Then we can check whether the JobManager logs the registration of the<br />  state
instance with the respective name when parallelism &gt; 1.<br
/><br />  Expected output is something like this: "Key value state registered<br
/>  for job ${msg.getJobId} under name ${msg.getRegistrationName}."<br /><br
/>  – Ufuk<br /><br />  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson<br
/>  &lt;<a href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt;
wrote:<br /><br />   Ok...more information.<br /><br />   1.
Built a fresh cluster from the ground up. Started testing queryable state<br
/>   at each step.<br />   2. When running under any configuration of task
managers and job managers<br />   where parallelism = 1, the queries execute as expected.<br
/>   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job<br
/>   manager) feeding off a Kafka topic partitioned three ways, queries
always fail, returning the error<br />   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation)
with an<br />   error message of null.<br />   4. I do know my state is
as expected on the cluster. Liberal use of trace<br />   prints shows that the state managed
by the jobs is as I expect. However, I cannot<br />   query it externally.<br
/>   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed<br
/>   is configured by using the job manager UI.<br />   6. My flink-conf.yaml:<br
/><br />   jobmanager.rpc.address: flink01<br />   jobmanager.rpc.port: 6123<br />   jobmanager.heap.mb: 256<br /><br
/>   taskmanager.heap.mb: 512<br />   taskmanager.data.port: 6121<br />   taskmanager.numberOfTaskSlots: 1<br />   taskmanager.memory.preallocate: false<br /><br
/>   parallelism.default: 1<br />   blob.server.port: 6130<br /><br
/>   jobmanager.web.port: 8081<br />   query.server.enable: true<br /><br />   7. I do know
my job is indeed running in parallel, from trace prints going<br />   to the task
manager logs.<br /><br />   Do I need a backend configured when running in
parallel for the queryable<br />   state? Do I need a shared temp directory on the
task managers?<br /><br />   THANKS!<br /><br /><br />   25.04.2017,
04:24, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br />   It's strange that the rpc port is set to 30000 when you use a<br
/>   standalone cluster and configure 6123 as the port. I'm pretty sure<br />   that
the config has not been updated.<br /><br />   But everything should work as
you say when you point it to the correct<br />   jobmanager address and port. Could
you please post the complete<br />   stacktrace you get instead of the message you
log?<br /><br /><br />   On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson<br
/>   &lt;<a href="mailto:chet.masterson@yandex.com">chet.masterson@yandex.com</a>&gt;
wrote:<br /><br /><br /><br />    More information:<br /><br
/>    0. I did remove query.server.port and query.server.enable from all<br
/>    flink-conf.yaml files, and restarted the cluster.<br /><br />    1.
The Akka error doesn't seem to have anything to do with the problem. If<br
/>    I point my query client at an IP address with no Flink server running at<br />    all,
I get that error. It seems to be a (side-effect?) timeout for "no Flink<br />    service is listening on the port you told
me to check".<br /><br />    2. I did notice (using the Flink Web UI) that even
with the config file changes<br />    in 0, and with jobmanager.rpc.port left at the flink-conf.yaml<br />    default
(6123), jobmanager.rpc.port on my cluster
is set to 30000.<br /><br />    3. If I do send a query
using the jobmanager.rpc.address and the<br />    jobmanager.rpc.port as displayed
in the Flink Web UI, the connection from<br />    the client
to Flink is initiated and completed. When I try to execute<br />    the
query (code below), it fails and the exception is caught. When I look at<br
/>    the error message returned (e.getMessage() below), it is simply 'null':<br
/><br />    try {<br />        byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));<br
/>        // de-serialize, commented out for testing<br />        return null;<br
/>    }<br />    catch (Exception e) {<br
/>        logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());<br
/>        return null;<br />    }<br
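/>    Since e.getMessage() can be null, as it is here, logging only the message hides which exception was thrown. If logger is an SLF4J or log4j logger, passing the exception itself as the last argument, e.g. logger.error("Queryable State Error", e), logs the full stack trace, which is what Ufuk asks for further up in the thread. As a self-contained alternative, here is a minimal Java sketch that turns an exception and its cause chain into one log-friendly line; ExceptionDescriber and describe are hypothetical names, not part of Flink or any logging library.

```java
// Hypothetical helper: one-line description of an exception and its causes,
// so a null getMessage() does not hide which exception was actually thrown.
public class ExceptionDescriber {

    // Upper bound on how many causes to follow, in case a cause chain is cyclic.
    static final int MAX_DEPTH = 32;

    static String describe(Throwable error) {
        StringBuilder out = new StringBuilder();
        Throwable current = error;
        for (int depth = 0; depth != MAX_DEPTH; depth++) {
            if (current == null) {
                break; // reached the end of the cause chain
            }
            if (out.length() != 0) {
                out.append(" | caused by: ");
            }
            out.append(current.getClass().getName());
            if (current.getMessage() != null) {
                out.append(": ").append(current.getMessage());
            }
            current = current.getCause();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // A message-less exception, like the one caught in the query code above.
        Exception noMessage = new IllegalStateException();
        System.out.println("getMessage(): " + noMessage.getMessage());
        System.out.println("describe():   " + describe(new RuntimeException("query failed", noMessage)));
    }
}
```

With the failure in this thread, a line like this would at least show the exception class (e.g. org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) instead of just null.<br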
/><br />    Should I be sending the query to the job manager on the job manager's<br
/>    RPC port when Flink is clustered?<br /><br />    ALSO, I do
know the state name I am trying to query exists and is populated,<br />    and
the job ID exists. I also know the task managers are communicating with<br
/>    the job managers (task manager data port: 6121) and processed the data<br
/>    that resulted in the state variable I am trying to query being
populated. All<br />    this was logged.<br /><br /><br />    24.04.2017,
10:34, "Ufuk Celebi" &lt;<a href="mailto:uce@apache.org">uce@apache.org</a>&gt;:<br
/><br />    Hey Chet! You can remove<br /><br />    query.server.port: 6123<br />    query.server.enable: true<br /><br />    That shouldn't
cause the Exception we see here though. I'm actually<br />    not sure what is causing
the PduCodecException. Could this be related<br />    to different Akka versions
being used in Flink and your client code?<br />    [1] Is it possible for you to
check this?<br /><br />    – Ufuk<br /><br />    [1]