Skip to content

[18.0][FIX] fastapi: Forwardport 16.0 pullrequest 486 - Avoid zombie threads #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: 18.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions fastapi/fastapi_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from .context import odoo_env_ctx
from .error_handlers import convert_exception_to_status_body
from .pools import fastapi_app_pool


class FastApiDispatcher(Dispatcher):
Expand All @@ -24,18 +25,17 @@ def dispatch(self, endpoint, args):
root_path = "/" + environ["PATH_INFO"].split("/")[1]
# TODO store the env into contextvar to be used by the odoo_env
# depends method
fastapi_endpoint = self.request.env["fastapi.endpoint"].sudo()
app = fastapi_endpoint.get_app(root_path)
uid = fastapi_endpoint.get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)
with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app:
uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path)
data = BytesIO()
with self._manage_odoo_env(uid):
for r in app(environ, self._make_response):
data.write(r)
if self.inner_exception:
raise self.inner_exception
return self.request.make_response(
data.getvalue(), headers=self.headers, status=self.status
)

def handle_error(self, exc):
headers = getattr(exc, "headers", None)
Expand Down
26 changes: 26 additions & 0 deletions fastapi/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).
"""
ASGI middleware for FastAPI.

This module provides an ASGI middleware for FastAPI applications. The middleware
is designed to ensure managed the lifecycle of the threads used to as event loop
for the ASGI application.

"""

from collections.abc import Iterable

import a2wsgi
from a2wsgi.asgi import ASGIResponder
from a2wsgi.wsgi_typing import Environ, StartResponse

from .pools import event_loop_pool


class ASGIMiddleware(a2wsgi.ASGIMiddleware):
def __call__(
self, environ: Environ, start_response: StartResponse
) -> Iterable[bytes]:
with event_loop_pool.get_event_loop() as loop:
return ASGIResponder(self.app, loop)(environ, start_response)
23 changes: 14 additions & 9 deletions fastapi/models/fastapi_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from itertools import chain
from typing import Any

from a2wsgi import ASGIMiddleware
from starlette.middleware import Middleware
from starlette.routing import Mount

Expand All @@ -16,6 +15,7 @@
from fastapi import APIRouter, Depends, FastAPI

from .. import dependencies
from ..middleware import ASGIMiddleware

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,10 +121,10 @@
return tuple(res)

@api.model
def _routing_impacting_fields(self) -> tuple[str]:
def _routing_impacting_fields(self) -> tuple[str, ...]:
"""The list of fields requiring to refresh the mount point of the pp
into odoo if modified"""
return ("root_path",)
return ("root_path", "save_http_session")

Check warning on line 127 in fastapi/models/fastapi_endpoint.py

View check run for this annotation

Codecov / codecov/patch

fastapi/models/fastapi_endpoint.py#L127

Added line #L127 was not covered by tests

#
# end of endpoint.route.sync.mixin methods implementation
Expand Down Expand Up @@ -199,14 +199,19 @@
return f"{self._name}:{self.id}:{path}"

def _reset_app(self):
self.env.registry.clear_cache()
self._reset_app_cache_marker.clear_cache(self)

Check warning on line 202 in fastapi/models/fastapi_endpoint.py

View check run for this annotation

Codecov / codecov/patch

fastapi/models/fastapi_endpoint.py#L202

Added line #L202 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #510


@tools.ormcache()
def _reset_app_cache_marker(self):
"""This methos is used to get a way to mark the orm cache as dirty
when the app is reset. By marking the cache as dirty, the system
will signal to others instances that the cache is not up to date
and that they should invalidate their cache as well. This is required
to ensure that any change requiring a reset of the app is propagated
to all the running instances.
"""

@api.model
@tools.ormcache("root_path")
# TODO cache on thread local by db to enable to get 1 middelware by
# thread when odoo runs in multi threads mode and to allows invalidate
# specific entries in place og the overall cache as we have to do into
# the _rest_app method
def get_app(self, root_path):
record = self.search([("root_path", "=", root_path)])
if not record:
Expand Down
11 changes: 11 additions & 0 deletions fastapi/pools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .event_loop import EventLoopPool
from .fastapi_app import FastApiAppPool
from odoo.service.server import CommonServer

event_loop_pool = EventLoopPool()
fastapi_app_pool = FastApiAppPool()


