WorkFlow
Builds and runs a crewai workflow from a node graph stored in JSON: a Start node points at a Team, the Team at its Agents, and Task nodes are chained through nexts/prevs edges.
import os
import json
import configparser
from typing import Dict, List, Optional
from NodeData import NodeData
from crewai import Agent, Task, Crew, Process
from langchain_community.llms import Ollama
from langchain.chat_models import ChatOpenAI
from crewai_tools import FileReadTool, BaseTool
import networkx as nx

def load_nodes_from_json(filename: str) -> Dict[str, NodeData]:
    with open(filename, 'r') as file:
        data = json.load(file)
        node_map = {}
        for node_data in data["nodes"]:
            node = NodeData.from_dict(node_data)
            node_map[node.uniq_id] = node
        return node_map

def find_nodes_by_type(node_map: Dict[str, NodeData], node_type: str) -> List[NodeData]:
    return [node for node in node_map.values() if node.type == node_type]

def find_node_by_type(node_map: Dict[str, NodeData], node_type: str) -> Optional[NodeData]:
    for node in node_map.values():
        if node.type == node_type:
            return node
    return None

class FileWriterTool(BaseTool):
    name: str = "FileWriter"
    description: str = "Writes given content to a specified file."

    def _run(self, filename: str, content: str) -> str:
        with open(filename, 'w') as file:
            file.write(content)
        return f"Content successfully written to {filename}"

def create_agent(node: NodeData, llm) -> Agent:
    # Every agent gets both file tools so its tasks can read and write artifacts.
    tools = [FileWriterTool(), FileReadTool()]

    return Agent(
        role=node.role,
        goal=node.goal,
        backstory=node.backstory,
        verbose=True,
        allow_delegation=False,
        llm=llm,
        tools=tools
    )

def create_task(node: NodeData, agent: Agent, node_map: Dict[str, NodeData], task_map: Dict[str, Task]) -> Task:
    # Collect tool steps from the node's successors; the tool field stores the
    # constructor string recorded in the graph.
    steps = []
    for step_id in node.nexts:
        step_node = node_map[step_id]
        tool_instance = None
        if step_node.tool == "FileWriterTool()":
            tool_instance = FileWriterTool()
        elif step_node.tool == "FileReadTool()":
            tool_instance = FileReadTool()
        steps.append({
            'tool': tool_instance,
            'args': step_node.arg,
            'output_var': step_node.output_var
        })

    # Resolve dependencies with actual Task instances
    dependencies = [task_map[dep_id] for dep_id in node.prevs if dep_id in task_map]

    return Task(
        description=node.description,
        expected_output=node.expected_output,
        agent=agent,
        steps=steps,
        dependencies=dependencies
    )

def topological_sort_tasks(task_nodes: List[NodeData]) -> List[NodeData]:
    graph = nx.DiGraph()

    # Add nodes to the graph
    for node in task_nodes:
        graph.add_node(node.uniq_id)

    # Add edges to the graph
    for node in task_nodes:
        for prev_id in node.prevs:
            if prev_id in graph:
                graph.add_edge(prev_id, node.uniq_id)

    # Perform topological sort
    sorted_ids = list(nx.topological_sort(graph))

    # Return nodes in sorted order
    id_to_node = {node.uniq_id: node for node in task_nodes}
    sorted_tasks = [id_to_node[node_id] for node_id in sorted_ids]

    return sorted_tasks

def RunWorkFlow(node: NodeData, node_map: Dict[str, NodeData], llm):
    print(f"Start root ID: {node.uniq_id}")

    # from root find team
    sub_node_map = {next_id: node_map[next_id] for next_id in node.nexts}
    team_node = find_node_by_type(sub_node_map, "Team")
    if not team_node:
        print("No Team node found")
        return

    print(f"Processing Team {team_node.name} ID: {team_node.uniq_id}")

    # from team find agents (search the team's successors, not the whole graph)
    agent_map = {next_id: node_map[next_id] for next_id in team_node.nexts}
    agent_nodes = find_nodes_by_type(agent_map, "Agent")
    agents = {agent_node.name: create_agent(agent_node, llm) for agent_node in agent_nodes}
    for agent_node in agent_nodes:
        print(f"Agent {agent_node.name} ID: {agent_node.uniq_id}")

    # Use BFS to collect all task nodes reachable from the start node
    task_nodes = []
    queue = find_nodes_by_type(sub_node_map, "Task")

    while queue:
        current_node = queue.pop(0)
        if current_node not in task_nodes:
            print(f"Processing task_node ID: {current_node.uniq_id}")
            task_nodes.append(current_node)
            next_sub_node_map = {next_id: node_map[next_id] for next_id in current_node.nexts}
            queue.extend(find_nodes_by_type(next_sub_node_map, "Task"))

    # Sort tasks topologically to respect dependencies
    sorted_task_nodes = topological_sort_tasks(task_nodes)

    # Create tasks in dependency order so task_map always holds the
    # prerequisites a node refers to in its prevs.
    tasks = []
    task_map = {}
    for task_node in sorted_task_nodes:
        print(f"Processing task_node: {task_node.description}")
        agent = agents[task_node.agent]
        task = create_task(task_node, agent, node_map, task_map)
        tasks.append(task)
        task_map[task_node.uniq_id] = task

    crew = Crew(
        agents=list(agents.values()),
        tasks=tasks,
        verbose=2
    )

    result = crew.kickoff()
    print("######################")
    print(result)

def run_workflow_from_file(filename: str, llm):
    node_map = load_nodes_from_json(filename)
    start_nodes = find_nodes_by_type(node_map, "Start")
    for start_node in start_nodes:
        RunWorkFlow(start_node, node_map, llm)
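RunWorkFlow expects Start → Team → Agent links plus Task nodes hanging off the Start node, so a usable input file can be as small as the sketch below. The field names are assumptions inferred from the attributes this module reads (uniq_id, type, nexts, prevs, name, role, ...); NodeData.from_dict is defined elsewhere and may expect different keys.

import json

# Minimal graph sketch; key names are assumed, not confirmed by NodeData.
sample = {
    "nodes": [
        {"uniq_id": "start-1", "type": "Start",
         "nexts": ["team-1", "task-1"], "prevs": []},
        {"uniq_id": "team-1", "type": "Team", "name": "Writers",
         "nexts": ["agent-1"], "prevs": []},
        {"uniq_id": "agent-1", "type": "Agent", "name": "writer",
         "role": "Technical writer", "goal": "Produce the report",
         "backstory": "Writes concise reports.",
         "nexts": [], "prevs": []},
        {"uniq_id": "task-1", "type": "Task", "agent": "writer",
         "description": "Draft report.md and save it with the FileWriter tool",
         "expected_output": "report.md written to disk",
         "nexts": [], "prevs": []},
    ]
}

with open("workflow.json", "w") as f:
    json.dump(sample, f, indent=2)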
def find_nodes_by_type(node_map: Dict[str, NodeData], node_type: str) -> List[NodeData]:
Returns every node in node_map whose type equals node_type.
def find_node_by_type(node_map: Dict[str, NodeData], node_type: str) -> Optional[NodeData]:
Returns the first node whose type equals node_type, or None if there is none.
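With a graph loaded through load_nodes_from_json, the two helpers look like this (the file name reuses the sketch above):

node_map = load_nodes_from_json("workflow.json")
starts = find_nodes_by_type(node_map, "Start")   # all Start nodes
team = find_node_by_type(node_map, "Team")       # first Team node, or None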
class FileWriterTool(BaseTool):
A crewai_tools tool that writes the given content to the specified file and reports the result.
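Since the class only overrides _run, it can also be exercised directly through the run method inherited from crewai_tools.BaseTool; a minimal sketch with an illustrative filename:

result = FileWriterTool().run(filename="notes.txt", content="hello")
print(result)  # Content successfully written to notes.txt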
def create_task(node: NodeData, agent: Agent, node_map: Dict[str, NodeData], task_map: Dict[str, Task]) -> Task:
Builds a crewai Task for a task node, gathering tool steps from its nexts and resolving dependencies against the already-created tasks in task_map.
def topological_sort_tasks(task_nodes: List[NodeData]) -> List[NodeData]:
Orders the task nodes with a networkx DiGraph so that every node comes after all of its prevs.
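The sort only reads uniq_id and prevs, so its behaviour can be sanity-checked with stand-in objects; SimpleNamespace here is a hypothetical stub, not the real NodeData:

from types import SimpleNamespace

a = SimpleNamespace(uniq_id="a", prevs=[])
b = SimpleNamespace(uniq_id="b", prevs=["a"])
c = SimpleNamespace(uniq_id="c", prevs=["a", "b"])
print([n.uniq_id for n in topological_sort_tasks([c, b, a])])  # ['a', 'b', 'c']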
def RunWorkFlow(node: NodeData, node_map: Dict[str, NodeData], llm):
Drives one workflow from a Start node: locates the Team, instantiates its Agents, collects Task nodes breadth-first, sorts them topologically, and kicks off the resulting Crew.
def run_workflow_from_file(filename: str, llm):
Loads the node graph from filename and runs RunWorkFlow once per Start node.
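End to end, a run can be kicked off with the Ollama wrapper already imported at the top of the module; the model name below is illustrative, and any model served by a local Ollama instance should work:

llm = Ollama(model="llama3")  # assumed model name; swap in whatever is installed
run_workflow_from_file("workflow.json", llm)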