I’m building a LangGraph workflow with multiple connected nodes and having trouble with state persistence. My workflow has several nodes linked with conditional routing, but the state data isn’t carrying over properly.
Here’s my setup with the first two nodes:
def message_processor(state: TypedDict) -> None:
result, state = fetch_messages(state)
if result == "Messages Found":
state["current_step"] = "category_handler"
else:
state["error_msg"] = "No Messages Available"
state["current_step"] = "final_output"
def get_current_step(state: TypedDict) -> str:
logging.debug(f"Current Step: {state["current_step"]}")
return state["current_step"]
I’m using a conditional edge to route between nodes:
flow.add_conditional_edges(
"message_processor",
get_current_step,
{
"category_handler": "category_handler",
"final_output": "final_output",
},
)
The problem is that my debug logging shows an empty string for the state value, which means the state isn’t being passed correctly between nodes. My complete workflow looks like this:
flow = StateGraph(MyGraphState)
flow.add_node("message_processor", message_processor)
flow.add_node("category_handler", category_handler)
flow.add_node("priority_handler", priority_handler)
flow.add_node("task_handler", task_handler)
flow.add_node("final_output", final_output_node)
flow.set_entry_point("message_processor")
flow.add_conditional_edges(
"message_processor",
get_current_step,
{
"category_handler": "category_handler",
"final_output": "final_output",
},
)
flow.add_conditional_edges(
"category_handler",
get_current_step,
{
"priority_handler": "priority_handler",
"final_output": "final_output",
},
)
flow.add_edge("task_handler", "final_output")
flow.add_edge("final_output", END)
I’m compiling this in one class and invoking it from another function. Could the separation across multiple files be causing the state to not persist? What am I missing here?