0%

基于大模型实现一个ReAct Agent

下面是基于通义千问大模型实现的一个 ReAct Agent demo,它的功能是可以调用多个工具并返回结果,就类似于调用大模型接口时传入 tools 参数.

demo 中主要包含三个文件:qwen_agent.py、agent_tools.py、test.py。

qwen_agent.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import re
import json
import time

import dashscope

import agent_tools


class QWenReactAgent:
system_prompt = """Answer the following questions as best you can.
Please try to get response by yourself if possible, use the following tools if necessary:
{tools}

You must use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action, must contains the tool argument and its value in JSON format
Observation: the result of the action
(this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question, 'Final Answer:' must be output as a whole and cannot be disassembled
"""

react_prompt = """
Begin!
Question: {input}
Thought: """

prompt_template = """Answer the following questions as best you can.
Please try to get response by yourself if possible, use the following tools if necessary:
{tools}

You must use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action, must contains the tool argument and its value in JSON format
Observation: the result of the action
(this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!
Question: {input}
Thought: """

def __init__(self, model_config, tools):
self.model_config = model_config
self.tools = tools
self.prompt = None
self.messages = []
self.curr_round = 0
self.max_round = 5
self.run_continue = True
self.final_answer = False # 是否是最终答案
self.show_process = True # 是否显示中间过程日志
self.stream_interval = 0.05

def load_prompt(self, question):
tool_names = ','.join(self.tools.keys())
self.prompt = self.prompt_template.format(tools=self.tools,
tool_names=tool_names,
input=question)

self.system_prompt = self.system_prompt.format(tools=self.tools, tool_names=tool_names)
self.react_prompt = self.react_prompt.format(input=question)

self.messages.append({
"role": 'system', "content": self.system_prompt
})
self.messages.append({
"role": 'user', "content": self.react_prompt
})

def run_llm_prompt(self):
while self.run_continue and self.curr_round < self.max_round:
llm_response = dashscope.Generation.call(
model=self.model_config.get("model"),
prompt=self.prompt,
result_format='message'
)

if llm_response.get("status_code") == 200:
llm_output = llm_response.get("output")
choices = llm_output.get("choices")[0]
content = choices.get("message").get("content")
self.prompt += content
print(">>>>>>>>>>")
print(f"=====第{self.curr_round + 1}轮prompt: {self.prompt}")

if "Final Answer" in content:
self.run_continue = False
break

# 获取Action和Action Input
split_content = re.split('Action: |Action Input: ', content)
if len(split_content) > 2:
tool_name = split_content[1].replace('\n', '')
tool_args = split_content[2].replace('\n', '')

# 执行函数
if hasattr(agent_tools, tool_name):
func = getattr(agent_tools, tool_name)
result = func(**json.loads(tool_args))
self.prompt += f"Observation: {result}\n"

self.curr_round += 1
else:
self.run_continue = False
break

print(">>>>>>>>>>")
print(f"=====Final Answer: {self.prompt}")

return self.prompt

def run_llm_messages(self):
while self.run_continue and self.curr_round < self.max_round:
llm_response = dashscope.Generation.call(
model=self.model_config.get("model"),
messages=self.messages,
result_format='message'
)

if llm_response.get("status_code") == 200:
llm_output = llm_response.get("output")
choices = llm_output.get("choices")[0]
content = choices.get("message").get("content")
self.react_prompt += content

print(">>>>>>>>>>")
print(f"=====第{self.curr_round + 1}轮prompt: {self.react_prompt}")

if "Final Answer" in content:
self.run_continue = False
break

# 获取Action和Action Input
split_content = re.split('Action: |Action Input: ', content)
if len(split_content) > 2:
tool_name = split_content[1].replace('\n', '')
tool_args = split_content[2].replace('\n', '')

# 执行函数
if hasattr(agent_tools, tool_name):
func = getattr(agent_tools, tool_name)
result = func(**json.loads(tool_args))
self.react_prompt += f"Observation: {result}\n"

self.curr_round += 1

self.messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": self.react_prompt}
]
else:
self.run_continue = False
break

