扩展功能
概述
基于迷你 Agent,添加更实用的功能:
- 流式输出
- 用户确认
- 循环检测
1. 流式输出
让用户实时看到 Agent 的思考过程。
typescript
/**
 * Minimal streaming example: sends `userMessage` to a fresh chat and writes
 * each text chunk to stdout as soon as it arrives.
 *
 * Tool calls found in a chunk are detected, but handling them is left as a
 * placeholder in this snippet.
 *
 * @param userMessage - The prompt forwarded to the model.
 */
async function runAgentStream(userMessage: string) {
  const chat = model.startChat()
  // Request the reply through the streaming API
  const { stream } = await chat.sendMessageStream(userMessage)
  for await (const piece of stream) {
    // Echo any text right away
    const fragment = piece.text()
    if (fragment) process.stdout.write(fragment)
    // Check whether this chunk carries tool calls
    const requested = piece.functionCalls()
    if (requested?.length) {
      // Handle tool calls...
    }
  }
}
完整的流式实现
typescript
/**
 * Runs the agent with streamed output.
 *
 * Sends `userMessage` to a fresh chat, writes the model's text to stdout as it
 * arrives, executes every tool call the model requests, and sends the results
 * back. This repeats until the model answers without tool calls or the turn
 * limit is reached.
 *
 * A tool call that names an unregistered tool, or whose `execute` throws, is
 * reported back to the model as `{ error }` instead of crashing the run.
 *
 * @param userMessage - The prompt forwarded to the model.
 */
async function runAgentWithStreaming(userMessage: string): Promise<void> {
  const maxTurns = 10
  const chat = model.startChat()

  // Streams one response to stdout and returns its text plus the tool calls it contained.
  // The chunk type comes from the SDK, which is not visible here, hence `any`.
  async function processStream(stream: AsyncIterable<any>) {
    let fullText = ''
    const functionCalls: any[] = []
    for await (const chunk of stream) {
      // Print text immediately
      const text = chunk.text()
      if (text) {
        process.stdout.write(text)
        fullText += text
      }
      // Collect tool calls
      const calls = chunk.functionCalls()
      if (calls) {
        functionCalls.push(...calls)
      }
    }
    return { text: fullText, functionCalls }
  }

  // Runs a single tool call and always produces a response object for the model.
  async function runTool(call: any) {
    // The name is chosen by the model, so it may not match a registered tool.
    if (!Object.prototype.hasOwnProperty.call(tools, call.name)) {
      return { error: `未知工具: ${call.name}` }
    }
    const tool = tools[call.name as keyof typeof tools]
    try {
      return await tool.execute(call.args as any)
    } catch (e: unknown) {
      return { error: e instanceof Error ? e.message : String(e) }
    }
  }

  let response = await chat.sendMessageStream(userMessage)
  for (let turn = 0; turn < maxTurns; turn++) {
    const { functionCalls } = await processStream(response.stream)
    if (!functionCalls.length) {
      console.log() // trailing newline
      return
    }
    // Execute the requested tools in order
    const results = []
    for (const call of functionCalls) {
      console.log(`\n[执行 ${call.name}]`)
      results.push({
        functionResponse: { name: call.name, response: await runTool(call) }
      })
    }
    response = await chat.sendMessageStream(results)
  }
  // Make it visible that the run stopped because of the turn limit.
  console.log('\n[达到最大轮次]')
}
2. 用户确认
危险操作前请求用户确认。
typescript
import * as readline from 'readline'
/**
 * Simple confirmation prompt.
 *
 * Prints `message` followed by ` (y/n) ` and reads one line from stdin.
 *
 * @param message - The question shown to the user.
 * @returns `true` when the trimmed, case-insensitive answer is `y` or `yes`;
 *   `false` for any other answer or when input closes before an answer.
 */
