Skip to content

Commit

Permalink
Use ephemeral lock file in versioning collection instead
Browse files: browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Jan 20, 2025
1 parent 8795ca0 commit 7f5bc50
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 64 deletions.
3 changes: 1 addition & 2 deletions services/ifrs/dev_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,5 @@ file_staged_event_topic: internal-file-registry
file_staged_event_type: file_staged_for_download
file_deleted_event_topic: internal-file-registry
file_deleted_event_type: file_deleted
lock_collection: ifrs_db_lock
db_version_collection: ifrs_db_versions
db_version_collection: ifrsDbVersions
migration_wait_sec: 10
93 changes: 33 additions & 60 deletions services/ifrs/src/ifrs/migration_logic/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Tools to run database migrations in services"""

import logging
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from time import sleep, time
from typing import Any, Literal

Expand All @@ -41,11 +41,6 @@ def duration_in_ms(duration: float) -> int:
class MigrationConfig(MongoDbConfig):
"""Minimal configuration required to run the migration process."""

lock_collection: str = Field(
...,
description="The name of the collection containing the DB Lock document for this service",
examples=["ifrsLock", "dcsLock"],
)
db_version_collection: str = Field(
...,
description="The name of the collection containing DB version information for this service",
Expand Down Expand Up @@ -93,7 +88,7 @@ def __init__(
self, *, op: Literal["acquire", "release"], coll_name: str, err_info: str
):
msg = (
f"Failed to {op} the lock in DB lock collection {coll_name}."
f"Failed to {op} the lock in collection {coll_name}."
+ f" Error details:\n '{err_info}'"
)
super().__init__(msg)
Expand Down Expand Up @@ -187,8 +182,10 @@ async def __aexit__(self, exc_type_, exc_value, exc_tb):
async def _get_version_docs(self) -> list[DbVersionRecord]:
"""Gets the DB version information from the database."""
collection = self.db[self.config.db_version_collection]
version_docs = [DbVersionRecord(**doc) async for doc in collection.find()]

# use a filter to avoid picking up the lock doc, just in case
version_docs = [
DbVersionRecord(**doc) async for doc in collection.find({"_id": {"$ne": 0}})
]
return version_docs

@asynccontextmanager
Expand All @@ -199,58 +196,50 @@ async def _lock_db(self):
finally:
await self._release_db_lock()

async def _acquire_db_lock(self):
async def _acquire_db_lock(self) -> None:
"""Try to acquire the lock on the DB and return the result.
Logs and raises any error that occurs while updating the lock document.
"""
if self._lock_acquired:
log.debug("Database lock already acquired")
return

coll_name = self.config.db_version_collection
try:
lock_col = self.db[self.config.lock_collection]
lock_acquired = await lock_col.find_one_and_update(
{"lock_acquired": False},
{
"$set": {
version_coll = self.db[coll_name]
with suppress(DuplicateKeyError):
await version_coll.insert_one(
{
"_id": 0,
"lock_acquired": True,
"acquired_at": now_as_utc().isoformat(),
}
},
)
)
self._lock_acquired = True
log.info("Database lock acquired")
except BaseException as exc:
error = DbLockError(
op="acquire", coll_name=self.config.lock_collection, err_info=str(exc)
)
error = DbLockError(op="acquire", coll_name=coll_name, err_info=str(exc))
log.error(error)
raise error from exc

self._lock_acquired = bool(lock_acquired)
if self._lock_acquired:
log.info("Database lock acquired")
if not self._lock_acquired:
log.debug("Did not acquire DB lock in collection %s", coll_name)

async def _release_db_lock(self) -> None:
"""Release the DB lock.
"""Release the DB lock by deleting the lock document.
Logs and re-raises any errors that occur during the update.
"""
if not self._lock_acquired:
log.debug("Database lock already released")
return
coll_name = self.config.db_version_collection
try:
lock_col = self.db[self.config.lock_collection]
await lock_col.find_one_and_update(
{"lock_acquired": True},
{"$set": {"lock_acquired": False, "acquired_at": ""}},
)
version_coll = self.db[coll_name]
await version_coll.find_one_and_delete({"lock_acquired": True})
self._lock_acquired = False
except BaseException as exc:
error = DbLockError(
op="release",
coll_name=self.config.lock_collection,
err_info=str(exc),
)
error = DbLockError(op="release", coll_name=coll_name, err_info=str(exc))
log.critical(error)
raise error from exc
log.info("Database lock released")
Expand All @@ -272,31 +261,15 @@ async def _initialize_versioning(self) -> bool:
Returns `True` if setup was performed, else `False`.
"""
init_start = time()
lock_collection = self.db[self.config.lock_collection]
lock_doc = [_ async for _ in lock_collection.find()]
if not lock_doc:
# lock document has not been created yet, so add it
try:
await lock_collection.insert_one(
{
"_id": 0,
"lock_acquired": True,
"acquired_at": now_as_utc().isoformat(),
}
async with self._lock_db():
if self._lock_acquired:
# Initialize db version collection
await self._record_migration(
version=1,
total_duration_ms=duration_in_ms(time() - init_start),
)
except DuplicateKeyError:
# another instance inserted the doc first, so stop and wait to retry
return False

if not self._lock_acquired:
return False

# Initialize db version collection
await self._record_migration(
version=1,
total_duration_ms=duration_in_ms(time() - init_start),
)
return True
return True
return False

def _get_sequence(self, *, current_ver: int, target_ver: int) -> list[int]:
"""Return an ordered list of the version migrations to apply/unapply"""
Expand Down Expand Up @@ -362,7 +335,7 @@ async def _perform_migrations(self, *, current_ver: int):
log.critical(error)
raise error from exc

async def _migrate_db(self) -> bool | None:
async def _migrate_db(self) -> bool:
"""Ensure the database is up to date before running the actual app.
If the database is already up to date, no changes are made. If the database is
Expand Down
3 changes: 1 addition & 2 deletions services/ifrs/tests_ifrs/fixtures/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ file_staged_event_topic: internal-file-registry
file_staged_event_type: file_staged_for_download
file_deleted_event_topic: internal-file-registry
file_deleted_event_type: file_deleted
lock_collection: ifrs_db_lock
db_version_collection: ifrs_db_versions
db_version_collection: ifrsDbVersions
migration_wait_sec: 2

0 comments on commit 7f5bc50

Please sign in to comment.