Include workflow_id in all execution WebSocket messages (CORE-198)#13684
Conversation
The frontend already stores extra_data['extra_pnginfo']['workflow']['id'] when queueing a prompt and exposes it via the /api/jobs REST endpoint, but none of the WebSocket events emitted during execution carry it. That makes it impossible to scope progress state by workflow on the client without maintaining a job_id -> workflow_id mapping that races with execution_start. This adds workflow_id alongside prompt_id on every execution event: - execution_start, execution_success, execution_error, execution_interrupted, execution_cached, executing, executed - progress and progress_state - the metadata block on PREVIEW_IMAGE_WITH_METADATA A new public extract_workflow_id helper in comfy_execution/jobs.py is the single source of truth for the lookup; the existing _extract_job_metadata delegates to it. The id is plumbed through PromptExecutor (stored as self.workflow_id and on server.last_workflow_id), the module-level execute() coroutine, the _send_cached_ui helper, and ProgressRegistry / reset_progress_state so WebUIProgressHandler can include it in progress_state and preview-image metadata. The progress hook in main.py reads server.last_workflow_id to populate the legacy 'progress' event. Tests cover the helper's edge cases (missing/non-string ids, non-dict levels) and that the WebUIProgressHandler emits workflow_id on every progress_state payload via mocked PromptServer.
…rden tests - Clear self.server.last_workflow_id and self.workflow_id in the PromptExecutor finally block so a progress callback racing with teardown can no longer attach the previous run's workflow_id to a later 'progress' event. - Include prompt_id and last_workflow_id in the reconnect 'executing' message in server.py so reconnecting clients can recover both workflow- and prompt-scoped execution state, matching the regular 'executing' payload. - Add an AST-based static guard that walks execution.py, main.py, and comfy_execution/progress.py and asserts every dict literal carrying prompt_id also carries workflow_id. Also add a unit test covering PREVIEW_IMAGE_WITH_METADATA metadata. Together these regression-test every emitter (execution_start/success/error/interrupted/cached, executing, executed, progress, progress_state, preview metadata) without requiring a GPU-backed import of execution.py.
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 WalkthroughWalkthroughThis PR adds workflow_id tracking throughout the node execution pipeline. It extracts workflow_id from request metadata via a new 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@main.py`:
- Line 325: The "executing" reset uses server_instance.last_workflow_id after
e.execute(...) returns, but the executor clears last_workflow_id in finally
which can emit workflow_id: None; capture the workflow id from the
prompt/context before calling e.execute (e.g., read getattr(server_instance,
'last_workflow_id', None) or the prompt's workflow id into a local variable like
workflow_id_for_prompt) and pass that local value into
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id,
"workflow_id": workflow_id_for_prompt}, server_instance.client_id) so the
per-item workflow id is preserved even if last_workflow_id is cleared in
finally.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d7664042-4cf3-4d37-99a9-960bc31c56e7
📒 Files selected for processing (7)
comfy_execution/jobs.pycomfy_execution/progress.pyexecution.pymain.pyserver.pytests-unit/execution_test/test_workflow_id_in_ws_messages.pytests/execution/test_jobs.py
|
Thanks for the automated walkthrough — replying re: the docstring-coverage warning. The 9.62% is computed across the whole repo, but the surfaces this PR actually adds or modifies already match (or exceed) the local convention for each touched file:
The 80% threshold isn't a Comfy-Org convention — the affected files (especially I'd rather leave docstrings as-is here to keep the diff focused on the actual behavior change. Happy to revisit if a maintainer disagrees and wants this PR to add docstrings beyond its scope. |
The previous commit cleared server.last_workflow_id in PromptExecutor's finally block to prevent racing progress callbacks from leaking stale ids. That fix introduced a new bug: prompt_worker in main.py reads server.last_workflow_id *after* e.execute() returns to populate the terminal 'executing' (node=None) message that tells the client a prompt is done. Because the executor has already cleared the field by then, this message always emitted workflow_id=None. Capture the workflow id from extra_data into a local variable before calling e.execute(), and use that local in the send_sync() call so the per-prompt id is preserved through the executor's finally cleanup. Adds an AST-based regression test asserting that prompt_worker extracts workflow_id locally and that the terminal 'executing' send_sync never reads server.last_workflow_id. Verified the test fails on the buggy revision and passes after the fix.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests-unit/execution_test/test_workflow_id_in_ws_messages.py`:
- Around line 138-157: The tests read source files with
Path(file_path).read_text(), which relies on the current working directory and
makes the tests flaky; update the reader to resolve paths relative to the test
module by deriving a base_dir = Path(__file__).resolve().parent (or an
appropriate ancestor) and then use (base_dir / file_path).read_text(); apply
this change in _assert_workflow_id_in_every_prompt_id_dict and the other similar
helper calls (the other occurrences around the second helper/test block) so all
file reads are deterministic regardless of CWD.
- Around line 265-287: The test currently only asserts that the payload dict for
send_sync("executing", ...) does not reference server.last_workflow_id but
silently passes if no matching send call is found; modify the test loop that
walks the AST (ast.walk(worker)) and inspects ast.Call nodes invoking .send_sync
with first arg "executing" to set a boolean flag (e.g., found_executing_send)
when a match is encountered, keep the existing payload check for workflow_id vs
last_workflow_id using ast.unparse(value), and after the loop assert that
found_executing_send is True (fail the test if no matching
send_sync("executing", ...) call was found) so the regression is caught when the
send is refactored to use a local variable.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bc33d301-89e6-4cda-8069-de3e57d6c5a0
📒 Files selected for processing (2)
main.pytests-unit/execution_test/test_workflow_id_in_ws_messages.py
🚧 Files skipped from review as they are similar to previous changes (1)
- main.py
Two robustness fixes to the workflow_id regression tests:
1. Resolve source-file paths relative to __file__ (repo_root =
Path(__file__).resolve().parents[2]) instead of bare
Path("main.py"). The bare form depends on the process CWD being the
repo root, which made the tests flaky when run from tests-unit/, an
IDE runner, or any other directory. Verified passing from three
different CWDs (repo root, tests-unit/, /tmp).
2. Track matched_terminal_executing_send and assert it after the AST
walk in TestTerminalExecutingResetInMainPy. Previously, if someone
refactored the send_sync("executing", {...}) call to pass a local
variable instead of an inline dict literal, the loop would simply
find no match and the test would pass vacuously, losing its
regression-protection value. Verified by temporarily refactoring
main.py to use a variable payload: the test now fails with a clear
message asking to update the regression test, and passes again once
the inline dict is restored.
|
lgtm, just needs @guill 's blessing now |
|
Thanks for the review, @Kosinkadink! Will leave the draft state until @guill takes a look. |
|
Thanks @guill! With both code-owner approvals in place, this is ready to land whenever a maintainer wants to merge. |
| self.status_messages = [] | ||
| self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False) | ||
| self.workflow_id = extract_workflow_id(extra_data) | ||
| self.server.last_workflow_id = self.workflow_id |
There was a problem hiding this comment.
I slightly question whether the server is the right place to put this data, but I don't have any other ideas. This is probably fine as-is -- we can always move it later.
There was a problem hiding this comment.
Agreed — server.last_workflow_id is mirroring the existing server.last_node_id / server.last_prompt_id pattern that hijack_progress in main.py reads from, mostly because the progress callback fires from inside the executor and needs some well-known place to pick up the current workflow id. A cleaner home would be a per-execution context object (something like a RunContext passed through the progress handler chain), but that's a larger refactor of how hijack_progress and the WebUI handler share state.
Happy to do that as a follow-up if you want it scoped — keeping it on the server here mainly to avoid expanding the PR's blast radius beyond the workflow_id plumbing itself.
Adds an `extra_data.metadata` envelope on POST /prompt body. The backend (Comfy-Org/ComfyUI#13979) treats the dict as opaque and merges every key/value at the top level of dict-typed WebSocket frames emitted during the prompt's execution. For now the envelope only carries `workflow_id` (sourced from the ComfyWorkflowJSON the user is queueing). The server has no awareness of what `workflow_id` means — it's just one key in a generic tag dict, and adding new tags later requires no backend change. This restores the wire shape from the reverted Comfy-Org/ComfyUI#13684 (top-level `workflow_id` on WS frames) without coupling the core execution layer to workflow concepts. Fixes FE-745
PR Created by the Glary-Bot Agent
Why
The frontend already stores
extra_data["extra_pnginfo"]["workflow"]["id"]when queueing a prompt and surfaces it viaGET /api/jobs, but none of the WebSocket events emitted during execution carry it. As a result, frontends that need to scope progress state by workflow (multi-tab UIs, the Cloud frontend, etc.) have to maintain aprompt_id → workflow_idmapping that races withexecution_startand silently leaks state across workflows when terminal frames are dropped.This PR plumbs the existing workflow id through the executor and adds it alongside
prompt_idon every execution event so clients can scope state purely from the message payload.What changes
Single source of truth for workflow id
comfy_execution/jobs.pygains a publicextract_workflow_id(extra_data)helper that returnsNonefor any missing or non-string value. The existing_extract_job_metadata(used by/api/jobs) now delegates to it so REST and WS share one extraction path.Plumbing
PromptExecutor.execute_asyncextracts the id once and stores it asself.workflow_idandself.server.last_workflow_id.execute()coroutine and_send_cached_ui()take an explicitworkflow_idargument.ProgressRegistry.__init__andreset_progress_state()accept an optionalworkflow_id, whichWebUIProgressHandlerreads when emittingprogress_stateandPREVIEW_IMAGE_WITH_METADATA.progressevent inmain.pyand the post-completionexecutingreset both pull fromserver.last_workflow_id.WebSocket events that now include
workflow_idexecution_start,execution_success,execution_error,execution_interrupted,execution_cachedexecuting,executedprogress,progress_statePREVIEW_IMAGE_WITH_METADATAHygiene
self.workflow_idandself.server.last_workflow_idare cleared in thefinallyblock ofexecute_asyncso a progress callback racing with teardown cannot attach a previous run's workflow id to a later event.server.py's/wshandler now sendsprompt_idandworkflow_idalongsidenode, matching the regularexecutingpayload so reconnecting clients can recover both prompt- and workflow-scoped state.Backward compatibility
This is strictly additive. Every existing field is preserved; clients that don't read
workflow_idare unaffected. When a prompt was queued without anextra_pnginfo.workflow.id(programmatic API usage, older frontends), the field is sent asnullrather than omitted, so consumers see a consistent shape.Tests
tests/execution/test_jobs.py: 10 new cases coveringextract_workflow_idedge cases (missing/non-dict/non-string/empty values).tests-unit/execution_test/test_workflow_id_in_ws_messages.py(new):PromptServertests assertprogress_statepayloads includeworkflow_idat both the top level and per-node, plus thePREVIEW_IMAGE_WITH_METADATAmetadata.ProgressRegistryandreset_progress_statepropagation tests.execution.py,main.py, andcomfy_execution/progress.pyand fails if any dict literal carryingprompt_idis missingworkflow_id. This coversexecution_start/_success/_error/_interrupted/_cached/executing/executed/progresswithout needing a GPU-backed import ofexecution.pyand serves as a regression net for future refactors.Test counts:
pytest tests/execution/test_jobs.py→ 58 passed (was 48)pytest tests-unit/(excluding the GPU-onlypreview_method_override_test.py) → 734 passed, 0 failedruff checkon every touched file → cleanManual verification
Confirmed the backend boots cleanly with
--cpu --disable-all-custom-nodesand/system_statsresponds. Live end-to-end ws capture against a real prompt was not run because the sandbox has no checkpoints / sample workflow handy and no GPU. The AST static guard plus the mock-server payload assertions cover every event listed above deterministically.