Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions backend/pycon/dbos_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Central DBOS configuration for the backend.

Single source of truth for ``DBOSConfig``. The dedicated ``dbos_worker``
management command launches DBOS with :func:`build_dbos_config`; the web tier
uses :func:`get_dbos_client` to *enqueue* workflows without ever launching DBOS.

DBOS runs alongside Celery (purely additive) and stores its workflow state in a
separate ``dbos`` database on the existing Postgres server. See SPEC.md.
"""

from __future__ import annotations

from dbos import DBOSClient, DBOSConfig
from django.conf import settings

# Name of the database-backed queue the worker dequeues from and the web tier
# enqueues onto. Centralised so producer and consumer cannot drift.
DBOS_QUEUE_NAME = "dbos_default"


def build_dbos_config() -> DBOSConfig:
"""Build the DBOSConfig from Django settings.

Used by the worker process to configure and launch DBOS. ``run_admin_server``
is disabled for now to avoid binding an extra port (see SPEC.md open items).
"""
config: DBOSConfig = {
"name": settings.DBOS_APP_NAME,
"system_database_url": settings.DBOS_SYSTEM_DATABASE_URL,
"sys_db_pool_size": settings.DBOS_SYS_DB_POOL_SIZE,
"run_admin_server": False,
}
return config


_client: DBOSClient | None = None


def get_dbos_client() -> DBOSClient:
"""Return a lazily-created, process-wide ``DBOSClient`` for the web tier.

The client enqueues workflows by name against the DBOS system database. It
never calls ``DBOS.launch()`` (only the dedicated worker process does). The
singleton is reused across calls and torn down at process exit.
"""
global _client
if _client is None:
_client = DBOSClient(
system_database_url=settings.DBOS_SYSTEM_DATABASE_URL,
)
return _client
39 changes: 39 additions & 0 deletions backend/pycon/dbos_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""DBOS proof-of-concept workflow.

A minimal ``healthcheck`` workflow used to prove that DBOS launches and executes
a durable, idempotent workflow end to end. It is intentionally *not* wired into
any request path, signal, or Celery task — this is the single proof the
foundation slice owes (see SPEC.md). Real task migration is out of scope here.
"""

from __future__ import annotations

from dbos import DBOS

# Explicit, stable workflow name so external enqueuers (DBOSClient) reference it
# by string without depending on the function's location.
HEALTHCHECK_WORKFLOW_NAME = "healthcheck"


def _perform_check() -> str:
"""The unit of work the healthcheck performs.

Pulled out as a plain function so tests can observe how many times it runs:
re-invoking the workflow under the same workflow ID must execute this once,
proving DBOS's exactly-once / idempotent behaviour.
"""
return "ok"


@DBOS.step()
def healthcheck_step() -> str:
"""Single checkpointed step. Its result is recorded for recovery."""
return _perform_check()


@DBOS.workflow(name=HEALTHCHECK_WORKFLOW_NAME)
def healthcheck() -> str:
"""Run one step and return its result."""
result = healthcheck_step()
DBOS.logger.info(f"healthcheck workflow {DBOS.workflow_id} -> {result}")
return result
Empty file.
Empty file.
46 changes: 46 additions & 0 deletions backend/pycon/management/commands/dbos_enqueue_healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Enqueue the proof-of-concept ``healthcheck`` workflow via ``DBOSClient``.

