Fix multicast context propagation race in reactor/reactive-streams #11825

Open
amarziali wants to merge 1 commit into master from andrea.marziali/reactor-threadconfined

Conversation

@amarziali

@amarziali amarziali commented Jul 1, 2026

Contributor

What Does This Do

This change fixes a race in Reactor context propagation when multiple threads concurrently subscribe to the same shared publisher, such as a multicast sink returned by connection.asFlux().

Previously, the Reactor subscribe-time handoff used a shared (Publisher, Context) slot. Since the publisher instance can be shared across concurrent subscribers, one subscribing thread could accidentally consume a context deposited by another thread. When that happened, the affected subscriber stored the wrong parent context, and later onNext restored that incorrect context, causing spans to be attached to the wrong context.
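
The interleaving described above can be replayed sequentially so that the outcome is deterministic. This is only a minimal sketch: `SharedSlotRaceDemo`, `PUB_STORE`, `deposit`, and `consume` are hypothetical names, contexts are plain strings, and the consume-by-removal behavior is an assumption rather than the instrumentation's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the old shared (Publisher, Context) handoff slot. */
final class SharedSlotRaceDemo {
  // One slot per publisher instance; every subscriber of that publisher shares it.
  static final Map<Object, String> PUB_STORE = new ConcurrentHashMap<>();

  /** Producer side: deposit the subscribing thread's context on the publisher. */
  static void deposit(Object publisher, String context) {
    PUB_STORE.put(publisher, context);
  }

  /** Consumer side: take whatever is in the slot, else fall back to the active context. */
  static String consume(Object publisher, String activeContext) {
    String handedOff = PUB_STORE.remove(publisher);
    return handedOff != null ? handedOff : activeContext;
  }

  /** Replays one bad interleaving of subscribers A and B on a single thread. */
  static String[] replayBadInterleaving() {
    Object sharedSink = new Object();              // stands in for a shared multicast publisher
    deposit(sharedSink, "ctx-A");                  // A deposits its context
    deposit(sharedSink, "ctx-B");                  // B deposits before A consumes, overwriting A
    String seenByA = consume(sharedSink, "ctx-A"); // A picks up B's context
    String seenByB = consume(sharedSink, "ctx-B"); // slot is now empty, B falls back
    return new String[] {seenByA, seenByB};
  }

  public static void main(String[] args) {
    String[] result = replayBadInterleaving();
    System.out.println("A stored parent: " + result[0]); // ctx-B, the wrong parent
    System.out.println("B stored parent: " + result[1]); // ctx-B
  }
}
```

Subscriber A ends up parented under B's context, which is the misattribution this change targets.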

The fix changes the Reactor bridge handoff to use a HandoffContext that is stamped with the producing thread. Consumers now only adopt the publisher handoff when it was produced on the same thread. If the handoff came from a different thread, it is treated as foreign and ignored, and the subscriber falls back to its currently active context.

Ignoring a foreign-thread handoff is safe for this Reactor bridge because the handoff is only intended to bridge context within a single synchronous subscribe flow. In that flow, the producer and the legitimate consumer run on the same thread.

Note: this confinement is intentionally applied only to the Reactor bridge. Other producers, such as resilience4j, Spring WebFlux, or Spring Messaging, may create context during assembly and subscribe later on another thread. For those integrations, cross-thread handoff can be legitimate, so they continue to use anyThread.
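
A rough sketch of what a thread-stamped handoff could look like follows. The class name `HandoffSketch`, the `resolve` method, and the use of strings as contexts are assumptions made for illustration; only the `anyThread` and `threadConfined` names come from this PR, and the real `HandoffContext` may be shaped differently.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a thread-stamped handoff; not the PR's actual class. */
final class HandoffSketch {
  private final String context;  // stand-in for the tracer's context object
  private final Thread producer; // null means the handoff is valid on any thread

  private HandoffSketch(String context, Thread producer) {
    this.context = context;
    this.producer = producer;
  }

  /** Handoff that may legitimately be consumed on a different thread. */
  static HandoffSketch anyThread(String context) {
    return new HandoffSketch(context, null);
  }

  /** Handoff stamped with the producing thread; other threads must ignore it. */
  static HandoffSketch threadConfined(String context) {
    return new HandoffSketch(context, Thread.currentThread());
  }

  /** Adopts the handoff unless it is foreign, in which case the active context is used. */
  String resolve(String activeContext) {
    boolean foreign = producer != null && producer != Thread.currentThread();
    return foreign ? activeContext : context;
  }

  /** Helper for the demo: resolves the handoff on a freshly started thread. */
  static String resolveOnOtherThread(HandoffSketch handoff, String activeContext) {
    AtomicReference<String> seen = new AtomicReference<>();
    Thread other = new Thread(() -> seen.set(handoff.resolve(activeContext)));
    other.start();
    try {
      other.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }

  public static void main(String[] args) {
    HandoffSketch confined = threadConfined("ctx-producer");
    System.out.println(confined.resolve("ctx-local"));               // ctx-producer
    System.out.println(resolveOnOtherThread(confined, "ctx-local")); // ctx-local
    HandoffSketch any = anyThread("ctx-assembly");
    System.out.println(resolveOnOtherThread(any, "ctx-local"));      // ctx-assembly
  }
}
```

In this sketch the only difference between the two modes is whether the producing thread is recorded, so the assembly-then-subscribe-later integrations keep their existing behavior.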

Motivation

Additional Notes

A good way to choose between anyThread and threadConfined is to ask: Can this same publisher be subscribed to by multiple threads at the same time?

If yes, use threadConfined.

That means the publisher is shared. Since several subscribers may use the same publisher slot concurrently, we must make sure a subscriber only uses context created by its own thread. This prevents one subscriber from accidentally picking up another subscriber’s context.

If no, use anyThread.

That means the publisher is specific to one operation. There is no competing subscriber that could steal the context. However, the subscription may still happen later on a different thread, so the context must remain usable across threads.

Contributor Checklist

  • Format the title according to the contribution guidelines
  • Assign the type: and (comp: or inst:) labels in addition to any other useful labels
  • Avoid using close, fix, or any linking keywords when referencing an issue
    Use solves instead, and assign the PR milestone to the issue
  • Update the CODEOWNERS file on source file addition, migration, or deletion
  • Update public documentation with any new configuration flags or behaviors
  • Add your completed PR to the merge queue by commenting /merge. You can also:
    • Customize the commit message associated with the merge with /merge --commit-message "..."
    • Remove your PR from the merge queue with /merge -c
    • Skip all merge queue checks with /merge -f --reason "reason"; please use this judiciously, as some checks do not run at the PR-level (note: the PR still needs to be mergeable, this will only skip the pre-merge build)
    • Get more information in this doc

Jira ticket: [PROJ-IDENT]

@amarziali amarziali added type: bug Bug report and fix inst: reactor Reactor instrumentation labels Jul 1, 2026
@amarziali

Contributor Author

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Hooray!

Reviewed commit: 08cc9ac832

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@datadog-prod-us1-6

datadog-prod-us1-6 Bot commented Jul 1, 2026

🎯 Code Coverage (details)
Patch Coverage: 0.00%
Overall Coverage: 60.78% (+3.85%)

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: c77fa10 | Docs | Datadog PR Page | Give us feedback!

@amarziali amarziali marked this pull request as ready for review July 1, 2026 10:06
@amarziali amarziali requested review from a team as code owners July 1, 2026 10:06
@amarziali amarziali requested review from jordan-wong and mcculls and removed request for a team July 1, 2026 10:06

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 08cc9ac832

@amarziali amarziali force-pushed the andrea.marziali/reactor-threadconfined branch from 08cc9ac to c77fa10 July 1, 2026 22:03
@dd-octo-sts

dd-octo-sts Bot commented Jul 1, 2026

Contributor

🟡 Java Benchmark SLOs — Performance SLO warning (near threshold)

Suite Status
Startup 🟡 warning

SLO thresholds are defined here based on automatically generated metrics. A warning is raised when results are within 5% of the threshold.

PR vs. master results
| Scenario | Candidate | master | Δ (95% CI of mean) |
|---|---|---|---|
| startup:insecure-bank:iast:Agent | 13.98 s | 13.94 s | [-0.5%; +1.0%] (no difference) |
| startup:insecure-bank:tracing:Agent | 12.95 s | 13.01 s | [-1.1%; +0.2%] (no difference) |
| startup:petclinic:appsec:Agent | 17.37 s | 17.11 s | [+0.4%; +2.6%] (maybe worse) |
| startup:petclinic:iast:Agent | 17.33 s | 16.94 s | [-2.1%; +6.7%] (no difference) |
| startup:petclinic:profiling:Agent | 17.50 s | 17.34 s | [-0.2%; +2.1%] (no difference) |
| startup:petclinic:sca:Agent | 17.44 s | 17.35 s | [-0.3%; +1.3%] (no difference) |
| startup:petclinic:tracing:Agent | 16.54 s | 16.18 s | [-2.1%; +6.6%] (no difference) |

Commit: c77fa108 · CI Pipeline · Benchmarking Platform UI


Load and DaCapo benchmarks can be triggered manually in the GitLab pipeline. Results will appear in the Benchmarking Platform UI after completion.

@mcculls mcculls left a comment

Contributor

Interesting approach - it does make me wonder if publisher is the ideal carrier for this context? (but on the other hand injecting into arbitrary events could quickly bloat the heap)

In the case where we use thread-confining, the first context to use the publisher wins and gets to have that carried over to the subscribing thread. Other subscribers now use their local context - that feels odd though, if they're all being triggered by the same event.

If we're expecting the same publisher to be re-used to send different events (with a 1:1 mapping of publishing event to thread) then maybe we could store a ThreadLocal to disambiguate the different events/channels? i.e. rather than have just one "winner"?

Basically I'm probing to see if we might run into a situation where the same publisher instance is used by different threads to send events at the same time, with each event going to a subscriber on the same thread.

@amarziali

Contributor Author

@mcculls thanks for raising these, they are good questions. I'll quote each point and answer it inline to explain the full reasoning I used.

Interesting approach - it does make me wonder if publisher is the ideal carrier for this context? (but on the other hand injecting into arbitrary events could quickly bloat the heap)

That is a fair concern. The Publisher is not ideal as a long-term carrier because it can be shared and subscribed to multiple times. That is exactly what caused this race.

But in this instrumentation, the publisher store is not meant to be the final storage location. It is only a temporary handoff point for cases where we have a context and a publisher, but we do not yet have the final subscriber.

Once the Reactive Streams subscribe advice sees the real subscriber, it moves the context into the subscriber store:

PubStore[Publisher] → temporary handoff
SubStore[Subscriber] → final per-subscriber context

The subscriber store is the important one for signal propagation. It is more precise because each subscriber is unique.

So the publisher is used only as a short-lived bridge, not as the long-term context carrier.
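
The two-store flow can be sketched roughly as below. All names (`TwoStoreSketch`, `deposit`, `onSubscribe`, `contextForSignal`) are hypothetical, plain maps stand in for the instrumentation's context stores, contexts are strings, and clearing the publisher slot on consumption is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the PubStore -> SubStore handoff; not the actual advice code. */
final class TwoStoreSketch {
  static final Map<Object, String> PUB_STORE = new ConcurrentHashMap<>(); // temporary handoff
  static final Map<Object, String> SUB_STORE = new ConcurrentHashMap<>(); // per-subscriber context

  /** A context and a publisher are known, but the final subscriber is not yet. */
  static void deposit(Object publisher, String context) {
    PUB_STORE.put(publisher, context);
  }

  /** Subscribe is observed: move the handoff (or the active context) to the subscriber. */
  static void onSubscribe(Object publisher, Object subscriber, String activeContext) {
    String handedOff = PUB_STORE.remove(publisher); // assumption: the slot is cleared on use
    SUB_STORE.put(subscriber, handedOff != null ? handedOff : activeContext);
  }

  /** Signal delivery (for example onNext) consults only the subscriber store. */
  static String contextForSignal(Object subscriber) {
    return SUB_STORE.get(subscriber);
  }

  public static void main(String[] args) {
    Object publisher = new Object();
    Object first = new Object();
    Object second = new Object();

    deposit(publisher, "ctx-handoff");
    onSubscribe(publisher, first, "active-1");  // adopts the handoff
    onSubscribe(publisher, second, "active-2"); // nothing left in the slot, uses active context

    System.out.println(contextForSignal(first));         // ctx-handoff
    System.out.println(contextForSignal(second));        // active-2
    System.out.println(PUB_STORE.containsKey(publisher)); // false
  }
}
```

Because each subscriber is its own key, signal-time lookups are not affected by whatever later lands in the shared publisher slot.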

In the case where we use thread-confining, the first context to use the publisher wins and gets to have that carried over to the subscribing thread. Other subscribers now use their local context - that feels odd though, if they're all being triggered by the same event.

I would not describe it as “first context wins.” The rule is closer to:

same-thread handoff wins
foreign-thread handoff is ignored
otherwise use the local active context

For the Reactor bridge, this handoff happens during subscription, not during event emission. Here it is trying to carry context across a single synchronous subscribe path, which is why the handoff has thread affinity.

So if another thread reads a context deposited by a different thread, that is not a valid shared event context. It is a sibling concurrent subscription seeing a shared publisher slot.

In that case, falling back to the local active context is the correct behavior:

Thread-54 active context = parent 57
PubStore[sink] = context from Thread-55

Thread-54 ignores the foreign handoff
Thread-54 stores parent 57 for its own subscriber

That prevents Thread-54 from accidentally parenting its work under Thread-55.

If we're expecting the same publisher to be re-used to send different events (with a 1:1 mapping of publishing event to thread) then maybe we could store a ThreadLocal to disambiguate the different events/channels? i.e. rather than have just one "winner"?

The publisher store is only used while establishing the subscription. After the subscriber is known, the context is stored in the subscriber store. Later, when events flow through onNext, context restoration uses:

SubStore[Subscriber]

not:

PubStore[Publisher]

I did initially think about using a ThreadLocal, but we already had the PubStore, which only needed to be adjusted for the Reactor bridge because of this race. Note that the PubStore is also used as a temporary drop point by other instrumentations (WebFlux, resilience4j) that need to pass along a known context in this case, so the anyThread semantics are fine for them (same behavior as today).

Labels

inst: reactor Reactor instrumentation type: bug Bug report and fix

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants