WorkFlow

import configparser
import json
import os
from collections import deque
from typing import Dict, List, Optional

import networkx as nx
from crewai import Agent, Task, Crew, Process
from crewai_tools import FileReadTool, BaseTool
from langchain.chat_models import ChatOpenAI
from langchain_community.llms import Ollama

from NodeData import NodeData
 11
 12def load_nodes_from_json(filename: str) -> Dict[str, NodeData]:
 13    with open(filename, 'r') as file:
 14        data = json.load(file)
 15        node_map = {}
 16        for node_data in data["nodes"]:
 17            node = NodeData.from_dict(node_data)
 18            node_map[node.uniq_id] = node
 19        return node_map
 20
 21def find_nodes_by_type(node_map: Dict[str, NodeData], node_type: str) -> List[NodeData]:
 22    return [node for node in node_map.values() if node.type == node_type]
 23
 24def find_node_by_type(node_map: Dict[str, NodeData], node_type: str) -> NodeData:
 25    for node in node_map.values():
 26        if node.type == node_type:
 27            return node
 28    return None
 29
 30class FileWriterTool(BaseTool):
 31    name: str = "FileWriter"
 32    description: str = "Writes given content to a specified file."
 33
 34    def _run(self, filename: str, content: str) -> str:
 35        with open(filename, 'w') as file:
 36            file.write(content)
 37        return f"Content successfully written to {filename}"
 38
 39def create_agent(node: NodeData, llm) -> Agent:
 40    tools = []
 41    tools.append(FileWriterTool())
 42    tools.append(FileReadTool())
 43    
 44    return Agent(
 45        role=node.role,
 46        goal=node.goal,
 47        backstory=node.backstory,
 48        verbose=True,
 49        allow_delegation=False,
 50        llm=llm,
 51        tools=tools
 52    )
 53
 54def create_task(node: NodeData, agent: Agent, node_map: Dict[str, NodeData], task_map: Dict[str, Task]) -> Task:
 55    steps = []
 56    for step_id in node.nexts:
 57        step_node = node_map[step_id]
 58        tool_instance = None
 59        if step_node.tool == "FileWriterTool()":
 60            tool_instance = FileWriterTool()
 61        elif step_node.tool == "FileReadTool()":
 62            tool_instance = FileReadTool()
 63        step = {
 64            'tool': tool_instance,
 65            'args': step_node.arg,
 66            'output_var': step_node.output_var
 67        }
 68        steps.append(step)
 69    
 70    # Resolve dependencies with actual Task instances
 71    dependencies = [task_map[dep_id] for dep_id in node.prevs if dep_id in task_map]
 72
 73    return Task(
 74        description=node.description,
 75        expected_output=node.expected_output,
 76        agent=agent,
 77        steps=steps,
 78        dependencies=dependencies
 79    )
 80
 81def topological_sort_tasks(task_nodes: List[NodeData]) -> List[NodeData]:
 82    graph = nx.DiGraph()
 83
 84    # Add nodes to the graph
 85    for node in task_nodes:
 86        graph.add_node(node.uniq_id)
 87
 88    # Add edges to the graph
 89    for node in task_nodes:
 90        for prev_id in node.prevs:
 91            if prev_id in graph:
 92                graph.add_edge(prev_id, node.uniq_id)
 93    
 94    # Perform topological sort
 95    sorted_ids = list(nx.topological_sort(graph))
 96    
 97    # Return nodes in sorted order
 98    id_to_node = {node.uniq_id: node for node in task_nodes}
 99    sorted_tasks = [id_to_node[node_id] for node_id in sorted_ids]
100    
101    return sorted_tasks
102
def RunWorkFlow(node: NodeData, node_map: Dict[str, NodeData], llm):
    """Build and run a crew for the workflow rooted at ``node``.

    Steps: locate the Team node among the root's successors, create an agent
    for every Agent node, collect Task nodes breadth-first starting from the
    root's successors, order them topologically, create the tasks and kick off
    the crew. Progress and the final result are printed.

    Args:
        node: Root (start) node of the workflow.
        node_map: All nodes keyed by ``uniq_id``.
        llm: Language-model object handed to every agent.

    Returns:
        None. Returns early, after printing a message, when no Team node or
        no Task node is found.

    Raises:
        KeyError: If a referenced node ID is missing from ``node_map`` or a
            task names an agent that does not exist.
    """
    print(f"Start root ID: {node.uniq_id}")

    # from root find team
    sub_node_map = {next_id: node_map[next_id] for next_id in node.nexts}
    team_node = find_node_by_type(sub_node_map, "Team")
    if team_node is None:
        print("No Team node found")
        return

    print(f"Processing Team {team_node.name} ID: {team_node.uniq_id}")

    # NOTE(review): agents are taken from the whole node_map, not only from
    # team_node.nexts, so every Agent node in the file joins this crew —
    # confirm this is intended.
    agent_nodes = find_nodes_by_type(node_map, "Agent")
    agents = {agent_node.name: create_agent(agent_node, llm) for agent_node in agent_nodes}
    for agent_node in agent_nodes:
        print(f"Agent {agent_node.name} ID: {agent_node.uniq_id}")

    # Use BFS to collect all task nodes; a node reachable along several paths
    # is recorded once, tracked by its uniq_id.
    task_nodes = []
    seen_ids = set()
    queue = deque(find_nodes_by_type(sub_node_map, "Task"))

    while queue:
        current_node = queue.popleft()
        if current_node.uniq_id in seen_ids:
            continue
        seen_ids.add(current_node.uniq_id)
        print(f"Processing task_node ID: {current_node.uniq_id}")
        task_nodes.append(current_node)
        next_sub_node_map = {next_id: node_map[next_id] for next_id in current_node.nexts}
        queue.extend(find_nodes_by_type(next_sub_node_map, "Task"))

    if not task_nodes:
        print("No Task nodes found")
        return

    # Sort tasks topologically to respect dependencies
    sorted_task_nodes = topological_sort_tasks(task_nodes)

    tasks = []
    task_map = {}

    # Create tasks in dependency order so each task's predecessors are already
    # in task_map when it is built.
    for task_node in sorted_task_nodes:
        print(f"Processing task_node ID: {task_node.description}")
        try:
            agent = agents[task_node.agent]
        except KeyError:
            raise KeyError(
                f"Task {task_node.uniq_id} references unknown agent {task_node.agent!r}"
            ) from None
        task = create_task(task_node, agent, node_map, task_map)
        tasks.append(task)
        task_map[task_node.uniq_id] = task

    crew = Crew(
        agents=list(agents.values()),
        tasks=tasks,
        verbose=2
    )

    result = crew.kickoff()
    print("######################")
    print(result)
161
def run_workflow_from_file(filename: str, llm):
    """Load a workflow JSON file and run the workflow under each Start node.

    Args:
        filename: Path of the JSON file describing the nodes.
        llm: Language-model object handed to every agent.

    Returns:
        None. Prints a message and returns when the file has no Start node.
    """
    node_map = load_nodes_from_json(filename)
    start_nodes = find_nodes_by_type(node_map, "Start")
    if not start_nodes:
        # Report the problem instead of silently doing nothing.
        print("No Start node found")
        return
    for start_node in start_nodes:
        RunWorkFlow(start_node, node_map, llm)
def load_nodes_from_json(filename: str) -> Dict[str, NodeData.NodeData]:
def load_nodes_from_json(filename: str) -> Dict[str, NodeData]:
    """Load workflow nodes from a JSON file and index them by ``uniq_id``.

    The file must contain a top-level ``"nodes"`` list; each entry is turned
    into a node with ``NodeData.from_dict``.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        Mapping from each node's ``uniq_id`` to the node. If two entries share
        a ``uniq_id``, the later one replaces the earlier one.

    Raises:
        KeyError: If the document has no ``"nodes"`` key.
    """
    # Explicit UTF-8 so parsing does not depend on the platform default encoding.
    with open(filename, 'r', encoding='utf-8') as file:
        data = json.load(file)

    nodes = (NodeData.from_dict(node_data) for node_data in data["nodes"])
    return {node.uniq_id: node for node in nodes}
def find_nodes_by_type( node_map: Dict[str, NodeData.NodeData], node_type: str) -> List[NodeData.NodeData]:
def find_nodes_by_type(node_map: Dict[str, NodeData], node_type: str) -> List[NodeData]:
    """Return every node in ``node_map`` whose ``type`` equals ``node_type``.

    Args:
        node_map: Nodes keyed by ``uniq_id``.
        node_type: Type string to match exactly.

    Returns:
        Matching nodes in the mapping's iteration order; empty if none match.
    """
    return [node for node in node_map.values() if node.type == node_type]
def find_node_by_type( node_map: Dict[str, NodeData.NodeData], node_type: str) -> NodeData.NodeData:
def find_node_by_type(node_map: Dict[str, NodeData], node_type: str) -> Optional[NodeData]:
    """Return the first node in ``node_map`` whose ``type`` equals ``node_type``.

    Args:
        node_map: Nodes keyed by ``uniq_id``.
        node_type: Type string to match exactly.

    Returns:
        The first matching node in the mapping's iteration order, or ``None``
        when no node has that type.
    """
    return next(
        (node for node in node_map.values() if node.type == node_type),
        None,
    )
