Spaces:
Paused
Paused
| from langgraph.graph import StateGraph, END | |
| from agents.agent_state import AgentState | |
class AgentSystem:
    """Build a supervisor/worker graph on top of a LangGraph ``StateGraph``.

    Every entry in ``nodes`` is registered as a graph node. Every name in
    ``members`` gets an edge back to the node called ``"supervisor"``, and
    the supervisor routes to a member (or to ``END``) based on the ``"next"``
    field of the graph state.

    NOTE(review): ``setup_graph`` refers to a node named ``"supervisor"`` and
    to each member by name; presumably all of them are included in ``nodes``
    -- confirm against the caller.
    """

    def __init__(self, nodes, members):
        """Create the graph, register nodes and wire the routing edges.

        Args:
            nodes: Iterable of mappings, each with a ``"name"`` key (node
                name) and an ``"instance"`` key (the object added as node).
            members: Collection of worker node names the supervisor can
                route to. It is iterated more than once, so it should be a
                re-iterable collection such as a list.
        """
        # Assign members before wiring the graph: the router registered in
        # setup_graph reads self.members when it runs.
        self.members = members
        self.workflow = self.setup_nodes(nodes)
        self.setup_graph(members)

    def get_workflow(self):
        """Return the underlying (uncompiled) workflow graph."""
        # The original returned the bare name ``workflow``, which is not
        # defined in this scope and raised NameError on every call.
        return self.workflow

    def compile(self):
        """Compile the workflow and return the result of ``compile()``."""
        return self.workflow.compile()

    def setup_nodes(self, nodes):
        """Create a ``StateGraph(AgentState)`` and add every node to it.

        Args:
            nodes: Iterable of mappings with ``"name"`` and ``"instance"``
                keys.

        Returns:
            The newly created graph with all nodes added.
        """
        workflow = StateGraph(AgentState)
        for node in nodes:
            workflow.add_node(node["name"], node["instance"])
        return workflow

    def check_step(self, x):
        """Pick the routing target from the ``"next"`` field of the state.

        The match is a containment test (``member in x["next"]``), not an
        equality test, and members are tried in the order of
        ``self.members``; the first one that matches wins.

        Args:
            x: Graph state; must provide a ``"next"`` entry.

        Returns:
            The first matching member, or ``"FINISH"`` when none matches.
        """
        requested = x["next"]
        for member in self.members:
            if member in requested:
                return member
        return "FINISH"

    def setup_graph(self, members):
        """Wire worker -> supervisor edges and the supervisor's routing.

        Args:
            members: Worker node names the supervisor may route to.
        """
        for member in members:
            # We want our workers to ALWAYS "report back" to the supervisor
            # when done.
            print(f"Add return path to supervisor for {member}")
            self.workflow.add_edge(member, "supervisor")

        # The supervisor populates the "next" field in the graph state,
        # which routes to a node or finishes.
        conditional_map = {member: member for member in members}
        conditional_map["FINISH"] = END
        self.workflow.add_conditional_edges(
            "supervisor", self.check_step, conditional_map
        )

        # Finally, add the entry point.
        self.workflow.set_entry_point("supervisor")