Demonstrates how the web tier submits work to DBOS without launching it: build a
client against the system database and enqueue by workflow name. With the
``dbos_worker`` process running, the workflow is dequeued, executed, and its
result returned here — proving the full round-trip. See SPEC.md (Phase B).
"""

from django.core.management.base import BaseCommand, CommandError

from dbos import DBOSClient, EnqueueOptions

from pycon.dbos_app import DBOS_QUEUE_NAME
from pycon.dbos_workflows import HEALTHCHECK_WORKFLOW_NAME


class Command(BaseCommand):
help = "Enqueue the healthcheck workflow via DBOSClient (proves the round-trip)."

def add_arguments(self, parser):
parser.add_argument(
"--workflow-id",
help="Optional explicit workflow ID (re-using one is idempotent).",
)

def handle(self, *args, **options):
from django.conf import settings

if not settings.DBOS_SYSTEM_DATABASE_URL:
raise CommandError("DBOS_SYSTEM_DATABASE_URL is not set.")

client = DBOSClient(system_database_url=settings.DBOS_SYSTEM_DATABASE_URL)
try:
enqueue_options: EnqueueOptions = {
"workflow_name": HEALTHCHECK_WORKFLOW_NAME,
"queue_name": DBOS_QUEUE_NAME,
}
if options.get("workflow_id"):
enqueue_options["workflow_id"] = options["workflow_id"]

handle = client.enqueue(enqueue_options)
self.stdout.write(f"Enqueued workflow id={handle.workflow_id}")
result = handle.get_result()
self.stdout.write(self.style.SUCCESS(f"Workflow completed -> {result!r}"))
finally:
client.destroy()
58 changes: 58 additions & 0 deletions backend/pycon/management/commands/dbos_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Run the dedicated DBOS worker process.

This launches DBOS and blocks, executing durable workflows. It plays the same
role for DBOS that the Celery worker plays for Celery, and runs alongside it.
The web tier never launches DBOS — it only enqueues via ``get_dbos_client()``.

See SPEC.md / tasks/plan.md (Phase B).
"""

import signal
import threading

from django.core.management.base import BaseCommand, CommandError

from dbos import DBOS

# Importing the workflow module registers its @DBOS.workflow / @DBOS.step
# functions before launch so the worker can execute them.
from pycon import dbos_workflows # noqa: F401
from pycon.dbos_app import DBOS_QUEUE_NAME, build_dbos_config


class Command(BaseCommand):
help = "Launch the DBOS worker (executes durable workflows). Runs alongside Celery."

def handle(self, *args, **options):
config = build_dbos_config()
if not config.get("system_database_url"):
raise CommandError(
"DBOS_SYSTEM_DATABASE_URL is not set; cannot launch the DBOS worker."
)

DBOS(config=config)
DBOS.launch()
# Register the shared queue (after launch) so this worker dequeues the
# workflows the web tier enqueues onto it.
DBOS.register_queue(DBOS_QUEUE_NAME)
self.stdout.write(
self.style.SUCCESS(
f"DBOS launched (app={config['name']}, queue={DBOS_QUEUE_NAME}). "
"Waiting for work — Ctrl-C to stop."
)
)

stop = threading.Event()

def _shutdown(signum, frame):
self.stdout.write("Received shutdown signal, stopping DBOS...")
stop.set()

signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)

try:
stop.wait()
finally:
DBOS.destroy(workflow_completion_timeout_sec=30)
self.stdout.write("DBOS stopped.")
14 changes: 14 additions & 0 deletions backend/pycon/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@
"billing.apps.BillingConfig",
"privacy_policy.apps.PrivacyPolicyConfig",
"visa.apps.VisaConfig",
# Project package, registered as an app so its project-level management
# commands (e.g. dbos_worker) are discoverable. It has no models.
"pycon",
]