async function confirm(message: string): Promise<boolean> {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  })
  return new Promise<boolean>(resolve => {
    // If the interface closes without an answer, treat it as a refusal
    // instead of leaving the promise pending.
    rl.once('close', () => resolve(false))
    rl.question(`${message} (y/n) `, answer => {
      const normalized = answer.trim().toLowerCase()
      // Resolve first: close() triggers the 'close' handler above, which
      // would otherwise settle the promise with `false`.
      resolve(normalized === 'y' || normalized === 'yes')
      rl.close()
    })
  })
}
// Improved tool definition: each tool can declare whether a call needs user confirmation
const tools = {
  run_command: {
    declaration: { /* ... */ },
    /**
     * Decides whether the command must be confirmed by the user.
     *
     * Only a plain invocation of an allow-listed command skips confirmation.
     * Any command containing shell control characters (chaining, pipes,
     * redirection, substitution) is confirmed, because the allow-listed first
     * word says nothing about what runs after it.
     */
    needsConfirmation: (args: { command: string }) => {
      const safeCommands = ['ls', 'pwd', 'cat', 'echo', 'which']
      const shellMetacharacters = /[;&|`$<>()\n\r]/
      const command = args.command.trim()
      if (shellMetacharacters.test(command)) return true
      const firstWord = command.split(/\s+/)[0] ?? ''
      return !safeCommands.includes(firstWord)
    },
    execute: async (args: { command: string }) => {
      // Execution logic...
    }
  }
}
/**
 * Executes one tool call inside the agent loop, asking the user first when the
 * tool's `needsConfirmation` says so.
 *
 * @param call - The function call from the model; `call.name` selects the
 *   entry in `tools` and `call.args` is passed to it.
 * @returns The tool's result, or `{ error }` when the tool name is not
 *   registered or the user declines.
 */
async function executeToolWithConfirmation(call: FunctionCall) {
  // The name is chosen by the model, so it may not match a registered tool.
  if (!Object.prototype.hasOwnProperty.call(tools, call.name)) {
    return { error: `未知工具: ${call.name}` }
  }
  const tool = tools[call.name as keyof typeof tools]
  // Ask for confirmation when the tool requires it
  if (tool.needsConfirmation?.(call.args as any)) {
    console.log(`\n准备执行: ${call.name}`)
    console.log(`参数: ${JSON.stringify(call.args)}`)
    const approved = await confirm('是否继续?')
    if (!approved) {
      return { error: '用户取消了操作' }
    }
  }
  return tool.execute(call.args as any)
}
3. 循环检测
防止 Agent 陷入无限循环。
typescript
/**
 * Detects when the agent keeps repeating the same actions.
 *
 * Keeps the most recent actions (at most `maxHistory`) and reports a loop when
 * the last two actions are identical, in order, to the two before them.
 */
class LoopDetector {
  private history: string[] = []
  private readonly maxHistory = 10

  /** Records an action, dropping the oldest one once the history is full. */
  addAction(action: string): void {
    this.history.push(action)
    if (this.history.length > this.maxHistory) {
      this.history.shift()
    }
  }

  /** Returns `true` when the last 2 actions equal the 2 actions before them. */
  isLooping(): boolean {
    const windowSize = 2
    const count = this.history.length
    if (count < windowSize * 2) return false
    // Compare element by element. Joining with a separator would make
    // ['a|b', 'c'] and ['a', 'b|c'] look identical.
    for (let offset = 1; offset <= windowSize; offset++) {
      if (this.history[count - offset] !== this.history[count - offset - windowSize]) {
        return false
      }
    }
    return true
  }

  /** Forgets all recorded actions. */
  reset(): void {
    this.history = []
  }
}
// Usage inside the agent loop
const loopDetector = new LoopDetector()
for (const call of functionCalls) {
  // Key each call by tool name plus serialized arguments
  const serializedArgs = JSON.stringify(call.args)
  loopDetector.addAction(`${call.name}:${serializedArgs}`)
  const stuck = loopDetector.isLooping()
  if (stuck) {
    console.log('\n检测到循环,中断执行')
    return '检测到重复操作,已停止'
  }
  // Execute the tool...
}
完整增强版 Agent
typescript
// enhanced-agent.ts
import { GoogleGenerativeAI } from '@google/generative-ai'
import * as fs from 'fs/promises'
import { exec } from 'child_process'
import { promisify } from 'util'
import * as readline from 'readline'
const execAsync = promisify(exec)
/**
 * Confirmation prompt.
 *
 * Prints `message` followed by ` (y/n) ` and reads one line from stdin.
 *
 * @param message - The question shown to the user.
 * @returns `true` when the trimmed, case-insensitive answer is `y` or `yes`;
 *   `false` for any other answer or when input closes before an answer.
 */
async function confirm(message: string): Promise<boolean> {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  })
  return new Promise<boolean>(resolve => {
    // If the interface closes without an answer, treat it as a refusal
    // instead of leaving the promise pending.
    rl.once('close', () => resolve(false))
    rl.question(`${message} (y/n) `, answer => {
      const normalized = answer.trim().toLowerCase()
      // Resolve first: close() triggers the 'close' handler above, which
      // would otherwise settle the promise with `false`.
      resolve(normalized === 'y' || normalized === 'yes')
      rl.close()
    })
  })
}
/**
 * Loop detector.
 *
 * Keeps the 10 most recent actions and reports a loop when the last two
 * actions are identical, in order, to the two before them.
 */
class LoopDetector {
  private history: string[] = []

  /** Records an action, dropping the oldest one once more than 10 are stored. */
  add(action: string): void {
    this.history.push(action)
    if (this.history.length > 10) this.history.shift()
  }

  /** Returns `true` when the last 2 actions equal the 2 actions before them. */
  isLooping(): boolean {
    const windowSize = 2
    const count = this.history.length
    if (count < windowSize * 2) return false
    // Compare element by element. Joining with a separator would make
    // ['a|b', 'c'] and ['a', 'b|c'] look identical.
    for (let offset = 1; offset <= windowSize; offset++) {
      if (this.history[count - offset] !== this.history[count - offset - windowSize]) {
        return false
      }
    }
    return true
  }
}
/**
 * Tool definitions.
 *
 * Each tool provides:
 * - `declaration`: the function declaration registered with the model,
 * - `needsConfirmation`: whether a call must be approved by the user first,
 * - `execute`: runs the tool and resolves to a result object; failures are
 *   reported as `{ error }` rather than thrown.
 */
const tools = {
  read_file: {
    declaration: {
      name: 'read_file',
      description: '读取文件内容',
      parameters: {
        type: 'object',
        properties: { path: { type: 'string' } },
        required: ['path']
      }
    },
    needsConfirmation: () => false,
    execute: async (args: { path: string }) => {
      try {
        return { content: await fs.readFile(args.path, 'utf-8') }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  },
  run_command: {
    declaration: {
      name: 'run_command',
      description: '执行命令',
      parameters: {
        type: 'object',
        properties: { command: { type: 'string' } },
        required: ['command']
      }
    },
    /**
     * Only a plain invocation of an allow-listed command skips confirmation.
     * The command is passed to a shell by `exec`, so anything containing shell
     * control characters (chaining, pipes, redirection, substitution) is
     * confirmed regardless of how it starts.
     */
    needsConfirmation: (args: { command: string }) => {
      const safe = ['ls', 'pwd', 'cat', 'echo', 'git status', 'git log']
      const shellMetacharacters = /[;&|`$<>()\n\r]/
      const command = args.command.trim()
      if (shellMetacharacters.test(command)) return true
      // Match whole words only, so that e.g. `lsof` is not treated as `ls`.
      const isSafe = safe.some(
        s => command === s || (command.startsWith(s) && /\s/.test(command.charAt(s.length)))
      )
      return !isSafe
    },
    execute: async (args: { command: string }) => {
      try {
        const { stdout, stderr } = await execAsync(args.command)
        return { stdout, stderr }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  },
  write_file: {
    declaration: {
      name: 'write_file',
      description: '写入文件',
      parameters: {
        type: 'object',
        properties: {
          path: { type: 'string' },
          content: { type: 'string' }
        },
        required: ['path', 'content']
      }
    },
    needsConfirmation: () => true,
    execute: async (args: { path: string; content: string }) => {
      // Report failures the same way the other tools do instead of rejecting.
      try {
        await fs.writeFile(args.path, args.content)
        return { success: true }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  }
}
// Initialization: create the Gemini client, then a model with every tool declaration registered
const apiKey = process.env.GEMINI_API_KEY!
const genAI = new GoogleGenerativeAI(apiKey)
const functionDeclarations = Object.values(tools).map(t => t.declaration)
const model = genAI.getGenerativeModel({
  model: 'gemini-2.5-flash',
  tools: [{ functionDeclarations }]
})
/**
 * Agent main loop.
 *
 * Sends `userMessage` to a fresh chat and streams the model's text to stdout.
 * For every tool call the model requests it records the call in a loop
 * detector, asks the user for confirmation when the tool requires it, runs the
 * tool, and sends the results back to the model.
 *
 * A tool call that names an unregistered tool, or whose `execute` throws, is
 * reported back to the model as `{ error }` instead of rejecting the run.
 *
 * @param userMessage - The task forwarded to the model.
 * @returns A status string: `'完成'` when the model answers without tool
 *   calls, a loop notice when repeated tool calls are detected, or
 *   `'达到最大轮次'` after 15 turns.
 */
async function runAgent(userMessage: string): Promise<string> {
  const maxTurns = 15
  const chat = model.startChat()
  const loopDetector = new LoopDetector()

  // Runs one tool call (with confirmation if needed) and always yields a response object.
  // The call shape comes from the SDK, which is not visible here, hence `any`.
  async function runToolCall(call: any) {
    // The name is chosen by the model, so it may not match a registered tool.
    if (!Object.prototype.hasOwnProperty.call(tools, call.name)) {
      return { error: `未知工具: ${call.name}` }
    }
    const tool = tools[call.name as keyof typeof tools]
    // Confirmation check
    if (tool.needsConfirmation(call.args as any) && !(await confirm('执行此操作?'))) {
      return { error: '用户取消' }
    }
    try {
      return await tool.execute(call.args as any)
    } catch (e: unknown) {
      return { error: e instanceof Error ? e.message : String(e) }
    }
  }

  let response = await chat.sendMessageStream(userMessage)
  for (let turn = 0; turn < maxTurns; turn++) {
    const functionCalls: any[] = []
    // Stream the output
    for await (const chunk of response.stream) {
      const text = chunk.text()
      if (text) process.stdout.write(text)
      const calls = chunk.functionCalls()
      if (calls) functionCalls.push(...calls)
    }
    if (!functionCalls.length) {
      console.log()
      return '完成'
    }
    // Execute the requested tools in order
    const results: any[] = []
    for (const call of functionCalls) {
      // Loop detection
      loopDetector.add(`${call.name}:${JSON.stringify(call.args)}`)
      if (loopDetector.isLooping()) {
        return '\n检测到循环,已停止'
      }
      console.log(`\n[${call.name}] ${JSON.stringify(call.args)}`)
      results.push({
        functionResponse: { name: call.name, response: await runToolCall(call) }
      })
    }
    response = await chat.sendMessageStream(results)
  }
  return '达到最大轮次'
}
// Entry point: take the task from the first CLI argument (or use the default) and run the agent
const task = process.argv[2] || '列出当前目录'
console.log(`任务: ${task}\n---`)
runAgent(task)
  .then(console.log)
  .catch((err: unknown) => {
    // Without a handler, an API or network failure would surface as an unhandled rejection.
    console.error(err instanceof Error ? err.message : err)
    process.exitCode = 1
  })
进一步学习
这个增强版 Agent 已经具备了基本功能。要继续深入,可以:
阅读 gemini-cli 源码
- `packages/core/src/core/client.ts` - 完整的 Agent 实现
- `packages/core/src/services/` - 各种服务实现
添加更多功能
- 对话压缩
- 会话持久化
- 更多工具
优化体验
- 更好的错误处理
- 进度显示
- 取消操作支持
小结
通过这两个教程,你已经:
- 理解了 Agent 的核心原理
- 实现了一个能工作的 Agent
- 添加了流式输出、确认、循环检测
现在你可以基于这个基础,构建自己的 AI Agent 了。
恭喜完成学习!如有问题,可以: