tinkerpop-dev mailing list archives

From "selfish finch (JIRA)" <j...@apache.org>
Subject [jira] [Created] (TINKERPOP-2277) Python sdk postpone the timing to create transport
Date Tue, 06 Aug 2019 14:33:00 GMT
selfish finch created TINKERPOP-2277:
----------------------------------------

             Summary: Python sdk postpone the timing to create transport
                 Key: TINKERPOP-2277
                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2277
             Project: TinkerPop
          Issue Type: Improvement
          Components: python
    Affects Versions: 3.4.2
            Reporter: selfish finch


Currently, the transport is created inside `Connection.__init__`. This works in every scenario where the driver runs in a single process.

However, if users run the Gremlin Python SDK inside a service such as `celery`, which uses a multi-process model, this implementation can cause errors.

Because the transport (which wraps the socket) is created in the main process, it is inherited by the child processes, and the threads in each child's ThreadPoolExecutor then read from the same socket. As a result, the following can happen:
 * Child Process 1 sends a request with request id 'xxx' using socket s1
 * Child Process 2 sends another request with request id 'yyy', also using socket s1, and then reads the response to request 'xxx', which it cannot find in its own '_results'
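Why inheritance matters can be shown with a minimal, POSIX-only sketch. It uses a local `socket.socketpair()` as a hypothetical stand-in for the WebSocket transport: a socket opened before `os.fork()` is the very same connection in the child, so bytes the child writes arrive on the parent's connection.

{code:python}
import os
import socket


def child_writes_on_inherited_socket() -> bytes:
    """Return what the peer receives after a forked child writes to a
    socket that the parent opened before forking (POSIX only)."""
    # Opened *before* the fork, like a transport created in __init__.
    parent_end, peer_end = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child process: it never opened a socket itself, but the inherited
        # file descriptor still refers to the parent's connection.
        try:
            parent_end.sendall(b"request from child")
        finally:
            os._exit(0)
    # Parent process: wait for the child, then read from the other end.
    os.waitpid(pid, 0)
    data = peer_end.recv(1024)
    parent_end.close()
    peer_end.close()
    return data
{code}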

When such a mix-up happens, the receiving process raises a KeyError like the following:
{noformat}
[2019-08-06 15:31:21,822: WARNING/ForkPoolWorker-3] Traceback (most recent call last):
  File "/root/celery_gremlin/gremlin_test/tasks.py", line 15, in graph_query_test
    res = client.submit(dsl).one()
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/resultset.py", line 83, in one
    return self.done.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 80, in _receive
    status_code = self._protocol.data_received(data, self._results)
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 83, in data_received
    result_set = results_dict[request_id]
KeyError: '1ff0ad4a-dcea-45d1-b240-27c2dc792f73'
{noformat}
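The KeyError can be reproduced without a Gremlin Server. The sketch below is a hypothetical single-process simulation: a shared `deque` stands in for the inherited socket, and each `FakeWorker` keeps its own `_results` dict, mirroring the `results_dict[request_id]` lookup in `protocol.py` that appears in the traceback.

{code:python}
from collections import deque


class FakeWorker:
    """Hypothetical stand-in for one child process's Connection state."""

    def __init__(self, shared_stream):
        self._results = {}             # private to each process
        self._stream = shared_stream   # shared, like the inherited socket

    def send(self, request_id):
        # Register the pending request locally; the "server" then queues
        # its response on the one connection every worker reads from.
        self._results[request_id] = []
        self._stream.append(request_id)

    def receive_one(self):
        # Take whichever response arrives next and look it up by id, as
        # data_received does; an unknown id raises KeyError here.
        request_id = self._stream.popleft()
        self._results[request_id].append("response")
        return request_id


def simulate_race():
    """Return the request id that raised KeyError, or None if none did."""
    stream = deque()
    child_1, child_2 = FakeWorker(stream), FakeWorker(stream)
    child_1.send("xxx")
    child_2.send("yyy")
    try:
        # Child 2 reads first and receives the response meant for child 1.
        child_2.receive_one()
    except KeyError as exc:
        return exc.args[0]
    return None
{code}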
Steps to reproduce:
 # Follow the Celery [tutorial|https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html] to create a simple app named `tasks`
 # Refactor `tasks.py` to send a request through the Gremlin Python SDK inside the task method
{code:python}
from celery import Celery
from gremlin_python.driver import client

username = "test"
password = "test"

# The client (and therefore its transport) is created at import time,
# i.e. in the main process, so the forked worker processes inherit it.
my_client = client.Client('ws://localhost:8182/gremlin', 'g',
                          username=username, password=password,
                          pool_size=1, max_workers=1)
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost:6379/0')


@app.task
def add(x, y):
    dsl = "g.V().count()"
    # Runs inside a worker process, over the inherited connection.
    my_client.submit(dsl).one()
    return x + y
{code}

 # Invoke `add.delay(1,2)`
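Until the driver defers transport creation, one application-side workaround is to build the client lazily in each process. The helper below is a hypothetical sketch, not part of gremlin_python: it takes any zero-argument factory and builds a new client whenever it notices it is running under a different PID from the one that built the cached client.

{code:python}
import os


class PerProcessClient:
    """Hypothetical helper: lazily build one client per OS process."""

    def __init__(self, factory):
        self._factory = factory   # zero-argument callable returning a client
        self._pid = None
        self._client = None

    def get(self):
        pid = os.getpid()
        if self._client is None or self._pid != pid:
            # First use, or we are in a forked child that inherited the
            # parent's cached client: build a fresh one with its own socket.
            # Not locked; assumes a single thread per process calls get().
            self._client = self._factory()
            self._pid = pid
        return self._client
{code}

In the `tasks.py` above, this would mean passing `lambda: client.Client('ws://localhost:8182/gremlin', 'g', ...)` to `PerProcessClient` and calling `my_client.get().submit(dsl).one()` inside the task.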

A quick fix, which changes only a few lines in `connection.py` without breaking other code, is to create the transport lazily on the first `write()` instead of in `__init__`:
{code:python}
class Connection:

    def __init__(self, url, traversal_source, protocol, transport_factory,
                 executor, pool):
        self._url = url
        self._traversal_source = traversal_source
        self._protocol = protocol
        self._transport_factory = transport_factory
        self._executor = executor
        self._transport = None
        self._pool = pool
        self._results = {}
        # No transport is opened here any more; connect() is deferred
        # until the connection is first used.
        self._inited = False

    def connect(self):
        if self._transport:
            self._transport.close()
        self._transport = self._transport_factory()
        self._transport.connect(self._url)
        self._protocol.connection_made(self._transport)
        self._inited = True

    def close(self):
        # Only close a transport that was actually opened.
        if self._inited:
            self._transport.close()

    def write(self, request_message):
        # Open the transport on first use, i.e. in the process that sends
        # the request rather than the one that built the Connection.
        if not self._inited:
            self.connect()
        # ... the rest of write() is unchanged ...
{code}
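The effect of the change can be checked without a server. This is a hypothetical, stripped-down model of the patch (`FakeTransport` and `LazyConnection` are illustrative names, not gremlin_python classes): constructing the connection opens nothing, and the first `write()` opens exactly one transport in the calling process.

{code:python}
import os


class FakeTransport:
    """Illustrative transport that records which process created it."""
    created = []

    def __init__(self):
        self.pid = os.getpid()
        FakeTransport.created.append(self)

    def connect(self, url):
        self.url = url

    def close(self):
        pass


class LazyConnection:
    """Stripped-down model of the patched Connection (no protocol/executor)."""

    def __init__(self, url, transport_factory):
        self._url = url
        self._transport_factory = transport_factory
        self._transport = None
        self._inited = False      # nothing is opened at construction time

    def connect(self):
        if self._transport:
            self._transport.close()
        self._transport = self._transport_factory()
        self._transport.connect(self._url)
        self._inited = True

    def write(self, request_message):
        if not self._inited:
            self.connect()        # opened in whichever process writes first
        # Returned only so the sketch can be inspected.
        return self._transport
{code}

This only helps if the parent process never writes through the connection before forking: once `write()` has run there, the transport exists and would again be inherited by the children.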
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