CommonServer.on_stop(event_loop_pool.shutdown)

__all__ = ["event_loop_pool", "fastapi_app_pool"]
61 changes: 61 additions & 0 deletions fastapi/pools/event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).

import asyncio
import queue
import threading
from collections.abc import Generator
from contextlib import contextmanager


class EventLoopPool:
def __init__(self):
self.pool = queue.Queue[tuple[asyncio.AbstractEventLoop, threading.Thread]]()

def __get_event_loop_and_thread(
self,
) -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
"""
Get an event loop from the pool. If no event loop is available,
create a new one.
"""
try:
return self.pool.get_nowait()
except queue.Empty:
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
return loop, thread

def __return_event_loop(
self, loop: asyncio.AbstractEventLoop, thread: threading.Thread
) -> None:
"""
Return an event loop to the pool for reuse.
"""
self.pool.put((loop, thread))

def shutdown(self):
"""
Shutdown all event loop threads in the pool.
"""
while not self.pool.empty():
loop, thread = self.pool.get_nowait()
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

@contextmanager
def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]:
"""
Get an event loop from the pool. If no event loop is available,
create a new one.

After the context manager exits, the event loop is returned to
the pool for reuse.
"""
loop, thread = self.__get_event_loop_and_thread()
try:
yield loop
finally:
self.__return_event_loop(loop, thread)
136 changes: 136 additions & 0 deletions fastapi/pools/fastapi_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2025 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).
import logging
import queue
import threading
from collections import defaultdict
from collections.abc import Generator
from contextlib import contextmanager

from odoo.api import Environment

from fastapi import FastAPI

_logger = logging.getLogger(__name__)


class FastApiAppPool:
"""Pool of FastAPI apps.

This class manages a pool of FastAPI apps. The pool is organized by database name
and root path. Each pool is a queue of FastAPI apps.

The pool is used to reuse FastAPI apps across multiple requests. This is useful
to avoid the overhead of creating a new FastAPI app for each request. The pool
ensures that only one request at a time uses an app.

The proper way to use the pool is to use the get_app method as a context manager.
This ensures that the app is returned to the pool after the context manager exits.
The get_app method is designed to ensure that the app made available to the
caller is unique and not used by another caller at the same time.

.. code-block:: python

with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app:
# use the app

The pool is invalidated when the cache registry is updated. This ensures that
the pool is always up-to-date with the latest app configuration. It also
ensures that the invalidation is done even in the case of a modification occurring
in a different worker process or thread or server instance. This mechanism
works because every time an attribute of the fastapi.endpoint model is modified
and this attribute is part of the list returned by the `_fastapi_app_fields`,
or `_routing_impacting_fields` methods, we reset the cache of a marker method
`_reset_app_cache_marker`. As side effect, the cache registry is marked to be
updated by the increment of the `cache_sequence` SQL sequence. This cache sequence
on the registry is reloaded from the DB on each request made to a specific database.
When an app is retrieved from the pool, we always compare the cache sequence of
the pool with the cache sequence of the registry. If the two sequences are
different, we invalidate the pool and save the new cache sequence on the pool.

The cache is based on a defaultdict of defaultdict of queue.Queue. We are cautious
that the use of defaultdict is not thread-safe for operations that modify the
dictionary. However the only operation that modifies the dictionary is the
first access to a new key. If two threads access the same key at the same time,
the two threads will create two different queues. This is not a problem since
at the time of returning an app to the pool, we are sure that a queue exists
for the key into the cache and all the created apps are returned to the same
valid queue. And the end, the lack of thread-safety for the defaultdict could
only lead to a negligible overhead of creating a new queue that will never be
used. This is why we consider that the use of defaultdict is safe in this context.
"""

def __init__(self):
self._queue_by_db_by_root_path: dict[str, dict[str, queue.Queue[FastAPI]]] = (
defaultdict(lambda: defaultdict(queue.Queue))
)
self.__cache_sequences = {}
self._lock = threading.Lock()

def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]:
db_name = env.cr.dbname
return self._queue_by_db_by_root_path[db_name][root_path]

def __get_app(self, env: Environment, root_path: str) -> FastAPI:
pool = self.__get_pool(env, root_path)
try:
return pool.get_nowait()
except queue.Empty:
env["fastapi.endpoint"].sudo()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this line?