MIDDLEWARE = [
Expand Down Expand Up @@ -403,6 +406,17 @@

CELERY_TASK_IGNORE_RESULT = True

# DBOS (durable workflows). Runs alongside Celery; stores workflow state in a
# separate `dbos` database on the existing Postgres server. The dedicated
# `dbos_worker` management command launches DBOS with this config. See SPEC.md.
# NOTE: this URL uses the SQLAlchemy `postgresql://` scheme, not django-environ's
# `psql://` used by DATABASE_URL.
DBOS_APP_NAME = env("DBOS_APP_NAME", default="pycon")
DBOS_SYSTEM_DATABASE_URL = env("DBOS_SYSTEM_DATABASE_URL", default="")
# Kept low on purpose: the single Postgres instance is shared with Django,
# Celery and the DBOS web client. See SPEC.md connection-budget note.
DBOS_SYS_DB_POOL_SIZE = env.int("DBOS_SYS_DB_POOL_SIZE", default=5)

AWS_STORAGE_BUCKET_NAME = env("AWS_MEDIA_BUCKET", default=None)
AWS_REGION_NAME = AWS_SES_REGION_NAME = AWS_S3_REGION_NAME = env(
"AWS_REGION_NAME", default="eu-central-1"
Expand Down
100 changes: 100 additions & 0 deletions backend/pycon/tests/test_dbos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""In-process tests for the DBOS foundation.

These run against a dedicated ``dbos_test`` Postgres database on the same server
as the app database (auto-created if missing, reset before each test). DBOS uses
its own SQLAlchemy engine and does not touch the Django ORM, so these tests do
not request the pytest-django ``db`` fixture and are unaffected by its
transactional rollback. See SPEC.md / tasks/plan.md (Phase A).

Note: a SQLite system DB is NOT usable here — DBOS 2.24.0's migrations are raw
Postgres DDL (``EXTRACT(epoch FROM now())`` defaults, ``gen_random_uuid()``,
``JSON[]``), so SQLite cannot honour them. Hence a real Postgres test DB.
"""

import os
from urllib.parse import urlsplit, urlunsplit

import psycopg
import pytest
from django.conf import settings
from django.test import override_settings

from dbos import DBOS, DBOSConfig, SetWorkflowID

from pycon import dbos_workflows
from pycon.dbos_app import build_dbos_config


def _dbos_test_system_database_url() -> str:
"""Build the DBOS test system DB URL from Django's DB connection params.

Same server/credentials as the app database, but a dedicated ``dbos_test``
database. Overridable via ``DBOS_TEST_SYSTEM_DATABASE_URL``.
"""
override = os.environ.get("DBOS_TEST_SYSTEM_DATABASE_URL")
if override:
return override
db = settings.DATABASES["default"]
host = db.get("HOST") or "localhost"
port = db.get("PORT") or 5432
user = db.get("USER") or "postgres"
password = db.get("PASSWORD") or ""
return f"postgresql://{user}:{password}@{host}:{port}/dbos_test"


def _ensure_database(url: str) -> None:
"""Create the target database if it does not already exist."""
parsed = urlsplit(url)
target = parsed.path.lstrip("/")
admin_url = urlunsplit((parsed.scheme, parsed.netloc, "/postgres", "", ""))
with psycopg.connect(admin_url, autocommit=True) as conn:
exists = conn.execute(
"SELECT 1 FROM pg_database WHERE datname = %s", (target,)
).fetchone()
if not exists:
conn.execute(f'CREATE DATABASE "{target}"')


@pytest.fixture()
def reset_dbos():
"""Give each test a clean DBOS instance on a reset ``dbos_test`` database."""
url = _dbos_test_system_database_url()
_ensure_database(url)
DBOS.destroy(destroy_registry=False)
config: DBOSConfig = {"name": "pycon-test", "system_database_url": url}
DBOS(config=config)
DBOS.reset_system_database()
DBOS.launch()
yield
DBOS.destroy(destroy_registry=False)


def test_healthcheck_executes(reset_dbos):
assert dbos_workflows.healthcheck() == "ok"


def test_healthcheck_idempotent(reset_dbos, mocker):
spy = mocker.patch.object(dbos_workflows, "_perform_check", return_value="ok")

workflow_id = "healthcheck-idempotency-test"
with SetWorkflowID(workflow_id):
first = dbos_workflows.healthcheck()
with SetWorkflowID(workflow_id):
second = dbos_workflows.healthcheck()

assert first == second == "ok"
# Same workflow ID => DBOS returns the recorded result without re-running.
assert spy.call_count == 1


@override_settings(
DBOS_APP_NAME="pycon",
DBOS_SYSTEM_DATABASE_URL="postgresql://u:p@db:5432/dbos",
DBOS_SYS_DB_POOL_SIZE=5,
)
def test_build_dbos_config():
config = build_dbos_config()
assert config["name"] == "pycon"
assert config["system_database_url"].startswith("postgresql://")
assert config["sys_db_pool_size"] == 5
assert config["run_admin_server"] is False
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ dependencies = [
"wagtail==7.3.2",
"wagtail-localize==1.13",
"celery==5.6.3",
"dbos==2.24.0",
"wagtail-headless-preview==0.8.0",
"Jinja2>=3.1.6",
"icalendar>=5.0.11",
Expand Down
Loading
Loading