Fix multicast context propagation race in reactor/reactive-streams #11825
amarziali wants to merge 1 commit
@codex review
Codex Review: Didn't find any major issues. Hooray!
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 08cc9ac832
08cc9ac to c77fa10
🟡 Java Benchmark SLOs — Performance SLO warning (near threshold)
Load and DaCapo benchmarks can be triggered manually in the GitLab pipeline. Results will appear in the Benchmarking Platform UI after completion.
Interesting approach. It does make me wonder whether the publisher is the ideal carrier for this context (though, on the other hand, injecting it into arbitrary events could quickly bloat the heap).
In the case where we use thread-confining, the first context to use the publisher wins and gets carried over to the subscribing thread. Other subscribers now use their local context, which feels odd if they're all being triggered by the same event.
If we're expecting the same publisher to be reused to send different events (with a 1:1 mapping of publishing event to thread), then maybe we could store a ThreadLocal to disambiguate the different events/channels, i.e. rather than having just one "winner"?
Basically, I'm probing to see whether we might run into a situation where the same publisher instance is used by different threads to send events at the same time, with each event going to a subscriber on the same thread.
@mcculls thanks for raising these, they are good questions. I'll answer them point by point to explain the full reasoning I used.
That is a fair concern. The Publisher is not ideal as a long-term carrier because it can be shared and subscribed multiple times. That is exactly what caused this race.

But in this instrumentation, the publisher store is not meant to be the final storage location. It is only a temporary handoff point for cases where we have a context and a publisher, but we do not yet have the final subscriber. Once the Reactive Streams subscribe advice sees the real subscriber, it moves the context into the subscriber store:

PubStore[Publisher] → temporary handoff

The subscriber store is the important one for signal propagation. It is more precise because each subscriber is unique. So the publisher is used only as a short-lived bridge, not as the long-term context carrier.
I would not describe it as “first context wins.” The rule is closer to:

same-thread handoff wins

For the Reactor bridge, this handoff happens during subscription, not during event emission. It is only trying to carry context across one synchronous subscribe path (which is why we have the thread-id affinity). So if another thread reads a context deposited by a different thread, that is not a valid shared event context. It is a sibling concurrent subscription seeing a shared publisher slot. In that case, falling back to the local active context is the correct behavior:

Thread-54 active context = parent 57
Thread-54 ignores the foreign handoff

That prevents Thread-54 from accidentally parenting its work under Thread-55.
The publisher store is only used while establishing the subscription. After the subscriber is known, the context is stored in the subscriber store. Later, when events flow through onNext, context restoration uses:

SubStore[Subscriber]

not:

PubStore[Publisher]

I did initially think about using a ThreadLocal, but we already had the PubStore, which only needed to be adjusted for the Reactor bridge because of this race. Note that the PubStore is also used as a temporary drop point by other instrumentations (webflux, resilience4j) that need to infer a known context in this case, so the anyThread semantics are fine for them (the same as today).
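For readers following the thread, the two-store flow described in this reply can be sketched roughly as below. This is only an illustration under stated assumptions: `TwoStoreSketch`, `deposit`, `onSubscribe`, and `contextForOnNext` are hypothetical names, contexts are plain strings, and plain maps stand in for the instrumentation's real stores. Whether the real advice clears the publisher entry when it moves the context is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the PubStore -> SubStore handoff; not the tracer's real API.
final class TwoStoreSketch {
  // Temporary handoff slot, keyed by publisher instance.
  static final Map<Object, String> PUB_STORE = new ConcurrentHashMap<>();
  // Long-lived store used for signal propagation, keyed by subscriber instance.
  static final Map<Object, String> SUB_STORE = new ConcurrentHashMap<>();

  // We have a context and a publisher, but no subscriber yet.
  static void deposit(Object publisher, String context) {
    PUB_STORE.put(publisher, context);
  }

  // Subscribe advice: move the handed-off context to the subscriber,
  // or fall back to the currently active context if nothing was deposited.
  static void onSubscribe(Object publisher, Object subscriber, String activeContext) {
    String handedOff = PUB_STORE.remove(publisher); // assumption: the slot is cleared
    SUB_STORE.put(subscriber, handedOff != null ? handedOff : activeContext);
  }

  // onNext advice: restoration only ever consults the subscriber store.
  static String contextForOnNext(Object subscriber) {
    return SUB_STORE.get(subscriber);
  }

  public static void main(String[] args) {
    Object publisher = new Object();
    Object first = new Object();
    Object second = new Object();

    deposit(publisher, "ctx-A");
    onSubscribe(publisher, first, "active-1");
    onSubscribe(publisher, second, "active-2");

    System.out.println(contextForOnNext(first));  // prints "ctx-A"
    System.out.println(contextForOnNext(second)); // prints "active-2"
  }
}
```

The point of the sketch is that the publisher entry only lives between deposit and subscribe; after that, every signal resolves its context through the subscriber.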
What Does This Do
This change fixes a race in Reactor context propagation when multiple threads concurrently subscribe to the same shared publisher, such as a multicast sink returned by connection.asFlux().
Previously, the Reactor subscribe-time handoff used a shared (Publisher, Context) slot. Since the publisher instance can be shared across concurrent subscribers, one subscribing thread could accidentally consume a context deposited by another thread. When that happened, the affected subscriber stored the wrong parent context, and later onNext restored that incorrect context, causing spans to be attached to the wrong context.
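The interleaving described above can be reproduced with a small stand-alone sketch. It does not use the tracer's real classes: `SharedSlotRace`, `deposit`, and `consume` are hypothetical, contexts are plain strings, and the bad ordering is forced with `join()` so the outcome is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the pre-fix shared (Publisher, Context) slot.
final class SharedSlotRace {
  // One slot per publisher instance.
  static final Map<Object, String> PUB_STORE = new ConcurrentHashMap<>();

  // Producer side: leave a context on the publisher for the upcoming subscribe.
  static void deposit(Object publisher, String context) {
    PUB_STORE.put(publisher, context);
  }

  // Consumer side (pre-fix behavior): take whatever is in the slot, whoever put it there.
  static String consume(Object publisher, String activeContext) {
    String handedOff = PUB_STORE.remove(publisher);
    return handedOff != null ? handedOff : activeContext;
  }

  // Forces the bad ordering: thread A deposits, then thread B consumes before A does.
  static String contextAdoptedByB() {
    Object sharedPublisher = new Object(); // stands in for a shared multicast publisher
    String[] adopted = new String[1];
    Thread a = new Thread(() -> deposit(sharedPublisher, "parent-of-A"));
    Thread b = new Thread(() -> adopted[0] = consume(sharedPublisher, "parent-of-B"));
    try {
      a.start();
      a.join(); // A has deposited but has not reached its own consume step
      b.start();
      b.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return adopted[0];
  }

  public static void main(String[] args) {
    System.out.println("B adopted: " + contextAdoptedByB()); // prints "B adopted: parent-of-A"
  }
}
```

Thread B ends up with A's parent even though its own active context was different, which is the mis-parenting the PR describes.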
The fix changes the Reactor bridge handoff to use a HandoffContext that is stamped with the producing thread. Consumers now only adopt the publisher handoff when it was produced on the same thread. If the handoff came from a different thread, it is treated as foreign and ignored, and the subscriber falls back to its currently active context.
Ignoring a foreign-thread handoff is safe for this Reactor bridge because the handoff is only intended to bridge context within a single synchronous subscribe flow. In that flow, the producer and the legitimate consumer run on the same thread.
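A minimal sketch of the thread-stamped handoff, assuming a shape the PR text suggests but does not spell out: the names `HandoffContext`, `anyThread`, and `threadConfined` come from the description, while the fields, the `resolve` method, the `-1` sentinel, and the string contexts are illustrative assumptions and may not match the actual change.

```java
// Hypothetical sketch; the real HandoffContext in this PR may look different.
final class HandoffSketch {

  static final class HandoffContext {
    private static final long ANY_THREAD = -1L; // assumed sentinel; real thread ids are positive

    private final String context;        // stand-in for the tracer's context type
    private final long producerThreadId; // ANY_THREAD, or the id of the depositing thread

    private HandoffContext(String context, long producerThreadId) {
      this.context = context;
      this.producerThreadId = producerThreadId;
    }

    // Cross-thread handoff is legitimate (assembly on one thread, subscribe on another).
    static HandoffContext anyThread(String context) {
      return new HandoffContext(context, ANY_THREAD);
    }

    // Handoff is only valid within the synchronous subscribe flow of the producing thread.
    static HandoffContext threadConfined(String context) {
      return new HandoffContext(context, Thread.currentThread().getId());
    }

    // Adopt the handoff unless it is confined to a different thread; otherwise fall back.
    String resolve(String activeContext) {
      boolean foreign =
          producerThreadId != ANY_THREAD
              && producerThreadId != Thread.currentThread().getId();
      return foreign ? activeContext : context;
    }
  }

  // Helper for the demo: resolve the handoff from a freshly started thread.
  static String resolveOnNewThread(HandoffContext handoff, String activeContext) {
    String[] result = new String[1];
    Thread t = new Thread(() -> result[0] = handoff.resolve(activeContext));
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result[0];
  }

  public static void main(String[] args) {
    HandoffContext confined = HandoffContext.threadConfined("parent-55");
    System.out.println(confined.resolve("parent-57"));             // prints "parent-55"
    System.out.println(resolveOnNewThread(confined, "parent-57")); // prints "parent-57"

    HandoffContext open = HandoffContext.anyThread("assembly-ctx");
    System.out.println(resolveOnNewThread(open, "parent-57"));     // prints "assembly-ctx"
  }
}
```

In this sketch the confined handoff is adopted only on the thread that produced it, while the anyThread variant keeps today's behavior of being usable from any subscribing thread.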
Note: this confinement is intentionally applied only to the Reactor bridge. Other producers, such as resilience4j, Spring WebFlux, or Spring Messaging, may create context during assembly and subscribe later on another thread. For those integrations, cross-thread handoff can be legitimate, so they continue to use anyThread.
Motivation
Additional Notes
A good way to choose between anyThread and threadConfined is to ask:
Can this same publisher be subscribed to by multiple threads at the same time?
If yes, use threadConfined.
That means the publisher is shared. Since several subscribers may use the same publisher slot concurrently, we must make sure a subscriber only uses context created by its own thread. This prevents one subscriber from accidentally picking up another subscriber’s context.
If no, use anyThread.
That means the publisher is specific to one operation. There is no competing subscriber that could steal the context. However, the subscription may still happen later on a different thread, so the context must remain usable across threads.
Contributor Checklist
- Assign the type: and (comp: or inst:) labels in addition to any other useful labels
- Avoid using close, fix, or any linking keywords when referencing an issue. Use solves instead, and assign the PR milestone to the issue
- /merge. You can also use:
  - /merge --commit-message "..."
  - /merge -c
  - /merge -f --reason "reason"; please use this judiciously, as some checks do not run at the PR-level (note: the PR still needs to be mergeable, this will only skip the pre-merge build)

Jira ticket: [PROJ-IDENT]