print(">>>>>>>>>>")
print(f"=====Final Answer: {self.react_prompt}")

return self.messages

def run_llm_stream(self):
"""流式输出结果
"""
final_answer_content = []
while self.run_continue and self.curr_round < self.max_round:
llm_response = dashscope.Generation.call(
model=self.model_config.get("model"),
messages=self.messages,
result_format='message',
incremental_output=True,
stream=True,
)

current_resp_content = ''
for response in llm_response:
# print(f"=====response: {response}")

if response.get("status_code") == 200:
llm_output = response.get("output")
choices = llm_output.get("choices")[0]
content = choices.get("message").get("content")

if not self.show_process:
# 如果不需要输出中间结果,只输出final answer就行了
if self.final_answer:
yield content.lstrip()
else:
if 'Final Answer' in content:
self.final_answer = True
c = content.split("Final Answer:")
if len(c) > 1 and c[1]:
yield c[1]
else:
if self.final_answer:
final_answer_content.append(content)
else:
if 'Final Answer' in content:
self.final_answer = True
c = content.split("Final Answer:")
if len(c) > 1 and c[1]:
final_answer_content.append("\n" + c[1])

current_resp_content += content
if choices.get("finish_reason") == "stop":
break

if current_resp_content:
if "Final Answer" in current_resp_content:
self.run_continue = False
self.react_prompt += current_resp_content

if self.show_process:
yield current_resp_content
break

# 输出中间过程结果
if self.show_process:
if self.curr_round == 0:
for msg in (self.react_prompt + current_resp_content).split('\n')[:-1]:
time.sleep(self.stream_interval)
yield msg
else:
for msg in current_resp_content.split('\n')[:-1]:
time.sleep(self.stream_interval)
yield msg

self.react_prompt += current_resp_content

# 获取Action和Action Input
split_content = re.split('Action: |Action Input: ', current_resp_content)
if len(split_content) > 2:
tool_name = split_content[1].replace('\n', '')
tool_args = split_content[2].replace('\n', '')

# 执行函数
if hasattr(agent_tools, tool_name):
func = getattr(agent_tools, tool_name)
result = func(**json.loads(tool_args))
self.react_prompt += f"Observation: {result}\n"

if self.show_process:
yield f"Observation: {result}"

self.curr_round += 1
self.messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": self.react_prompt}
]

# 输出最终结果
if self.show_process:
print(f"====================")
for msg in final_answer_content:
time.sleep(self.stream_interval)
yield msg.lstrip()

async def run_llm_async_stream(self):
pass

agent_tools.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def get_city_weather(city, date):
"""这是一个根据城市名称和日期查询城市天气的函数
"""
if city == "武汉" and date == "2024-07-15":
return "晴天"
elif city == "武汉" and date == "2024-07-16":
return "阴天"

if city == "北京":
return "阴天"
return "未知"


def book_ticket(weather, city, date):
"""这是一个预订机票的函数
"""
if weather == "晴天":
if city == "武汉" and date == "2024-07-15":
return "机票预订成功"
return "机票预订失败"


def book_hotel(booked_hotel, city, date):
"""这是一个预订酒店的函数
"""
if booked_hotel == "机票预订成功":
if city == "武汉" and date == "2024-07-15":
return "酒店预订成功"

return "酒店预订失败"

test.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import inspect

from qwen_agent import QWenReactAgent

from agent_tools import *


tools = {
"get_city_weather": inspect.getsource(get_city_weather),
"book_ticket": inspect.getsource(book_ticket),
"book_hotel": inspect.getsource(book_hotel)
}


def qwen():
# model = 'qwen-plus'
# model = 'qwen-turbo'
# model = 'qwen-max'
model = 'qwen-max-longcontext'
agent = QWenReactAgent({"model": model}, tools)

prompt = "帮我查询2024年7月15日武汉的天气情况;如果是晴天,请帮我预定当天的机票;如果机票预订成功,再帮我预定当天的酒店"
agent.load_prompt(prompt)

resp = agent.run_llm_stream()
for r in resp:
print(r)


if __name__ == '__main__':
qwen()