Flow Engine
Build multi-step, stateful interactions that route between questions, messages, summaries, and tools. The engine drives a SelfPrompt per question using strategies like ChoicesStrat, UntilStrat, and CharsStrat.
Concepts
- FlowQuestion: one question with a prompt, a strategy, transitions, optional assignments/branching, and a completion suffix.
- FlowDefinition: groups a graph of questions and builds a summary.
- FlowEngine: per-request state + constraint orchestration; single entrypoint handle_event(...).
- Routes: route_question, route_message, route_summary, route_output, route_tool, route_noop.
Define a Flow
from quote_mod_sdk.flow import (
    FlowQuestion, FlowDefinition, FlowEngine,
    route_question, route_message, route_summary, route_tool
)
from quote_mod_sdk.strategies.strategy_constructor import ChoicesStrat, UntilStrat, CharsStrat
from quote_mod_sdk.strategies.primitives import UntilEndType, CharsMode

q_confirm = FlowQuestion(
    name="confirm",
    prompt=" Proceed? (yes/no): ",
    strategy=ChoicesStrat(["yes", "no"]),
)
q_color = FlowQuestion(
    name="color",
    prompt=" Pick a color: ",
    strategy=ChoicesStrat(["red", "green", "blue"]),
)
q_confirm.on("yes", route_question(q_color)).on("no", route_message("Cancelled."))
q_color.then(route_summary())

flow = FlowDefinition(
    name="demo",
    root=q_confirm,
    summary_builder=lambda s: f"Summary: confirm={s.answers.get('confirm')}, color={s.answers.get('color')}"
)
ENGINE = FlowEngine(entry_question=q_confirm, flows={flow.name: flow})
Strategies in questions
- Extract until a tag: UntilStrat("<answer>", UntilEndType.TAG, "</answer>")
- Fixed-length digits: CharsStrat(CharsMode.NUMERIC, min=4, stop=4)
- Lists with separators/wrappers: ListStrat(elements=ChoicesStrat(["A","B"]), sep=", ", open="[", close="]")
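As a rough mental model, the answer shapes these three strategies admit can be written as regular expressions. The patterns below are illustrative assumptions, not how the SDK enforces constraints: the tagged pattern assumes the recorded answer includes both tags (as the clarification example later in this page suggests), and the list pattern assumes at least one element.

```python
import re

# Assumed answer shapes only; the SDK constrains generation token by token.
TAGGED = re.compile(r"<answer>(.*?)</answer>", re.DOTALL)  # UntilStrat("<answer>", UntilEndType.TAG, "</answer>")
FOUR_DIGITS = re.compile(r"[0-9]{4}")                      # CharsStrat(CharsMode.NUMERIC, min=4, stop=4)
AB_LIST = re.compile(r"\[(?:A|B)(?:, (?:A|B))*\]")         # ListStrat(elements=ChoicesStrat(["A","B"]), sep=", ", ...)


def matches(pattern: re.Pattern, text: str) -> bool:
    """Return True when the whole text fits the pattern."""
    return pattern.fullmatch(text) is not None
```

Checking a candidate answer against the matching pattern is a quick way to sanity-check a prompt and strategy pairing before wiring it into a flow.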
Integrate in a Mod
Use the single entrypoint ENGINE.handle_event(event, actions, tokenizer) as in the airline example.
from quote_mod_sdk import mod, Prefilled, ForwardPass, Added, get_conversation

@mod
def flow_mod(event, actions, tokenizer):
    # Optional: inspect/normalize conversation before handing control to the engine
    if isinstance(event, Prefilled):
        print("conversation", get_conversation())
    return ENGINE.handle_event(event, actions, tokenizer)
App State and Tooling
Create an AppState that satisfies the engine’s FlowState protocol (request_id, current_question, pending_route, answers, data). Populate domain state using conversation utilities and route to tools via route_tool.
- Parse tool results: tool_call_pairs(...)
- Populate caches from conversation history (e.g., user/reservation/flight status)
- Emit tool calls from routes using callbacks: route_tool(my_tool_fn), where my_tool_fn(actions, state, tokenizer) returns an action (often actions.tool_calls(payload))
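A minimal sketch of such a state object, assuming the protocol only requires the five attributes named above. The field types, the reservations cache, and the record_tool_result helper are hypothetical and chosen for illustration; the SDK may expect different types.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class AppState:
    # Attributes named by the FlowState protocol (types are assumptions).
    request_id: str
    current_question: Optional[Any] = None  # question currently being asked, if any
    pending_route: Optional[Any] = None     # route queued for the next step, if any
    answers: Dict[str, str] = field(default_factory=dict)
    data: Dict[str, Any] = field(default_factory=dict)
    # Hypothetical domain cache populated from conversation history.
    reservations: Dict[str, dict] = field(default_factory=dict)


def record_tool_result(state: AppState, reservation_id: str, payload: dict) -> None:
    """Cache a parsed tool result on the state (hypothetical helper)."""
    state.reservations[reservation_id] = payload
```

Using default_factory gives every request its own dictionaries, so answers and caches are never shared between concurrent requests.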
The airline helper example wires these pieces together and demonstrates dynamic branching, auto-answers, and summary messaging.
Routing & Transitions
FlowQuestion supports multiple ways to move to the next step — these mirror patterns used in Tau2:
- Explicit transitions: .on(answer: str, target)
  - Match the normalized answer string and route accordingly.
  - Target may be a FlowQuestion, a FlowRoute (e.g., route_message(...)), a FlowDefinition (routes to its root), or a callable returning any of those.
- Default transition: .then(target) or .otherwise(target)
  - Route when no explicit .on matches.
- Assignments: .assign(lambda state, answer: ...)
  - Run after completion to persist side effects in your AppState.
- Branch resolvers: .branch(lambda state: Optional[RouteSpec])
  - Evaluated after an answer is recorded (or when no explicit/default transition is set), useful for "inspect state and decide".
  - Return None to skip; otherwise return a RouteSpec (question, route helper, or callable).
- Auto-answers: .with_auto_answer(lambda state: Optional[str])
  - Evaluated when entering a question. If a string is returned, the question is answered immediately, assignments run, and routing proceeds without emitting any tokens for that question.
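The interplay of these mechanisms can be pictured with a toy model. MiniQuestion below is a hypothetical stand-in, not the SDK's FlowQuestion: targets are plain strings, normalization is assumed to be strip plus lowercase, and the precedence (assignments, then explicit transitions, then branch resolvers, then the default) is one plausible reading of the descriptions above rather than confirmed engine behavior.

```python
from typing import Callable, Dict, List, Optional


class MiniQuestion:
    """Toy stand-in for FlowQuestion; route targets are plain strings."""

    def __init__(self, name: str):
        self.name = name
        self._explicit: Dict[str, str] = {}
        self._default: Optional[str] = None
        self._assigns: List[Callable[[dict, str], None]] = []
        self._branches: List[Callable[[dict], Optional[str]]] = []

    def on(self, answer: str, target: str) -> "MiniQuestion":
        self._explicit[answer.strip().lower()] = target
        return self  # returning self allows chaining

    def then(self, target: str) -> "MiniQuestion":
        self._default = target
        return self

    def assign(self, fn: Callable[[dict, str], None]) -> "MiniQuestion":
        self._assigns.append(fn)
        return self

    def branch(self, fn: Callable[[dict], Optional[str]]) -> "MiniQuestion":
        self._branches.append(fn)
        return self

    def resolve(self, state: dict, answer: str) -> Optional[str]:
        normalized = answer.strip().lower()  # assumed normalization
        state.setdefault("answers", {})[self.name] = normalized
        for fn in self._assigns:             # side effects first
            fn(state, normalized)
        if normalized in self._explicit:     # explicit .on match
            return self._explicit[normalized]
        for fn in self._branches:            # first non-None branch wins
            target = fn(state)
            if target is not None:
                return target
        return self._default                 # .then fallback


q = (
    MiniQuestion("confirm")
    .on("yes", "ask_color")
    .assign(lambda st, ans: st.update(confirmed=(ans == "yes")))
    .branch(lambda st: "escalate" if st.get("vip") else None)
    .then("cancelled")
)
```

With this model, an answer of "yes" follows the explicit transition, while any other answer falls through to the branch resolver and finally to the default.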
Route Helpers
- route_question(q): move to another FlowQuestion
- route_message(text): injects the message as tokens (plus EOS if available)
- route_summary(message?): calls FlowDefinition.summary_builder(state) when present, otherwise uses message
- route_output(text): force the final output tokens
- route_tool(callback): invoke a tool callback, typically to return actions.tool_calls(payload)
Tool callback signatures:
def my_tool(actions, state, tokenizer):
    # Return an action (e.g., actions.tool_calls(...), actions.force_output(...))
    return actions.tool_calls({ ... })

# Optionally, a state -> RouteSpec function can be supplied and will be coerced to a route
def decide_next(state):
    return route_message("done")
Completion + Erase
When a question completes, the engine reads the answer tokens from the internal SelfPrompt and decodes the answer text. If an erase mode is active (e.g., prompt-only or all), the engine emits a Backtrack to remove the injected prompt (and optionally the answer) and then queues the next route for the following ForwardPass.
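The bookkeeping behind an erase can be sketched as follows. This is a hedged illustration, not the engine's implementation: EraseMode and plan_erase are hypothetical names, and the sketch assumes a backtrack can only remove tokens from the end of the sequence, so erasing just the prompt means rewinding past the answer and re-adding it.

```python
from enum import Enum
from typing import List, Tuple


class EraseMode(Enum):  # hypothetical names; the SDK's own modes may differ
    NONE = "none"
    PROMPT = "prompt"
    ALL = "all"


def plan_erase(prompt_len: int, answer_tokens: List[int], mode: EraseMode) -> Tuple[int, List[int]]:
    """Return (number of trailing tokens to remove, tokens to re-add afterwards)."""
    if mode is EraseMode.NONE:
        return 0, []
    total = prompt_len + len(answer_tokens)
    if mode is EraseMode.PROMPT:
        # The prompt sits before the answer, so rewind past both and restore the answer.
        return total, list(answer_tokens)
    # EraseMode.ALL: drop the prompt and the answer.
    return total, []
```

For a 3-token prompt followed by a 2-token answer, prompt-only erase rewinds 5 tokens and restores the 2 answer tokens, while erasing everything rewinds 5 tokens and restores none.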
Examples (from Tau2)
- Menu router with explicit transitions and branching:
router = (
    FlowQuestion(
        name="planner.router",
        prompt=" Choose a path (find info / cancel / modify / book / status): ",
        strategy=ChoicesStrat(["find info", "cancel", "modify", "book", "status"]),
    )
    # Parentheses around the whole expression let the chained calls span lines
    .on("find info", lambda s: get_user_id(s, get_all_reservation_details))
    .on("cancel", lambda s: get_user_id(s, any_flown))
    .on("modify", modify_router)
    .on("book", lambda s: get_user_id(s, book_flight))
    .branch(lambda s: continue_route(s))
)
- Clarification question using tags:
def clarify(what: str, target):
    q = FlowQuestion(
        name=f"planner.clarify_{what}",
        prompt=f" Ask for {what} wrapped in <question_to_user> tags: ",
        strategy=UntilStrat("<question_to_user>", UntilEndType.TAG, "</question_to_user>"),
    )
    return q.then(
        lambda s: route_output(
            "Let me ask: "
            + s.answers[q.name][len("<question_to_user>"):-len("</question_to_user>")]
        )
    )
- Assigning to state and gating:
reason = (
    FlowQuestion(
        name="planner.cancel_reason",
        prompt=" Reason? (weather/health/other): ",
        strategy=ChoicesStrat(["weather", "health", "other"]),
    )
    # Parentheses around the whole expression let the chained calls span lines
    .assign(lambda st, ans: setattr(st.active_reservation, "cancel_reason", ans))
    .branch(
        lambda st: user_confirm_cancel(st)
        if st.active_reservation.cancel_reason in ("weather", "health")
        else route_message("Not covered")
    )
)
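The clarification example earlier in this section removes the tags by slicing fixed lengths off both ends, which silently mangles the text if the answer ever arrives without them. A slightly more defensive helper might look like the sketch below; strip_tags is a hypothetical name and not part of the SDK.

```python
def strip_tags(text: str, open_tag: str, close_tag: str) -> str:
    """Return the text between open_tag and close_tag, or text unchanged if the tags are absent."""
    stripped = text.strip()
    has_tags = (
        stripped.startswith(open_tag)
        and stripped.endswith(close_tag)
        and len(stripped) >= len(open_tag) + len(close_tag)
    )
    if not has_tags:
        return text
    # Slice by explicit end index so an empty close_tag cannot produce [start:-0].
    return stripped[len(open_tag):len(stripped) - len(close_tag)].strip()
```

Inside the route lambda this would replace the slice, for example strip_tags(s.answers[q.name], "<question_to_user>", "</question_to_user>").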