return env["fastapi.endpoint"].sudo().get_app(root_path)
Comment on lines +78 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except queue.Empty:
env["fastapi.endpoint"].sudo()
return env["fastapi.endpoint"].sudo().get_app(root_path)
except queue.Empty:
return env["fastapi.endpoint"].sudo().get_app(root_path)


def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None:
pool = self.__get_pool(env, root_path)
pool.put(app)

@contextmanager
def get_app(
self, env: Environment, root_path: str
) -> Generator[FastAPI, None, None]:
"""Return a FastAPI app to be used in a context manager.

The app is retrieved from the pool if available, otherwise a new one is created.
The app is returned to the pool after the context manager exits.

When used into the FastApiDispatcher class this ensures that the app is reused
across multiple requests but only one request at a time uses an app.
"""
self._check_cache(env)
app = self.__get_app(env, root_path)
try:
yield app
finally:
self.__return_app(env, app, root_path)

def get_cache_sequence(self, key: str) -> int:
with self._lock:
return self.__cache_sequences.get(key, 0)

def set_cache_sequence(self, key: str, value: int) -> None:
with self._lock:
if (
key not in self.__cache_sequences
or value != self.__cache_sequences[key]
):
self.__cache_sequences[key] = value

def _check_cache(self, env: Environment) -> None:
cache_sequences = env.registry.cache_sequences
for key, value in cache_sequences.items():
if (
value != self.get_cache_sequence(key)
and self.get_cache_sequence(key) != 0
):
_logger.info(

Check warning on line 124 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L124

Added line #L124 was not covered by tests
"Cache registry updated, reset fastapi_app pool for the current "
"database"
)
self.invalidate(env)

Check warning on line 128 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L128

Added line #L128 was not covered by tests
self.set_cache_sequence(key, value)

def invalidate(self, env: Environment, root_path: str | None = None) -> None:
db_name = env.cr.dbname

Check warning on line 132 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L132

Added line #L132 was not covered by tests
if root_path:
self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue()

Check warning on line 134 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L134

Added line #L134 was not covered by tests
elif db_name in self._queue_by_db_by_root_path:
del self._queue_by_db_by_root_path[db_name]

Check warning on line 136 in fastapi/pools/fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

fastapi/pools/fastapi_app.py#L136

Added line #L136 was not covered by tests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmignon Why do you think it is sufficient to delete content of _queue_by_db_by_root_path (ASGIMiddleware) this way? Threads created before supposed to be cleaned by Python GC? I've done some test with Odoo cache invalidation and logging current threads number, the number is incremented by 1 after every "Caches invalidated" signalling. The threads remain in the process.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screenshot 2025-03-17 at 11 08 05

Screenshot 2025-03-17 at 11 10 03
the difference between 2 logs is 1 Odoo cache invalidation.

Copy link

@veryberry veryberry Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmignon I think I've found the issue. seems like you should have redefined "init" in ASGIMiddleware
(https://github.com/abersheeran/a2wsgi/blob/master/a2wsgi/asgi.py#L130)
because every time new loop is created

Screenshot 2025-03-17 at 11 54 33

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@veryberry The middleware used is defined here and extend https://github.com/abersheeran/a2wsgi/blob/master/a2wsgi/asgi.py#L130. The loop used comes from a poo and if one is available into the pool it will be reused.

Copy link

@veryberry veryberry Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmignon the problem is that you have extended its __call__ method
But actually the __init__ is triggered first, and there loop is alway None, and new loop is created every time you call return ASGIMiddleware(app).
I've proved it locally controlling threads after every Odoo cache invalidation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

@veryberry veryberry Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks
@lmignon just wanted you to be aware of this issue and know your opinion on it. In my case I've moved context manager usage from __call__ to __init__. It helped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my case I've moved context manager usage from __call__ to __init__. It helped.

????

without seeing the code, I find it hard to imagine that it would work correctly with the change you describes. It's important that the eventloop theard can only be used for one call at a time. That's why it's the call method that's overloaded, because once the context manager exit, the pool becomes available again for another odoo process/thread.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmignon would you be so kind to explain why it's important that the eventloop theard can only be used for one call at a time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of the event loop thread like a single train track. If two trains (calls) try to use the same track at the same time, they’ll collide, and everything will break. To keep things running smoothly, we let one train (call) complete its journey before allowing another one on the track.