class FileWriterTool(crewai_tools.tools.base_tool.BaseTool):
class FileWriterTool(BaseTool):
    """Tool that writes text to a file, replacing any existing content."""

    name: str = "FileWriter"
    description: str = "Writes given content to a specified file."

    def _run(self, filename: str, content: str) -> str:
        """Write ``content`` to ``filename`` and return a confirmation message.

        Args:
            filename: Path of the file to create or overwrite.
            content: Text to write.

        Returns:
            A message naming the file that was written.
        """
        # Explicit UTF-8 so the output does not depend on the platform default
        # encoding (non-ASCII content could otherwise fail or be mis-encoded).
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(content)
        return f"Content successfully written to {filename}"

Usage docs: https://docs.pydantic.dev/2.8/concepts/models/

A base class for creating Pydantic models.

Attributes:
__class_vars__: The names of classvars defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The signature for instantiating the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.

__pydantic_extra__: An instance attribute with the values of extra fields from validation when
    `model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
name: str

The unique name of the tool that clearly communicates its purpose.

description: str

Used to tell the model how/when/why to use the tool.

model_config = {}
model_fields = {'name': FieldInfo(annotation=str, required=False, default='FileWriter'), 'description': FieldInfo(annotation=str, required=False, default='Writes given content to a specified file.'), 'args_schema': FieldInfo(annotation=Type[BaseModel], required=False, default_factory=BaseTool._ArgsSchemaPlaceholder, validate_default=True), 'description_updated': FieldInfo(annotation=bool, required=False, default=False), 'cache_function': FieldInfo(annotation=Union[Callable, NoneType], required=False, default=<function BaseTool.<lambda>>)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
crewai_tools.tools.base_tool.BaseTool
args_schema
description_updated
cache_function
model_post_init
run
to_langchain
def create_agent(node: NodeData.NodeData, llm) -> crewai.agent.Agent:
def create_agent(node: NodeData, llm) -> Agent:
    """Build a crewai ``Agent`` from an agent node.

    Every agent receives a fresh ``FileWriterTool`` and ``FileReadTool``.

    Args:
        node: Agent node supplying ``role``, ``goal`` and ``backstory``.
        llm: Language-model object handed to ``Agent`` unchanged.

    Returns:
        The configured ``Agent`` (verbose, delegation disabled).
    """
    return Agent(
        role=node.role,
        goal=node.goal,
        backstory=node.backstory,
        verbose=True,
        allow_delegation=False,
        llm=llm,
        tools=[FileWriterTool(), FileReadTool()],
    )
def create_task( node: NodeData.NodeData, agent: crewai.agent.Agent, node_map: Dict[str, NodeData.NodeData], task_map: Dict[str, crewai.task.Task]) -> crewai.task.Task:
def create_task(node: NodeData, agent: Agent, node_map: Dict[str, NodeData], task_map: Dict[str, Task]) -> Task:
    """Build a crewai ``Task`` from a task node.

    Args:
        node: Task node supplying ``description``, ``expected_output``,
            ``nexts`` and ``prevs``.
        agent: Agent assigned to the task.
        node_map: All nodes keyed by ``uniq_id``; used to resolve ``node.nexts``.
        task_map: Tasks created so far, keyed by node ``uniq_id``; used to
            resolve ``node.prevs``. IDs not present are skipped.

    Returns:
        The constructed ``Task``.

    Raises:
        KeyError: If an ID in ``node.nexts`` is missing from ``node_map``.
    """
    # Maps the tool spelling stored on a node to the class to instantiate.
    tool_factories = {
        "FileWriterTool()": FileWriterTool,
        "FileReadTool()": FileReadTool,
    }

    steps = []
    for step_id in node.nexts:
        step_node = node_map[step_id]
        # Every successor becomes a step; an unrecognised tool string yields
        # ``'tool': None`` rather than an error.
        factory = tool_factories.get(step_node.tool)
        steps.append({
            'tool': factory() if factory is not None else None,
            'args': step_node.arg,
            'output_var': step_node.output_var,
        })

    # Resolve dependencies with actual Task instances
    dependencies = [task_map[dep_id] for dep_id in node.prevs if dep_id in task_map]

    # NOTE(review): confirm that the installed crewai ``Task`` accepts ``steps``
    # and ``dependencies``; its documented way to feed earlier task output into
    # a task is the ``context`` attribute.
    return Task(
        description=node.description,
        expected_output=node.expected_output,
        agent=agent,
        steps=steps,
        dependencies=dependencies,
    )
def topological_sort_tasks(task_nodes: List[NodeData.NodeData]) -> List[NodeData.NodeData]:
 82def topological_sort_tasks(task_nodes: List[NodeData]) -> List[NodeData]:
 83    graph = nx.DiGraph()
 84
 85    # Add nodes to the graph
 86    for node in task_nodes:
 87        graph.add_node(node.uniq_id)
 88
 89    # Add edges to the graph
 90    for node in task_nodes:
 91        for prev_id in node.prevs:
 92            if prev_id in graph:
 93                graph.add_edge(prev_id, node.uniq_id)
 94    
 95    # Perform topological sort
 96    sorted_ids = list(nx.topological_sort(graph))
 97    
 98    # Return nodes in sorted order
 99    id_to_node = {node.uniq_id: node for node in task_nodes}
100    sorted_tasks = [id_to_node[node_id] for node_id in sorted_ids]
101    
102    return sorted_tasks
def RunWorkFlow(node: NodeData.NodeData, node_map: Dict[str, NodeData.NodeData], llm):
def RunWorkFlow(node: NodeData, node_map: Dict[str, NodeData], llm):
    """Build and run a crew for the workflow rooted at ``node``.

    Steps: locate the Team node among the root's successors, create an agent
    for every Agent node, collect Task nodes breadth-first starting from the
    root's successors, order them topologically, create the tasks and kick off
    the crew. Progress and the final result are printed.

    Args:
        node: Root (start) node of the workflow.
        node_map: All nodes keyed by ``uniq_id``.
        llm: Language-model object handed to every agent.

    Returns:
        None. Returns early, after printing a message, when no Team node or
        no Task node is found.

    Raises:
        KeyError: If a referenced node ID is missing from ``node_map`` or a
            task names an agent that does not exist.
    """
    print(f"Start root ID: {node.uniq_id}")

    # from root find team
    sub_node_map = {next_id: node_map[next_id] for next_id in node.nexts}
    team_node = find_node_by_type(sub_node_map, "Team")
    if team_node is None:
        print("No Team node found")
        return

    print(f"Processing Team {team_node.name} ID: {team_node.uniq_id}")

    # NOTE(review): agents are taken from the whole node_map, not only from
    # team_node.nexts, so every Agent node in the file joins this crew —
    # confirm this is intended.
    agent_nodes = find_nodes_by_type(node_map, "Agent")
    agents = {agent_node.name: create_agent(agent_node, llm) for agent_node in agent_nodes}
    for agent_node in agent_nodes:
        print(f"Agent {agent_node.name} ID: {agent_node.uniq_id}")

    # Use BFS to collect all task nodes; a node reachable along several paths
    # is recorded once, tracked by its uniq_id.
    task_nodes = []
    seen_ids = set()
    queue = deque(find_nodes_by_type(sub_node_map, "Task"))

    while queue:
        current_node = queue.popleft()
        if current_node.uniq_id in seen_ids:
            continue
        seen_ids.add(current_node.uniq_id)
        print(f"Processing task_node ID: {current_node.uniq_id}")
        task_nodes.append(current_node)
        next_sub_node_map = {next_id: node_map[next_id] for next_id in current_node.nexts}
        queue.extend(find_nodes_by_type(next_sub_node_map, "Task"))

    if not task_nodes:
        print("No Task nodes found")
        return

    # Sort tasks topologically to respect dependencies
    sorted_task_nodes = topological_sort_tasks(task_nodes)

    tasks = []
    task_map = {}

    # Create tasks in dependency order so each task's predecessors are already
    # in task_map when it is built.
    for task_node in sorted_task_nodes:
        print(f"Processing task_node ID: {task_node.description}")
        try:
            agent = agents[task_node.agent]
        except KeyError:
            raise KeyError(
                f"Task {task_node.uniq_id} references unknown agent {task_node.agent!r}"
            ) from None
        task = create_task(task_node, agent, node_map, task_map)
        tasks.append(task)
        task_map[task_node.uniq_id] = task

    crew = Crew(
        agents=list(agents.values()),
        tasks=tasks,
        verbose=2
    )

    result = crew.kickoff()
    print("######################")
    print(result)
def run_workflow_from_file(filename: str, llm):
def run_workflow_from_file(filename: str, llm):
    """Load a workflow JSON file and run the workflow under each Start node.

    Args:
        filename: Path of the JSON file describing the nodes.
        llm: Language-model object handed to every agent.

    Returns:
        None. Prints a message and returns when the file has no Start node.
    """
    node_map = load_nodes_from_json(filename)
    start_nodes = find_nodes_by_type(node_map, "Start")
    if not start_nodes:
        # Report the problem instead of silently doing nothing.
        print("No Start node found")
        return
    for start_node in start_nodes:
        RunWorkFlow(start_node, node_map, llm)