From 7ce51b4629a44586013f85966b10dfa4a424a5d0 Mon Sep 17 00:00:00 2001 From: "Laurent Mignon (ACSONE)" Date: Wed, 8 Jan 2025 11:58:22 +0100 Subject: [PATCH 1/6] [FIX] fastapi: Avoid zombie threads Each time a fastapi app is created, a new event loop thread is created by the ASGIMiddleware. Unfortunately, every time the cache is cleared, a new app is created with a new even loop thread. This leads to an increase in the number of threads created to manage the asyncio event loop, even though many of them are no longer in use. To avoid this problem, the thread in charge of the event loop is now created only once per thread / process and the result is stored in the thread's local storage. If a new instance of an app needs to be created following a cache reset, this ensures that the same event loop is reused. refs #484 --- fastapi/models/fastapi_endpoint.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/fastapi/models/fastapi_endpoint.py b/fastapi/models/fastapi_endpoint.py index b77bf5141..88f83fdab 100644 --- a/fastapi/models/fastapi_endpoint.py +++ b/fastapi/models/fastapi_endpoint.py @@ -1,8 +1,10 @@ # Copyright 2022 ACSONE SA/NV # License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). +import asyncio import logging from collections.abc import Callable +import threading from functools import partial from itertools import chain from typing import Any @@ -20,6 +22,26 @@ _logger = logging.getLogger(__name__) +# Thread-local storage for event loops +# Using a thread-local storage allows to have a dedicated event loop per thread +# and avoid the need to create a new event loop for each request. It's also +# compatible with the multi-worker mode of Odoo. +_event_loop_storage = threading.local() + + +def get_or_create_event_loop() -> asyncio.AbstractEventLoop: + """ + Get or create a reusable event loop for the current thread. + """ + if not hasattr(_event_loop_storage, "loop"): + loop = asyncio.new_event_loop() + loop_thread = threading.Thread(target=loop.run_forever, daemon=True) + loop_thread.start() + _event_loop_storage.loop = loop + _event_loop_storage.thread = loop_thread + return _event_loop_storage.loop + + class FastapiEndpoint(models.Model): _name = "fastapi.endpoint" _inherit = "endpoint.route.sync.mixin" @@ -214,7 +236,8 @@ def get_app(self, root_path): app = FastAPI() app.mount(record.root_path, record._get_app()) self._clear_fastapi_exception_handlers(app) - return ASGIMiddleware(app) + event_loop = get_or_create_event_loop() + return ASGIMiddleware(app, loop=event_loop) def _clear_fastapi_exception_handlers(self, app: FastAPI) -> None: """ From 457751d531430aa68cebf678dc3dddd361d6f121 Mon Sep 17 00:00:00 2001 From: "Laurent Mignon (ACSONE)" Date: Fri, 10 Jan 2025 14:44:18 +0100 Subject: [PATCH 2/6] [IMP] fastapi: add event loop lifecycle management This commit adds event loop lifecycle management to the FastAPI dispatcher. Before this commit, an event loop and the thread to run it were created each time a FastAPI app was created. The drawback of this approach is that when the app was destroyed (for example, when the cache of app was cleared), the event loop and the thread were not properly stopped, which could lead to memory leaks and zombie threads. This commit fixes this issue by creating a pool of event loops and threads that are shared among all FastAPI apps. On each call to a FastAPI app, a event loop is requested from the pool and is returned to the pool when the app is destroyed. At request time of an event loop, the pool try to reuse an existing event loop and if no event loop is available, a new event loop is created. The cache of the FastAPI app is also refactored to use it's own mechanism. It's now based on a dictionary of queues by root path by database, where each queue is a pool of FastAPI app. This allows a better management of the invalidation of the cache. It's now possible to invalidate the cache of FastAPI app by root path without affecting the cache of others root paths. --- fastapi/fastapi_dispatcher.py | 24 ++++++------- fastapi/middleware.py | 26 ++++++++++++++ fastapi/models/fastapi_endpoint.py | 35 +++--------------- fastapi/pools/__init__.py | 7 ++++ fastapi/pools/event_loop.py | 58 ++++++++++++++++++++++++++++++ fastapi/pools/fastapi_app.py | 52 +++++++++++++++++++++++++++ 6 files changed, 159 insertions(+), 43 deletions(-) create mode 100644 fastapi/middleware.py create mode 100644 fastapi/pools/__init__.py create mode 100644 fastapi/pools/event_loop.py create mode 100644 fastapi/pools/fastapi_app.py diff --git a/fastapi/fastapi_dispatcher.py b/fastapi/fastapi_dispatcher.py index 1a8eb3532..3edf29f14 100644 --- a/fastapi/fastapi_dispatcher.py +++ b/fastapi/fastapi_dispatcher.py @@ -8,6 +8,7 @@ from .context import odoo_env_ctx from .error_handlers import convert_exception_to_status_body +from .pools import fastapi_app_pool class FastApiDispatcher(Dispatcher): @@ -24,18 +25,17 @@ def dispatch(self, endpoint, args): root_path = "/" + environ["PATH_INFO"].split("/")[1] # TODO store the env into contextvar to be used by the odoo_env # depends method - fastapi_endpoint = self.request.env["fastapi.endpoint"].sudo() - app = fastapi_endpoint.get_app(root_path) - uid = fastapi_endpoint.get_uid(root_path) - data = BytesIO() - with self._manage_odoo_env(uid): - for r in app(environ, self._make_response): - data.write(r) - if self.inner_exception: - raise self.inner_exception - return self.request.make_response( - data.getvalue(), headers=self.headers, status=self.status - ) + with fastapi_app_pool.get_app(root_path, request.env) as app: + uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path) + data = BytesIO() + with self._manage_odoo_env(uid): + for r in app(environ, self._make_response): + data.write(r) + if self.inner_exception: + raise self.inner_exception + return self.request.make_response( + data.getvalue(), headers=self.headers, status=self.status + ) def handle_error(self, exc): headers = getattr(exc, "headers", None) diff --git a/fastapi/middleware.py b/fastapi/middleware.py new file mode 100644 index 000000000..8f63c2339 --- /dev/null +++ b/fastapi/middleware.py @@ -0,0 +1,26 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). +""" +ASGI middleware for FastAPI. + +This module provides an ASGI middleware for FastAPI applications. The middleware +is designed to ensure managed the lifecycle of the threads used to as event loop +for the ASGI application. + +""" + +from typing import Iterable + +import a2wsgi +from a2wsgi.asgi import ASGIResponder +from a2wsgi.wsgi_typing import Environ, StartResponse + +from .pools import event_loop_pool + + +class ASGIMiddleware(a2wsgi.ASGIMiddleware): + def __call__( + self, environ: Environ, start_response: StartResponse + ) -> Iterable[bytes]: + with event_loop_pool.get_event_loop() as loop: + return ASGIResponder(self.app, loop)(environ, start_response) diff --git a/fastapi/models/fastapi_endpoint.py b/fastapi/models/fastapi_endpoint.py index 88f83fdab..97b92762e 100644 --- a/fastapi/models/fastapi_endpoint.py +++ b/fastapi/models/fastapi_endpoint.py @@ -1,15 +1,12 @@ # Copyright 2022 ACSONE SA/NV # License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). -import asyncio import logging from collections.abc import Callable -import threading from functools import partial from itertools import chain from typing import Any -from a2wsgi import ASGIMiddleware from starlette.middleware import Middleware from starlette.routing import Mount @@ -18,30 +15,12 @@ from fastapi import APIRouter, Depends, FastAPI from .. import dependencies +from ..middleware import ASGIMiddleware +from ..pools import fastapi_app_pool _logger = logging.getLogger(__name__) -# Thread-local storage for event loops -# Using a thread-local storage allows to have a dedicated event loop per thread -# and avoid the need to create a new event loop for each request. It's also -# compatible with the multi-worker mode of Odoo. -_event_loop_storage = threading.local() - - -def get_or_create_event_loop() -> asyncio.AbstractEventLoop: - """ - Get or create a reusable event loop for the current thread. - """ - if not hasattr(_event_loop_storage, "loop"): - loop = asyncio.new_event_loop() - loop_thread = threading.Thread(target=loop.run_forever, daemon=True) - loop_thread.start() - _event_loop_storage.loop = loop - _event_loop_storage.thread = loop_thread - return _event_loop_storage.loop - - class FastapiEndpoint(models.Model): _name = "fastapi.endpoint" _inherit = "endpoint.route.sync.mixin" @@ -221,14 +200,9 @@ def _endpoint_registry_route_unique_key(self, routing: dict[str, Any]): return f"{self._name}:{self.id}:{path}" def _reset_app(self): - self.env.registry.clear_cache() + fastapi_app_pool.invalidate(self.root_path, self.env) @api.model - @tools.ormcache("root_path") - # TODO cache on thread local by db to enable to get 1 middelware by - # thread when odoo runs in multi threads mode and to allows invalidate - # specific entries in place og the overall cache as we have to do into - # the _rest_app method def get_app(self, root_path): record = self.search([("root_path", "=", root_path)]) if not record: @@ -236,8 +210,7 @@ def get_app(self, root_path): app = FastAPI() app.mount(record.root_path, record._get_app()) self._clear_fastapi_exception_handlers(app) - event_loop = get_or_create_event_loop() - return ASGIMiddleware(app, loop=event_loop) + return ASGIMiddleware(app) def _clear_fastapi_exception_handlers(self, app: FastAPI) -> None: """ diff --git a/fastapi/pools/__init__.py b/fastapi/pools/__init__.py new file mode 100644 index 000000000..08ab00781 --- /dev/null +++ b/fastapi/pools/__init__.py @@ -0,0 +1,7 @@ +from .event_loop import EventLoopPool +from .fastapi_app import FastApiAppPool + +event_loop_pool = EventLoopPool() +fastapi_app_pool = FastApiAppPool() + +__all__ = ["event_loop_pool", "fastapi_app_pool"] diff --git a/fastapi/pools/event_loop.py b/fastapi/pools/event_loop.py new file mode 100644 index 000000000..a0a02a8f3 --- /dev/null +++ b/fastapi/pools/event_loop.py @@ -0,0 +1,58 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). + +import asyncio +import queue +import threading +from contextlib import contextmanager +from typing import Generator + + +class EventLoopPool: + def __init__(self): + self.pool = queue.Queue[tuple[asyncio.AbstractEventLoop, threading.Thread]]() + + def __get_event_loop_and_thread( + self, + ) -> tuple[asyncio.AbstractEventLoop, threading.Thread]: + """ + Get an event loop from the pool. If no event loop is available, create a new one. + """ + try: + return self.pool.get_nowait() + except queue.Empty: + loop = asyncio.new_event_loop() + thread = threading.Thread(target=loop.run_forever, daemon=True) + thread.start() + return loop, thread + + def __return_event_loop( + self, loop: asyncio.AbstractEventLoop, thread: threading.Thread + ) -> None: + """ + Return an event loop to the pool for reuse. + """ + self.pool.put((loop, thread)) + + def shutdown(self): + """ + Shutdown all event loop threads in the pool. + """ + while not self.pool.empty(): + loop, thread = self.pool.get_nowait() + loop.call_soon_threadsafe(loop.stop) + thread.join() + loop.close() + + @contextmanager + def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]: + """ + Get an event loop from the pool. If no event loop is available, create a new one. + + After the context manager exits, the event loop is returned to the pool for reuse. + """ + loop, thread = self.__get_event_loop_and_thread() + try: + yield loop + finally: + self.__return_event_loop(loop, thread) diff --git a/fastapi/pools/fastapi_app.py b/fastapi/pools/fastapi_app.py new file mode 100644 index 000000000..3508ea373 --- /dev/null +++ b/fastapi/pools/fastapi_app.py @@ -0,0 +1,52 @@ +# Copyright 2025 ACSONE SA/NV +# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). + +import queue +from collections import defaultdict +from contextlib import contextmanager +from typing import Generator + +from odoo.api import Environment + +from fastapi import FastAPI + + +class FastApiAppPool: + def __init__(self): + self._queue_by_db_by_root_path: dict[ + str, dict[str, queue.Queue[FastAPI]] + ] = defaultdict(lambda: defaultdict(queue.Queue)) + + def __get_app(self, env: Environment, root_path: str) -> FastAPI: + db_name = env.cr.dbname + try: + return self._queue_by_db_by_root_path[db_name][root_path].get_nowait() + except queue.Empty: + env["fastapi.endpoint"].sudo() + return env["fastapi.endpoint"].sudo().get_app(root_path) + + def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None: + db_name = env.cr.dbname + self._queue_by_db_by_root_path[db_name][root_path].put(app) + + @contextmanager + def get_app( + self, root_path: str, env: Environment + ) -> Generator[FastAPI, None, None]: + """Return a FastAPI app to be used in a context manager. + + The app is retrieved from the pool if available, otherwise a new one is created. + The app is returned to the pool after the context manager exits. + + When used into the FastApiDispatcher class this ensures that the app is reused + across multiple requests but only one request at a time uses an app. + """ + app = self.__get_app(env, root_path) + try: + yield app + finally: + self.__return_app(env, app, root_path) + + def invalidate(self, root_path: str, env: Environment) -> None: + db_name = env.cr.dbname + self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() From 1f7d17e69fc3aa18b49d8a823a8f3f5769b378f1 Mon Sep 17 00:00:00 2001 From: "Laurent Mignon (ACSONE)" Date: Fri, 10 Jan 2025 16:10:02 +0100 Subject: [PATCH 3/6] [FIX] fastapi: Graceful shutdown of event loop On server shutdown, ensure that created the event loops are closed properly. --- fastapi/pools/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fastapi/pools/__init__.py b/fastapi/pools/__init__.py index 08ab00781..31f1fb388 100644 --- a/fastapi/pools/__init__.py +++ b/fastapi/pools/__init__.py @@ -1,7 +1,11 @@ from .event_loop import EventLoopPool from .fastapi_app import FastApiAppPool +from odoo.service.server import CommonServer event_loop_pool = EventLoopPool() fastapi_app_pool = FastApiAppPool() + +CommonServer.on_stop(event_loop_pool.shutdown) + __all__ = ["event_loop_pool", "fastapi_app_pool"] From ca1aee63720e8e10f8fd8d7946334ce58c07d3aa Mon Sep 17 00:00:00 2001 From: "Laurent Mignon (ACSONE)" Date: Fri, 10 Jan 2025 16:36:35 +0100 Subject: [PATCH 4/6] [FIX] fastapi: Ensure thread safety of the FastAPI app cache defaultdict in python is not thread safe. Since this data structure is used to store the cache of FastAPI apps, we must ensure that the access to this cache is thread safe. This is done by using a lock to protect the access to the cache. --- fastapi/pools/fastapi_app.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/fastapi/pools/fastapi_app.py b/fastapi/pools/fastapi_app.py index 3508ea373..ffdd4d5e8 100644 --- a/fastapi/pools/fastapi_app.py +++ b/fastapi/pools/fastapi_app.py @@ -2,6 +2,7 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). import queue +import threading from collections import defaultdict from contextlib import contextmanager from typing import Generator @@ -16,18 +17,25 @@ def __init__(self): self._queue_by_db_by_root_path: dict[ str, dict[str, queue.Queue[FastAPI]] ] = defaultdict(lambda: defaultdict(queue.Queue)) + self._lock = threading.Lock() - def __get_app(self, env: Environment, root_path: str) -> FastAPI: + def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]: db_name = env.cr.dbname + with self._lock: + # default dict is not thread safe but the use + return self._queue_by_db_by_root_path[db_name][root_path] + + def __get_app(self, env: Environment, root_path: str) -> FastAPI: + pool = self.__get_pool(env, root_path) try: - return self._queue_by_db_by_root_path[db_name][root_path].get_nowait() + return pool.get_nowait() except queue.Empty: env["fastapi.endpoint"].sudo() return env["fastapi.endpoint"].sudo().get_app(root_path) def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None: - db_name = env.cr.dbname - self._queue_by_db_by_root_path[db_name][root_path].put(app) + pool = self.__get_pool(env, root_path) + pool.put(app) @contextmanager def get_app( @@ -48,5 +56,6 @@ def get_app( self.__return_app(env, app, root_path) def invalidate(self, root_path: str, env: Environment) -> None: - db_name = env.cr.dbname - self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() + with self._lock: + db_name = env.cr.dbname + self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() From 13f9de0df496d4974ca8fae1ac0cce9f8b516d57 Mon Sep 17 00:00:00 2001 From: "Laurent Mignon (ACSONE)" Date: Mon, 13 Jan 2025 17:27:08 +0100 Subject: [PATCH 5/6] [IMP] fastapi: Improves app cache lifecycle This commit improves the lifecycle of the fastapi app cache. It first ensures that the cache is effectively invalidated when changes are made to the app configuration even if theses changes occur into an other server instance. It also remove the use of a locking mechanism put in place to ensure a thread safe access to a value into the cache to avoid potential concurrency issue when a default value is set to the cache at access time. This lock could lead to unnecessary contention and reduce the performance benefits of queue.Queue's fine-grained internal synchronization for a questionable gain. The only expected gain was to avoid the useless creation of a queue.Queue instance that would never be used since at the time of puting the value into the cache we are sure that a value is already present into the dictionary. --- fastapi/fastapi_dispatcher.py | 2 +- fastapi/models/fastapi_endpoint.py | 17 ++++-- fastapi/pools/fastapi_app.py | 85 +++++++++++++++++++++++++++--- 3 files changed, 91 insertions(+), 13 deletions(-) diff --git a/fastapi/fastapi_dispatcher.py b/fastapi/fastapi_dispatcher.py index 3edf29f14..3f2390d4d 100644 --- a/fastapi/fastapi_dispatcher.py +++ b/fastapi/fastapi_dispatcher.py @@ -25,7 +25,7 @@ def dispatch(self, endpoint, args): root_path = "/" + environ["PATH_INFO"].split("/")[1] # TODO store the env into contextvar to be used by the odoo_env # depends method - with fastapi_app_pool.get_app(root_path, request.env) as app: + with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app: uid = request.env["fastapi.endpoint"].sudo().get_uid(root_path) data = BytesIO() with self._manage_odoo_env(uid): diff --git a/fastapi/models/fastapi_endpoint.py b/fastapi/models/fastapi_endpoint.py index 97b92762e..e6a003bf0 100644 --- a/fastapi/models/fastapi_endpoint.py +++ b/fastapi/models/fastapi_endpoint.py @@ -16,7 +16,6 @@ from .. import dependencies from ..middleware import ASGIMiddleware -from ..pools import fastapi_app_pool _logger = logging.getLogger(__name__) @@ -122,10 +121,10 @@ def _registered_endpoint_rule_keys(self): return tuple(res) @api.model - def _routing_impacting_fields(self) -> tuple[str]: + def _routing_impacting_fields(self) -> Tuple[str, ...]: """The list of fields requiring to refresh the mount point of the pp into odoo if modified""" - return ("root_path",) + return ("root_path", "save_http_session") # # end of endpoint.route.sync.mixin methods implementation @@ -200,7 +199,17 @@ def _endpoint_registry_route_unique_key(self, routing: dict[str, Any]): return f"{self._name}:{self.id}:{path}" def _reset_app(self): - fastapi_app_pool.invalidate(self.root_path, self.env) + self._reset_app_cache_marker.clear_cache(self) + + @tools.ormcache() + def _reset_app_cache_marker(self): + """This methos is used to get a way to mark the orm cache as dirty + when the app is reset. By marking the cache as dirty, the system + will signal to others instances that the cache is not up to date + and that they should invalidate their cache as well. This is required + to ensure that any change requiring a reset of the app is propagated + to all the running instances. + """ @api.model def get_app(self, root_path): diff --git a/fastapi/pools/fastapi_app.py b/fastapi/pools/fastapi_app.py index ffdd4d5e8..991374179 100644 --- a/fastapi/pools/fastapi_app.py +++ b/fastapi/pools/fastapi_app.py @@ -1,6 +1,6 @@ # Copyright 2025 ACSONE SA/NV # License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL). - +import logging import queue import threading from collections import defaultdict @@ -11,19 +11,65 @@ from fastapi import FastAPI +_logger = logging.getLogger(__name__) + class FastApiAppPool: + """Pool of FastAPI apps. + + This class manages a pool of FastAPI apps. The pool is organized by database name + and root path. Each pool is a queue of FastAPI apps. + + The pool is used to reuse FastAPI apps across multiple requests. This is useful + to avoid the overhead of creating a new FastAPI app for each request. The pool + ensures that only one request at a time uses an app. + + The proper way to use the pool is to use the get_app method as a context manager. + This ensures that the app is returned to the pool after the context manager exits. + The get_app method is designed to ensure that the app made available to the + caller is unique and not used by another caller at the same time. + + .. code-block:: python + + with fastapi_app_pool.get_app(env=request.env, root_path=root_path) as app: + # use the app + + The pool is invalidated when the cache registry is updated. This ensures that + the pool is always up-to-date with the latest app configuration. It also + ensures that the invalidation is done even in the case of a modification occurring + in a different worker process or thread or server instance. This mechanism + works because every time an attribute of the fastapi.endpoint model is modified + and this attribute is part of the list returned by the `_fastapi_app_fields`, + or `_routing_impacting_fields` methods, we reset the cache of a marker method + `_reset_app_cache_marker`. As side effect, the cache registry is marked to be + updated by the increment of the `cache_sequence` SQL sequence. This cache sequence + on the registry is reloaded from the DB on each request made to a specific database. + When an app is retrieved from the pool, we always compare the cache sequence of + the pool with the cache sequence of the registry. If the two sequences are different, + we invalidate the pool and save the new cache sequence on the pool. + + The cache is based on a defaultdict of defaultdict of queue.Queue. We are cautious + that the use of defaultdict is not thread-safe for operations that modify the + dictionary. However the only operation that modifies the dictionary is the + first access to a new key. If two threads access the same key at the same time, + the two threads will create two different queues. This is not a problem since + at the time of returning an app to the pool, we are sure that a queue exists + for the key into the cache and all the created apps are returned to the same + valid queue. And the end, the lack of thread-safety for the defaultdict could + only lead to a negligible overhead of creating a new queue that will never be + used. This is why we consider that the use of defaultdict is safe in this context. + """ + def __init__(self): self._queue_by_db_by_root_path: dict[ str, dict[str, queue.Queue[FastAPI]] ] = defaultdict(lambda: defaultdict(queue.Queue)) + self.__cache_sequence = 0 self._lock = threading.Lock() def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]: db_name = env.cr.dbname - with self._lock: - # default dict is not thread safe but the use - return self._queue_by_db_by_root_path[db_name][root_path] + return self._queue_by_db_by_root_path[db_name][root_path] def __get_app(self, env: Environment, root_path: str) -> FastAPI: pool = self.__get_pool(env, root_path) @@ -39,7 +85,7 @@ def __return_app(self, env: Environment, app: FastAPI, root_path: str) -> None: @contextmanager def get_app( - self, root_path: str, env: Environment + self, env: Environment, root_path: str ) -> Generator[FastAPI, None, None]: """Return a FastAPI app to be used in a context manager. @@ -49,13 +95,36 @@ def get_app( When used into the FastApiDispatcher class this ensures that the app is reused across multiple requests but only one request at a time uses an app. """ + self._check_cache(env) app = self.__get_app(env, root_path) try: yield app finally: self.__return_app(env, app, root_path) - def invalidate(self, root_path: str, env: Environment) -> None: - with self._lock: - db_name = env.cr.dbname + @property + def cache_sequence(self) -> int: + return self.__cache_sequence + + @cache_sequence.setter + def cache_sequence(self, value: int) -> None: + if value != self.__cache_sequence: + with self._lock: + self.__cache_sequence = value + + def _check_cache(self, env: Environment) -> None: + cache_sequence = env.registry.cache_sequence + if cache_sequence != self.cache_sequence and self.cache_sequence != 0: + _logger.info( + "Cache registry updated, reset fastapi_app pool for the current " + "database" + ) + self.invalidate(env) + self.cache_sequence = cache_sequence + + def invalidate(self, env: Environment, root_path: str | None = None) -> None: + db_name = env.cr.dbname + if root_path: self._queue_by_db_by_root_path[db_name][root_path] = queue.Queue() + elif db_name in self._queue_by_db_by_root_path: + del self._queue_by_db_by_root_path[db_name] From 56a6d4a370bb52711d06cf4cf806a5c26cc8573e Mon Sep 17 00:00:00 2001 From: Eric Lembregts Date: Tue, 25 Feb 2025 08:46:45 +0100 Subject: [PATCH 6/6] [MIG] fastapi: Add support for multiple Odoo cache sequences and fix linting [FIX] fastapi: Apply linting recommendations in 18 --- fastapi/middleware.py | 2 +- fastapi/models/fastapi_endpoint.py | 2 +- fastapi/pools/event_loop.py | 11 ++++--- fastapi/pools/fastapi_app.py | 52 +++++++++++++++++------------- 4 files changed, 38 insertions(+), 29 deletions(-) diff --git a/fastapi/middleware.py b/fastapi/middleware.py index 8f63c2339..e09b22309 100644 --- a/fastapi/middleware.py +++ b/fastapi/middleware.py @@ -9,7 +9,7 @@ """ -from typing import Iterable +from collections.abc import Iterable import a2wsgi from a2wsgi.asgi import ASGIResponder diff --git a/fastapi/models/fastapi_endpoint.py b/fastapi/models/fastapi_endpoint.py index e6a003bf0..a884cee8a 100644 --- a/fastapi/models/fastapi_endpoint.py +++ b/fastapi/models/fastapi_endpoint.py @@ -121,7 +121,7 @@ def _registered_endpoint_rule_keys(self): return tuple(res) @api.model - def _routing_impacting_fields(self) -> Tuple[str, ...]: + def _routing_impacting_fields(self) -> tuple[str, ...]: """The list of fields requiring to refresh the mount point of the pp into odoo if modified""" return ("root_path", "save_http_session") diff --git a/fastapi/pools/event_loop.py b/fastapi/pools/event_loop.py index a0a02a8f3..9f3a0160e 100644 --- a/fastapi/pools/event_loop.py +++ b/fastapi/pools/event_loop.py @@ -4,8 +4,8 @@ import asyncio import queue import threading +from collections.abc import Generator from contextlib import contextmanager -from typing import Generator class EventLoopPool: @@ -16,7 +16,8 @@ def __get_event_loop_and_thread( self, ) -> tuple[asyncio.AbstractEventLoop, threading.Thread]: """ - Get an event loop from the pool. If no event loop is available, create a new one. + Get an event loop from the pool. If no event loop is available, + create a new one. """ try: return self.pool.get_nowait() @@ -47,9 +48,11 @@ def shutdown(self): @contextmanager def get_event_loop(self) -> Generator[asyncio.AbstractEventLoop, None, None]: """ - Get an event loop from the pool. If no event loop is available, create a new one. + Get an event loop from the pool. If no event loop is available, + create a new one. - After the context manager exits, the event loop is returned to the pool for reuse. + After the context manager exits, the event loop is returned to + the pool for reuse. """ loop, thread = self.__get_event_loop_and_thread() try: diff --git a/fastapi/pools/fastapi_app.py b/fastapi/pools/fastapi_app.py index 991374179..67bfafb75 100644 --- a/fastapi/pools/fastapi_app.py +++ b/fastapi/pools/fastapi_app.py @@ -4,8 +4,8 @@ import queue import threading from collections import defaultdict +from collections.abc import Generator from contextlib import contextmanager -from typing import Generator from odoo.api import Environment @@ -45,8 +45,8 @@ class FastApiAppPool: updated by the increment of the `cache_sequence` SQL sequence. This cache sequence on the registry is reloaded from the DB on each request made to a specific database. When an app is retrieved from the pool, we always compare the cache sequence of - the pool with the cache sequence of the registry. If the two sequences are different, - we invalidate the pool and save the new cache sequence on the pool. + the pool with the cache sequence of the registry. If the two sequences are + different, we invalidate the pool and save the new cache sequence on the pool. The cache is based on a defaultdict of defaultdict of queue.Queue. We are cautious that the use of defaultdict is not thread-safe for operations that modify the @@ -61,10 +61,10 @@ class FastApiAppPool: """ def __init__(self): - self._queue_by_db_by_root_path: dict[ - str, dict[str, queue.Queue[FastAPI]] - ] = defaultdict(lambda: defaultdict(queue.Queue)) - self.__cache_sequence = 0 + self._queue_by_db_by_root_path: dict[str, dict[str, queue.Queue[FastAPI]]] = ( + defaultdict(lambda: defaultdict(queue.Queue)) + ) + self.__cache_sequences = {} self._lock = threading.Lock() def __get_pool(self, env: Environment, root_path: str) -> queue.Queue[FastAPI]: @@ -102,25 +102,31 @@ def get_app( finally: self.__return_app(env, app, root_path) - @property - def cache_sequence(self) -> int: - return self.__cache_sequence + def get_cache_sequence(self, key: str) -> int: + with self._lock: + return self.__cache_sequences.get(key, 0) - @cache_sequence.setter - def cache_sequence(self, value: int) -> None: - if value != self.__cache_sequence: - with self._lock: - self.__cache_sequence = value + def set_cache_sequence(self, key: str, value: int) -> None: + with self._lock: + if ( + key not in self.__cache_sequences + or value != self.__cache_sequences[key] + ): + self.__cache_sequences[key] = value def _check_cache(self, env: Environment) -> None: - cache_sequence = env.registry.cache_sequence - if cache_sequence != self.cache_sequence and self.cache_sequence != 0: - _logger.info( - "Cache registry updated, reset fastapi_app pool for the current " - "database" - ) - self.invalidate(env) - self.cache_sequence = cache_sequence + cache_sequences = env.registry.cache_sequences + for key, value in cache_sequences.items(): + if ( + value != self.get_cache_sequence(key) + and self.get_cache_sequence(key) != 0 + ): + _logger.info( + "Cache registry updated, reset fastapi_app pool for the current " + "database" + ) + self.invalidate(env) + self.set_cache_sequence(key, value) def invalidate(self, env: Environment, root_path: str | None = None) -> None: db_name = env.cr.dbname