From 821d48ee2c614e1ba06000625ed21fac607764d4 Mon Sep 17 00:00:00 2001 From: betegon Date: Thu, 4 Jun 2026 12:12:40 +0200 Subject: [PATCH] fix(init): avoid stale resume replays --- src/lib/init/types.ts | 1 + src/lib/init/wizard-runner.ts | 392 ++++++++++++++++++++-------- test/lib/init/wizard-runner.test.ts | 296 ++++++++++++++++++--- 3 files changed, 553 insertions(+), 136 deletions(-) diff --git a/src/lib/init/types.ts b/src/lib/init/types.ts index e59b2a549..d81acf506 100644 --- a/src/lib/init/types.ts +++ b/src/lib/init/types.ts @@ -272,6 +272,7 @@ export type SuspendPayload = ToolPayload | InteractivePayload; export type WorkflowRunResult = { status: "suspended" | "success" | "failed"; suspended?: string[][]; + activeStepsPath?: Record; steps?: Record; suspendPayload?: unknown; result?: WizardOutput; diff --git a/src/lib/init/wizard-runner.ts b/src/lib/init/wizard-runner.ts index 6927f2336..506eef805 100644 --- a/src/lib/init/wizard-runner.ts +++ b/src/lib/init/wizard-runner.ts @@ -12,6 +12,7 @@ */ import { randomBytes } from "node:crypto"; +import { isDeepStrictEqual } from "node:util"; import { MastraClient } from "@mastra/client-js"; import { @@ -54,6 +55,8 @@ import { describeTool, executeTool } from "./tools/registry.js"; import type { ResolvedInitContext, SuspendPayload, + ToolPayload, + ToolResult, WizardOptions, WorkflowRunResult, } from "./types.js"; @@ -68,6 +71,16 @@ import { type SpinState = { running: boolean }; +const APPLY_CODEMODS_STEP = "apply-codemods"; + +type CompactPhaseHistoryEntry = { + ok: boolean; + operation: ToolPayload["operation"]; + _phase: string; + error?: string; + data?: { files: Record }; +}; + type StepContext = { payload: SuspendPayload; stepId: string; @@ -87,6 +100,46 @@ function nextPhase( return names[Math.min(phase - 1, names.length - 1)] ?? "done"; } +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function filePathMarkersForHistory( + data: unknown +): Record | undefined { + if (!(isRecord(data) && isRecord(data.files))) { + return; + } + + return Object.fromEntries( + Object.keys(data.files).map((path) => [path, null]) + ); +} + +/** + * Keep `_prevPhases` useful to apply-codemods without resending prior file contents. + * The server only needs prior errors and read file paths for retry/replan decisions. + */ +function summarizeToolPhaseForHistory( + payload: ToolPayload, + phase: string, + result: ToolResult +): CompactPhaseHistoryEntry { + const summary: CompactPhaseHistoryEntry = { + ok: result.ok, + operation: payload.operation, + _phase: phase, + }; + if (result.error) { + summary.error = result.error; + } + const files = filePathMarkersForHistory(result.data); + if (files) { + summary.data = { files }; + } + return summary; +} + /** * Truncate a spinner message to fit within the terminal width. * Leaves room for the spinner character and padding. @@ -156,7 +209,7 @@ function describePostTool(payload: SuspendPayload): string | undefined { async function handleSuspendedStep( ctx: StepContext, stepPhases: Map, - stepHistory: Map[]> + stepHistory: Map ): Promise> { const { payload, stepId, spin, spinState, context, ui } = ctx; const label = STEP_LABELS[stepId] ?? stepId; @@ -203,15 +256,26 @@ async function handleSuspendedStep( ui.markFilesAnalyzed?.(payload.params.paths); } + const phase = nextPhase(stepPhases, stepId, [ + "read-files", + "analyze", + "done", + ]); const history = stepHistory.get(stepId) ?? []; - history.push(toolResult); + const previousPhases = history.slice(); + history.push(summarizeToolPhaseForHistory(payload, phase, toolResult)); stepHistory.set(stepId, history); - return { + const resumeData: Record = { ...toolResult, - _phase: nextPhase(stepPhases, stepId, ["read-files", "analyze", "done"]), - _prevPhases: history.slice(0, -1), + _phase: phase, }; + if (stepId === APPLY_CODEMODS_STEP) { + // apply-codemods uses prior failures and read paths to repair failed patches. + // Other steps do not need phase history, so skip it to avoid payload growth. + resumeData._prevPhases = previousPhases; + } + return resumeData; } if (payload.type === "interactive") { @@ -279,6 +343,12 @@ function assertWorkflowResult(raw: unknown): WorkflowRunResult { ) { throw new Error(`Unexpected workflow status: ${String(obj.status)}`); } + if (isRecord(obj.activeStepsPath)) { + const activeStepIds = Object.keys(obj.activeStepsPath); + if (activeStepIds.length > 0) { + obj.suspended = activeStepIds.map((id) => [id]); + } + } return obj as WorkflowRunResult; } @@ -435,9 +505,9 @@ async function preamble( return true; } -const MAX_RESUME_RETRIES = 3; -const RETRY_BACKOFF_MS = [2000, 4000, 8000]; -const RUN_STATE_RECOVERY_BACKOFF_MS = [0, 250, 750, 1500]; +const RUN_STATE_RECOVERY_INITIAL_BACKOFF_MS = [0, 250, 750, 1500]; +const RUN_STATE_RECOVERY_POLL_MS = 3000; +const RUN_STATE_RECOVERY_MAX_WAIT_MS = 120_000; const RUN_STATE_RECOVERY_TIMEOUT_MS = 10_000; type ResumeRetryArgs = { @@ -449,6 +519,7 @@ type ResumeRetryArgs = { runById: (runId: string, opts?: { fields?: string[] }) => Promise; }; stepId: string; + payload: SuspendPayload; resumeData: Record; tracingOptions: Record; spin: SpinnerHandle; @@ -456,7 +527,7 @@ type ResumeRetryArgs = { }; /** - * Detect Mastra's "not suspended" 500 — means the server already + * Detect Mastra's "not suspended" conflict — means the server already * processed this step (our previous request succeeded but the response was * dropped before we received it). The MastraClientError message embeds the * server body, e.g.: @@ -468,116 +539,182 @@ function isStepAlreadyAdvancedError(err: unknown): boolean { return err instanceof Error && err.message.includes("was not suspended"); } +function httpStatus(err: unknown): number | undefined { + if (!isRecord(err)) { + return; + } + return typeof err.status === "number" ? err.status : undefined; +} + +function runStateRecoveryBackoffMs(): number[] { + const delays = [...RUN_STATE_RECOVERY_INITIAL_BACKOFF_MS]; + let totalWaitMs = delays.reduce((total, delayMs) => total + delayMs, 0); + while (totalWaitMs < RUN_STATE_RECOVERY_MAX_WAIT_MS) { + const delayMs = Math.min( + RUN_STATE_RECOVERY_POLL_MS, + RUN_STATE_RECOVERY_MAX_WAIT_MS - totalWaitMs + ); + delays.push(delayMs); + totalWaitMs += delayMs; + } + return delays; +} + +function isRecoverableRunState( + result: WorkflowRunResult, + resumedStepId: string, + resumedPayload: SuspendPayload +): boolean { + if (result.status !== "suspended") { + return true; + } + + const recovered = extractSuspendPayload(result, resumedStepId); + if (!recovered) { + return false; + } + + return !( + recovered.stepId === resumedStepId && + isDeepStrictEqual(recovered.payload, resumedPayload) + ); +} + /** - * Recover from a stale-step retry by fetching the current run state. + * Recover from stale or ambiguous resume failures by fetching the current run state. * If the workflow has already advanced (e.g. plan-codemods is now suspended), * the returned WorkflowRunResult lets the main loop continue from the right step. */ async function tryRecoverCurrentRunState( workflow: ResumeRetryArgs["workflow"], - runId: string + runId: string, + resumedStepId: string, + resumedPayload: SuspendPayload ): Promise { - for (const delayMs of RUN_STATE_RECOVERY_BACKOFF_MS) { + const deadlineAt = Date.now() + RUN_STATE_RECOVERY_MAX_WAIT_MS; + for (const delayMs of runStateRecoveryBackoffMs()) { + const remainingMs = deadlineAt - Date.now(); + if (remainingMs <= 0) { + return null; + } if (delayMs > 0) { - await new Promise((resolve) => setTimeout(resolve, delayMs)); + await new Promise((resolve) => + setTimeout(resolve, Math.min(delayMs, remainingMs)) + ); + } + const timeoutMs = Math.min( + RUN_STATE_RECOVERY_TIMEOUT_MS, + deadlineAt - Date.now() + ); + if (timeoutMs <= 0) { + return null; } try { const raw = await withTimeout( workflow.runById(runId, { - fields: ["steps", "activeStepsPath", "result"], + fields: [ + "status", + "suspended", + "steps", + "activeStepsPath", + "suspendPayload", + "result", + "error", + ], }), - RUN_STATE_RECOVERY_TIMEOUT_MS, + timeoutMs, "Run state recovery" ); - // runById returns activeStepsPath (Record) but - // not suspended (string[][]). The main loop reads result.suspended to - // find the active step; without it, stepId falls back to "unknown" and - // extractSuspendPayload iterates all steps — picking the first with any - // suspendPayload, which could be a completed step with stale D1 data. - // Derive suspended from the activeStepsPath keys so the lookup is - // deterministic: those keys are exactly the currently-active step IDs. - const state = raw as Record; - if (!state.suspended && state.activeStepsPath) { - state.suspended = Object.keys( - state.activeStepsPath as Record - ).map((id) => [id]); + const result = assertWorkflowResult(raw); + if (isRecoverableRunState(result, resumedStepId, resumedPayload)) { + return result; } - return assertWorkflowResult(state); } catch { // Mastra/D1 can briefly return a not-yet-readable or intermediate run - // state immediately after rejecting a stale resume. Poll a few times - // before surfacing the original 500 to the user. + // state while the original resume request is still running. Keep + // observing run state instead of replaying a non-idempotent resume. } } return null; } -// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: retry loop branches across transient errors, stale-step recovery, and backoff -async function resumeWithRetry( +async function resumeWithRecovery( args: ResumeRetryArgs ): Promise { - const { run, workflow, stepId, resumeData, tracingOptions, spin, ui } = args; - let lastError: unknown; - for (let attempt = 0; attempt <= MAX_RESUME_RETRIES; attempt++) { - try { - if (attempt > 0) { - ui.setOverlay?.({ - kind: "health", - message: "Connection interrupted, retrying...", - retryCount: attempt, - }); - await new Promise((r) => - setTimeout(r, RETRY_BACKOFF_MS[attempt - 1] ?? 8000) - ); - } - const raw = await withTimeout( - run.resumeAsync({ step: stepId, resumeData, tracingOptions }), - API_TIMEOUT_MS, - "Workflow resume" + const { + run, + workflow, + stepId, + payload, + resumeData, + tracingOptions, + spin, + ui, + } = args; + try { + const raw = await withTimeout( + run.resumeAsync({ step: stepId, resumeData, tracingOptions }), + API_TIMEOUT_MS, + "Workflow resume" + ); + return assertWorkflowResult(raw); + } catch (err) { + if (isStepAlreadyAdvancedError(err)) { + spin.message("Reconnecting..."); + const recovered = await tryRecoverCurrentRunState( + workflow, + run.runId, + stepId, + payload ); - if (attempt > 0) { - ui.clearOverlay?.(); - } - return assertWorkflowResult(raw); - } catch (err) { - lastError = err; - // "Step not suspended" means the server processed our step but the - // response was dropped (network blip, CF response timeout, etc.). - // Retrying the same step will always 500. Fetch the current run state - // so the main loop can continue from whichever step is actually suspended. - if (isStepAlreadyAdvancedError(err)) { - ui.clearOverlay?.(); - spin.message("Reconnecting..."); - const recovered = await tryRecoverCurrentRunState(workflow, run.runId); - if (recovered) { - addBreadcrumb({ - category: "wizard", - message: `stale-step recovery succeeded for ${stepId}`, - level: "info", - data: { stepId, runId: run.runId }, - }); - return recovered; - } - // Recovery failed — the step is confirmed not suspended and retrying - // it will always 500. Throw immediately instead of wasting 14s. - captureException(err, { - level: "warning", - tags: { - "wizard.stale_step_recovery": "failed", - "wizard.resume_step": stepId, - }, - extra: { runId: run.runId }, + if (recovered) { + addBreadcrumb({ + category: "wizard", + message: `stale-step recovery succeeded for ${stepId}`, + level: "info", + data: { stepId, runId: run.runId }, }); - throw err; - } - if (attempt === MAX_RESUME_RETRIES) { - ui.clearOverlay?.(); - throw err; + return recovered; } + captureException(err, { + level: "warning", + tags: { + "wizard.stale_step_recovery": "failed", + "wizard.resume_step": stepId, + }, + extra: { runId: run.runId }, + }); + throw err; } + + if (httpStatus(err) !== undefined) { + throw err; + } + + ui.setOverlay?.({ + kind: "health", + message: "Connection interrupted, reconnecting...", + retryCount: 1, + }); + spin.message("Reconnecting..."); + const recovered = await tryRecoverCurrentRunState( + workflow, + run.runId, + stepId, + payload + ); + ui.clearOverlay?.(); + if (recovered) { + addBreadcrumb({ + category: "wizard", + message: `resume state recovery succeeded for ${stepId}`, + level: "info", + data: { stepId, runId: run.runId }, + }); + return recovered; + } + throw err; } - ui.clearOverlay?.(); - throw lastError; } // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: sequential wizard orchestration with error handling branches @@ -657,6 +794,7 @@ export async function runWizard(initialOptions: WizardOptions): Promise { const client = new MastraClient({ baseUrl: MASTRA_API_URL, + retries: 0, headers: token ? { Authorization: `Bearer ${token}` } : {}, abortSignal: abortController.signal, fetch: ((url, init) => { @@ -732,7 +870,7 @@ export async function runWizard(initialOptions: WizardOptions): Promise { } const stepPhases = new Map(); - const stepHistory = new Map[]>(); + const stepHistory = new Map(); // Track which step the runner is currently suspended on so the // sidebar checklist can flip rows as the workflow advances. A @@ -791,10 +929,11 @@ export async function runWizard(initialOptions: WizardOptions): Promise { stepHistory ); - result = await resumeWithRetry({ + result = await resumeWithRecovery({ run, workflow, stepId: extracted.stepId, + payload: extracted.payload, resumeData, tracingOptions, spin, @@ -922,28 +1061,73 @@ function mapWorkflowExitCode(workflowCode: number | undefined): number { } } -function extractSuspendPayload( +function activeStepIdsFor(result: WorkflowRunResult, stepId: string): string[] { + const activeStepsPathIds = Object.keys(result.activeStepsPath ?? {}); + if (activeStepsPathIds.length > 0) { + return activeStepsPathIds; + } + + const ids = new Set(); + for (const path of result.suspended ?? []) { + const id = path.at(-1); + if (id) { + ids.add(id); + } + } + if (ids.size === 0 && stepId !== "unknown") { + ids.add(stepId); + } + return [...ids]; +} + +function extractSuspendPayloadFromStep( result: WorkflowRunResult, stepId: string ): { payload: SuspendPayload; stepId: string } | undefined { const stepPayload = result.steps?.[stepId]?.suspendPayload; - if (stepPayload) { - return { payload: assertSuspendPayload(stepPayload), stepId }; - } - - if (result.suspendPayload) { - return { payload: assertSuspendPayload(result.suspendPayload), stepId }; + if (!stepPayload) { + return; } + return { payload: assertSuspendPayload(stepPayload), stepId }; +} - for (const key of Object.keys(result.steps ?? {})) { - const step = result.steps?.[key]; - if (step?.suspendPayload) { +function extractSuspendPayload( + result: WorkflowRunResult, + stepId: string +): { payload: SuspendPayload; stepId: string } | undefined { + const activeStepIds = activeStepIdsFor(result, stepId); + if (activeStepIds.length > 0) { + for (const activeStepId of activeStepIds) { + const extracted = extractSuspendPayloadFromStep(result, activeStepId); + if (extracted) { + return extracted; + } + } + if (result.suspendPayload) { return { - payload: assertSuspendPayload(step.suspendPayload), - stepId: key, + payload: assertSuspendPayload(result.suspendPayload), + stepId: activeStepIds[0] ?? stepId, }; } + return; + } + + if (result.suspendPayload) { + return { payload: assertSuspendPayload(result.suspendPayload), stepId }; } - return; + const payloadEntries = Object.entries(result.steps ?? {}).filter( + ([, entry]) => entry.suspendPayload + ); + if (payloadEntries.length !== 1) { + return; + } + const [payloadStepId, step] = payloadEntries[0] as [ + string, + { suspendPayload: unknown }, + ]; + return { + payload: assertSuspendPayload(step.suspendPayload), + stepId: payloadStepId, + }; } diff --git a/test/lib/init/wizard-runner.test.ts b/test/lib/init/wizard-runner.test.ts index a47d5f408..3860a4615 100644 --- a/test/lib/init/wizard-runner.test.ts +++ b/test/lib/init/wizard-runner.test.ts @@ -114,7 +114,8 @@ let stderrSpy: ReturnType; * runWizard. Used by the MastraClient lifecycle suite to assert that the * `abortSignal` passed at construction time is aborted on teardown. */ -let capturedClientOptions: { abortSignal?: AbortSignal }[] = []; +let capturedClientOptions: { abortSignal?: AbortSignal; retries?: number }[] = + []; let savedPlainOutput: string | undefined; @@ -240,13 +241,19 @@ beforeEach(() => { // `this` is the MastraClient instance. `BaseResource.options` holds the // full ClientOptions passed to the constructor — including abortSignal. capturedClientOptions.push( - (this as unknown as { options: { abortSignal?: AbortSignal } }).options + ( + this as unknown as { + options: { abortSignal?: AbortSignal; retries?: number }; + } + ).options ); return workflow as any; }); }); afterEach(() => { + vi.useRealTimers(); + getUISpy.mockRestore(); formatBannerSpy.mockRestore(); formatResultSpy.mockRestore(); @@ -825,6 +832,7 @@ describe("runWizard — MastraClient lifecycle", () => { await runWizard(makeOptions()); expect(capturedClientOptions).toHaveLength(1); + expect(capturedClientOptions[0]?.retries).toBe(0); const signal = capturedClientOptions[0]?.abortSignal; expect(signal).toBeInstanceOf(AbortSignal); // Using the non-null assertion safely — we asserted toBeInstanceOf above. @@ -889,7 +897,9 @@ describe("runWizard — MastraClient lifecycle", () => { let abortedAtConstruction: boolean | undefined; getWorkflowSpy.mockImplementation(function (this: MastraClient) { const opts = ( - this as unknown as { options: { abortSignal?: AbortSignal } } + this as unknown as { + options: { abortSignal?: AbortSignal; retries?: number }; + } ).options; capturedClientOptions.push(opts); abortedAtConstruction = opts.abortSignal?.aborted; @@ -946,11 +956,19 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => { params: { commands: ["npm install"] }, }; - function makeStaleStepRun(resumeAsyncImpl: () => Promise) { + function makeStaleStepRun( + resumeAsyncImpl: ( + args: Record + ) => Promise + ) { let runByIdRef: ReturnType; getWorkflowSpy.mockImplementation(function (this: MastraClient) { capturedClientOptions.push( - (this as unknown as { options: { abortSignal?: AbortSignal } }).options + ( + this as unknown as { + options: { abortSignal?: AbortSignal; retries?: number }; + } + ).options ); runByIdRef = runByIdMock; return { @@ -966,37 +984,63 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => { }); } - function staleStepError(): Error { - return new Error( - "HTTP error! status: 500 - " + - JSON.stringify({ - error: - "This workflow step 'tool-step' was not suspended. Available suspended steps: [next-step]", - }) + function httpError( + status: number, + body: unknown + ): Error & { status: number } { + return Object.assign( + new Error(`HTTP error! status: ${status} - ${JSON.stringify(body)}`), + { status } ); } - function staleRunError(): Error { - return new Error( - "HTTP error! status: 500 - " + - JSON.stringify({ error: "This workflow run was not suspended" }) - ); + function staleStepError(status = 500): Error & { status: number } { + return httpError(status, { + error: + "This workflow step 'tool-step' was not suspended. Available suspended steps: [next-step]", + }); } - test("recovers when server has already advanced to the next step", async () => { + function staleRunError(status = 500): Error & { status: number } { + return httpError(status, { + error: "This workflow run was not suspended", + }); + } + + function selectWorkflowFields( + result: WorkflowRunResult, + fields: string[] | undefined + ): Record { + const source = result as unknown as Record; + const selected: Record = {}; + for (const field of fields ?? Object.keys(source)) { + if (field in source) { + selected[field] = source[field]; + } + } + return selected; + } + + test("recovers from a sparse 409 stale-resume conflict", async () => { mockStartResult = { status: "suspended", suspended: [["tool-step"]], steps: { "tool-step": { suspendPayload: toolPayload } }, }; - // runById returns a finished workflow — the wizard should complete cleanly. - mockRunByIdResult = { status: "success" }; + const currentRunState: WorkflowRunResult = { + status: "success", + suspended: [], + }; + runByIdMock.mockImplementation( + (_runId: string, opts?: { fields?: string[] }) => + Promise.resolve(selectWorkflowFields(currentRunState, opts?.fields)) + ); let resumeCount = 0; makeStaleStepRun(() => { resumeCount += 1; if (resumeCount === 1) { - return Promise.reject(staleStepError()); + return Promise.reject(staleStepError(409)); } return Promise.resolve({ status: "success" }); }); @@ -1006,36 +1050,142 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => { expect(formatResultSpy).toHaveBeenCalled(); expect(runByIdMock).toHaveBeenCalledWith( "test-run-id", - expect.objectContaining({ fields: expect.any(Array) }) + expect.objectContaining({ + fields: expect.arrayContaining([ + "status", + "suspended", + "activeStepsPath", + ]), + }) ); // Recovery succeeded on the first attempt — resumeAsync was not called again. expect(resumeCount).toBe(1); }); - test("recovers from run-level not-suspended errors after transient runById failure", async () => { + test("keeps polling when runById returns the same suspended payload snapshot", async () => { + vi.useFakeTimers(); mockStartResult = { status: "suspended", suspended: [["tool-step"]], steps: { "tool-step": { suspendPayload: toolPayload } }, }; runByIdMock - .mockRejectedValueOnce(new Error("D1 snapshot not ready")) + .mockResolvedValueOnce({ + status: "suspended", + suspendPayload: toolPayload, + }) .mockResolvedValueOnce({ status: "success" }); let resumeCount = 0; makeStaleStepRun(() => { resumeCount += 1; - return Promise.reject(staleRunError()); + return Promise.reject(staleRunError(409)); }); - await runWizard(makeOptions()); + const run = runWizard(makeOptions()); + await vi.advanceTimersByTimeAsync(250); + await run; expect(formatResultSpy).toHaveBeenCalled(); expect(runByIdMock).toHaveBeenCalledTimes(2); expect(resumeCount).toBe(1); }); + test("uses active recovered payload instead of stale historical step payloads", async () => { + const stalePayload: ToolPayload = { + type: "tool", + operation: "read-files", + cwd: "/tmp/test", + params: { paths: ["old-package.json"] }, + }; + const activePayload: ToolPayload = { + type: "tool", + operation: "run-commands", + cwd: "/tmp/test", + params: { commands: ["echo apply"] }, + }; + mockStartResult = { + status: "suspended", + suspended: [["tool-step"]], + steps: { "tool-step": { suspendPayload: toolPayload } }, + }; + mockRunByIdResult = { + status: "suspended", + suspended: [["discover-context"]], + activeStepsPath: { "apply-codemods": [] }, + steps: { + "discover-context": { suspendPayload: stalePayload }, + "apply-codemods": { suspendPayload: activePayload }, + }, + }; + + let resumeCount = 0; + makeStaleStepRun(() => { + resumeCount += 1; + if (resumeCount === 1) { + return Promise.reject(staleStepError()); + } + return Promise.resolve({ status: "success" }); + }); + + await runWizard(makeOptions()); + + expect(executeToolSpy).toHaveBeenCalledWith(toolPayload, makeContext()); + expect(executeToolSpy).toHaveBeenCalledWith(activePayload, makeContext()); + expect(executeToolSpy).not.toHaveBeenCalledWith( + stalePayload, + makeContext() + ); + expect(resumeCount).toBe(2); + }); + + test("does not replay non-stale HTTP 500 resume responses", async () => { + mockStartResult = { + status: "suspended", + suspended: [["tool-step"]], + steps: { "tool-step": { suspendPayload: toolPayload } }, + }; + + let resumeCount = 0; + makeStaleStepRun(() => { + resumeCount += 1; + return Promise.reject(httpError(500, { error: "Error calling handler" })); + }); + + await expect(runWizard(makeOptions())).rejects.toThrow(WizardError); + + expect(resumeCount).toBe(1); + expect(runByIdMock).not.toHaveBeenCalled(); + }); + + test("observes run state after a resume timeout without replaying resumeAsync", async () => { + vi.useFakeTimers(); + mockStartResult = { + status: "suspended", + suspended: [["tool-step"]], + steps: { "tool-step": { suspendPayload: toolPayload } }, + }; + mockRunByIdResult = { status: "success" }; + + let resumeCount = 0; + makeStaleStepRun(() => { + resumeCount += 1; + return new Promise(() => { + /* Simulate a response that never arrives. */ + }); + }); + + const run = runWizard(makeOptions()); + await vi.advanceTimersByTimeAsync(180_000); + await run; + + expect(formatResultSpy).toHaveBeenCalled(); + expect(resumeCount).toBe(1); + expect(runByIdMock).toHaveBeenCalledTimes(1); + }); + test("throws when stale-step error occurs and runById keeps failing", async () => { + vi.useFakeTimers(); mockStartResult = { status: "suspended", suspended: [["tool-step"]], @@ -1051,11 +1201,74 @@ describe("runWizard — resumeWithRetry stale-step recovery", () => { return Promise.reject(staleStepError()); }); - await expect(runWizard(makeOptions())).rejects.toThrow(WizardError); + const run = runWizard(makeOptions()); + const rejection = expect(run).rejects.toThrow(WizardError); + await vi.runAllTimersAsync(); + await rejection; // Threw after recovery polling failed — no futile retries of the stale step. expect(resumeCount).toBe(1); - expect(runByIdMock).toHaveBeenCalledTimes(4); + expect(runByIdMock).toHaveBeenCalled(); + }); + + test("sends compact _prevPhases only for apply-codemods", async () => { + const largeContent = "x".repeat(10_000); + const readPayload: ToolPayload = { + type: "tool", + operation: "read-files", + cwd: "/tmp/test", + params: { paths: ["package.json"] }, + }; + const applyPayload: ToolPayload = { + type: "tool", + operation: "apply-patchset", + cwd: "/tmp/test", + params: { patches: [] }, + }; + mockStartResult = { + status: "suspended", + suspended: [["apply-codemods"]], + steps: { "apply-codemods": { suspendPayload: readPayload } }, + }; + executeToolSpy + .mockResolvedValueOnce({ + ok: true, + data: { files: { "package.json": largeContent } }, + }) + .mockResolvedValueOnce({ + ok: false, + error: "patch conflict", + }); + + const resumeArgs: Record[] = []; + makeStaleStepRun((args) => { + resumeArgs.push(args); + if (resumeArgs.length === 1) { + return Promise.resolve({ + status: "suspended", + suspended: [["apply-codemods"]], + steps: { "apply-codemods": { suspendPayload: applyPayload } }, + }); + } + return Promise.resolve({ status: "success" }); + }); + + await runWizard(makeOptions()); + + const secondResumeData = resumeArgs[1]?.resumeData as + | Record + | undefined; + expect(secondResumeData?._prevPhases).toEqual([ + { + ok: true, + operation: "read-files", + _phase: "read-files", + data: { files: { "package.json": null } }, + }, + ]); + expect(JSON.stringify(secondResumeData?._prevPhases)).not.toContain( + largeContent + ); }); }); @@ -1093,16 +1306,15 @@ describe("runWizard — additional coverage", () => { await expect(runWizard(makeOptions())).rejects.toThrow(WizardError); }); - test("finds suspend payload via fallback loop when primary step has none", async () => { + test("does not use inactive step payloads when active step info exists", async () => { const payload: ToolPayload = { type: "tool", operation: "run-commands", cwd: "/tmp/test", params: { commands: ["echo hi"] }, }; - // `suspended` points to "step-a", but its payload is missing. - // extractSuspendPayload falls back to iterating all steps and finds - // the payload in "step-b". + // `suspended` points to "step-a", so the stale payload on "step-b" + // must not be used. mockStartResult = { status: "suspended", suspended: [["step-a"]], @@ -1113,6 +1325,26 @@ describe("runWizard — additional coverage", () => { }; mockResumeResults = [{ status: "success" }]; + await expect(runWizard(makeOptions())).rejects.toThrow(WizardError); + + expect(executeToolSpy).not.toHaveBeenCalledWith(payload, makeContext()); + }); + + test("uses legacy fallback only when no active step info exists and one payload is present", async () => { + const payload: ToolPayload = { + type: "tool", + operation: "run-commands", + cwd: "/tmp/test", + params: { commands: ["echo hi"] }, + }; + mockStartResult = { + status: "suspended", + steps: { + "step-b": { suspendPayload: payload }, + }, + }; + mockResumeResults = [{ status: "success" }]; + await runWizard(makeOptions()); expect(executeToolSpy).toHaveBeenCalledWith(payload, makeContext());