Skip to content

扩展功能

概述

基于迷你 Agent,添加更实用的功能:

  1. 流式输出
  2. 用户确认
  3. 循环检测

1. 流式输出

让用户实时看到 Agent 的输出,而不必等到完整响应生成后才显示。

typescript
/**
 * Sends one user message to a fresh chat session and echoes the reply to
 * stdout while it is still being streamed.
 *
 * Tool calls carried by a chunk are detected, but handling them is left as
 * a placeholder in this minimal version.
 *
 * @param userMessage - The text sent to the model.
 */
async function runAgentStream(userMessage: string) {
  const session = model.startChat()

  // Streaming API: the reply is consumed chunk by chunk
  const { stream } = await session.sendMessageStream(userMessage)

  for await (const piece of stream) {
    // Print text as soon as it arrives
    const delta = piece.text()
    if (delta) process.stdout.write(delta)

    // Look for tool calls carried by this chunk
    const requestedCalls = piece.functionCalls()
    if (requestedCalls?.length) {
      // Handle the tool calls...
    }
  }
}

完整的流式实现

typescript
/**
 * Runs a streaming agent loop for a single user message.
 *
 * Each turn streams the model's reply to stdout, collects the tool calls it
 * contains, executes them in order and sends the results back. The loop ends
 * when a reply contains no tool calls, or silently after 10 turns.
 *
 * A call naming a tool that does not exist, or a tool that throws, is
 * reported back to the model as `{ error }` instead of crashing the loop.
 *
 * @param userMessage - The text sent to the model.
 */
async function runAgentWithStreaming(userMessage: string): Promise<void> {
  const chat = model.startChat()

  // Drains one streamed reply: echoes its text and gathers its tool calls.
  // The call list is local, so every reply starts from an empty list.
  async function processStream(stream: AsyncIterable<any>) {
    let fullText = ''
    const functionCalls: any[] = []

    for await (const chunk of stream) {
      // Print text as soon as it arrives
      const text = chunk.text()
      if (text) {
        process.stdout.write(text)
        fullText += text
      }

      // Collect tool calls
      const calls = chunk.functionCalls()
      if (calls) {
        functionCalls.push(...calls)
      }
    }

    return { text: fullText, functionCalls }
  }

  let response = await chat.sendMessageStream(userMessage)
  let turns = 0

  while (turns < 10) {
    turns++

    const { functionCalls } = await processStream(response.stream)

    if (!functionCalls.length) {
      console.log() // trailing newline after the streamed text
      return
    }

    // Execute the tools
    const results = []
    for (const call of functionCalls) {
      console.log(`\n[执行 ${call.name}]`)

      // Own-property check: the model may name a tool that does not exist,
      // and inherited keys such as "constructor" must not resolve to a tool.
      const tool = Object.prototype.hasOwnProperty.call(tools, call.name)
        ? tools[call.name as keyof typeof tools]
        : undefined

      let result: object
      if (!tool) {
        result = { error: `未知工具: ${call.name}` }
      } else {
        try {
          result = await tool.execute(call.args as any)
        } catch (e: unknown) {
          // Let the model see the failure rather than aborting the run
          result = { error: e instanceof Error ? e.message : String(e) }
        }
      }

      results.push({
        functionResponse: { name: call.name, response: result }
      })
    }

    response = await chat.sendMessageStream(results)
  }
}

2. 用户确认

危险操作前请求用户确认。

typescript
import * as readline from 'readline'

// 简单的确认提示
/**
 * Asks a yes/no question on the terminal.
 *
 * The answer is trimmed and compared case-insensitively; "y" and "yes"
 * approve, anything else declines. If the input closes before an answer is
 * given, the question counts as declined instead of never settling.
 *
 * @param message - Text shown before the " (y/n) " suffix.
 * @returns `true` only when the user approved.
 */
async function confirm(message: string): Promise<boolean> {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  })

  try {
    const answer = await new Promise<string>(resolve => {
      // End of input: resolve with an empty answer so the caller is not left waiting
      rl.once('close', () => resolve(''))
      rl.question(`${message} (y/n) `, resolve)
    })

    const normalized = answer.trim().toLowerCase()
    return normalized === 'y' || normalized === 'yes'
  } finally {
    rl.close()
  }
}

// 改进的工具定义
const tools = {
  run_command: {
    declaration: { /* ... */ },

    /**
     * Decides whether a command must be approved by the user.
     *
     * A command skips confirmation only when its first word is on the
     * whitelist AND it contains no shell metacharacters. Without the second
     * check, "ls ; rm -rf /" or "cat x | sh" would pass, because only the
     * first word was inspected.
     */
    needsConfirmation: (args: { command: string }) => {
      const command = args.command.trim()

      // Chaining, piping, redirection, substitution and escapes can all
      // smuggle a second command past the whitelist
      if (/[;&|<>`$(){}\\\n\r]/.test(command)) return true

      const safeCommands = ['ls', 'pwd', 'cat', 'echo', 'which']
      const firstWord = command.split(/\s+/)[0] ?? ''
      return !safeCommands.includes(firstWord)
    },

    execute: async (args: { command: string }) => {
      // Execution logic...
    }
  }
}

// 在 Agent 循环中添加确认
/**
 * Executes one tool call, asking the user first when the tool requires it.
 *
 * @param call - The function call requested by the model.
 * @returns The tool's result, or `{ error }` when the tool is unknown or the
 *   user declined.
 */
async function executeToolWithConfirmation(call: FunctionCall) {
  // Own-property check: the model may name a tool that does not exist, and
  // inherited keys such as "constructor" must not resolve to a tool.
  const tool = Object.prototype.hasOwnProperty.call(tools, call.name)
    ? tools[call.name as keyof typeof tools]
    : undefined

  if (!tool) {
    return { error: `未知工具: ${call.name}` }
  }

  // Ask for confirmation when the tool requires it
  if (tool.needsConfirmation?.(call.args as any)) {
    console.log(`\n准备执行: ${call.name}`)
    console.log(`参数: ${JSON.stringify(call.args)}`)

    const approved = await confirm('是否继续?')
    if (!approved) {
      return { error: '用户取消了操作' }
    }
  }

  return tool.execute(call.args as any)
}

3. 循环检测

防止 Agent 陷入无限循环。

typescript
/**
 * Keeps a short history of agent actions and reports when the agent is
 * repeating itself.
 */
class LoopDetector {
  private history: string[] = []
  private readonly maxHistory = 10

  /** Records an action, dropping the oldest one once the history is full. */
  addAction(action: string) {
    this.history.push(action)
    if (this.history.length > this.maxHistory) {
      this.history.shift()
    }
  }

  /**
   * Returns true when the last two actions are the same as the two before
   * them (A,B,A,B or A,A,A,A).
   *
   * Actions are compared one by one. Joining them with a separator would
   * confuse ['a|b', 'c'] with ['a', 'b|c'], and action keys embed JSON that
   * may contain any character.
   */
  isLooping(): boolean {
    const n = this.history.length
    if (n < 4) return false

    return (
      this.history[n - 1] === this.history[n - 3] &&
      this.history[n - 2] === this.history[n - 4]
    )
  }

  /** Clears the recorded history. */
  reset() {
    this.history = []
  }
}

// Usage inside the agent loop.
// NOTE(review): this is an excerpt — `functionCalls` and the enclosing
// function are not shown here; the `return` below needs that function.
const loopDetector = new LoopDetector()

for (const call of functionCalls) {
  // Key each action by tool name plus its serialized arguments
  const actionKey = `${call.name}:${JSON.stringify(call.args)}`
  loopDetector.addAction(actionKey)

  // Stop before executing once the detector reports a repetition
  if (loopDetector.isLooping()) {
    console.log('\n检测到循环,中断执行')
    return '检测到重复操作,已停止'
  }

  // Execute the tool...
}

完整增强版 Agent

typescript
// enhanced-agent.ts
import { GoogleGenerativeAI } from '@google/generative-ai'
import * as fs from 'fs/promises'
import { exec } from 'child_process'
import { promisify } from 'util'
import * as readline from 'readline'

const execAsync = promisify(exec)

// 确认提示
/**
 * Asks a yes/no question on the terminal.
 *
 * The answer is trimmed and compared case-insensitively; "y" and "yes"
 * approve, anything else declines. If the input closes before an answer is
 * given, the question counts as declined instead of never settling.
 *
 * @param message - Text shown before the " (y/n) " suffix.
 * @returns `true` only when the user approved.
 */
async function confirm(message: string): Promise<boolean> {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  })
  try {
    const answer = await new Promise<string>(resolve => {
      // End of input: resolve with an empty answer so the caller is not left waiting
      rl.once('close', () => resolve(''))
      rl.question(`${message} (y/n) `, resolve)
    })
    const normalized = answer.trim().toLowerCase()
    return normalized === 'y' || normalized === 'yes'
  } finally {
    rl.close()
  }
}

// 循环检测器
/**
 * Keeps the last 10 agent actions and reports when the agent is repeating
 * itself.
 */
class LoopDetector {
  private history: string[] = []

  /** Records an action, dropping the oldest one once more than 10 are stored. */
  add(action: string) {
    this.history.push(action)
    if (this.history.length > 10) this.history.shift()
  }

  /**
   * Returns true when the last two actions are the same as the two before
   * them (A,B,A,B or A,A,A,A).
   *
   * Actions are compared one by one. Joining them with a separator would
   * confuse ['a|b', 'c'] with ['a', 'b|c'], and action keys embed JSON that
   * may contain any character.
   */
  isLooping(): boolean {
    const n = this.history.length
    if (n < 4) return false
    return (
      this.history[n - 1] === this.history[n - 3] &&
      this.history[n - 2] === this.history[n - 4]
    )
  }
}

// 工具定义
/**
 * Tool registry. Every tool exposes:
 *  - `declaration`: the function declaration advertised to the model,
 *  - `needsConfirmation`: whether the user must approve this call,
 *  - `execute`: performs the call and returns a plain result object; failures
 *    are returned as `{ error }` so the model can react to them.
 */
const tools = {
  read_file: {
    declaration: {
      name: 'read_file',
      description: '读取文件内容',
      parameters: {
        type: 'object',
        properties: { path: { type: 'string' } },
        required: ['path']
      }
    },
    needsConfirmation: () => false,
    execute: async (args: { path: string }) => {
      try {
        return { content: await fs.readFile(args.path, 'utf-8') }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  },

  run_command: {
    declaration: {
      name: 'run_command',
      description: '执行命令',
      parameters: {
        type: 'object',
        properties: { command: { type: 'string' } },
        required: ['command']
      }
    },
    /**
     * A command skips confirmation only when it is exactly a whitelisted
     * command, or a whitelisted command followed by a space and arguments,
     * AND it contains no shell metacharacters. A plain prefix match would
     * also accept "lsof", "ls && rm -rf /" or "echo x > file".
     */
    needsConfirmation: (args: { command: string }) => {
      const command = args.command.trim()

      // The command runs through a shell, so chaining, piping, redirection,
      // substitution and escapes can all smuggle in a second command
      if (/[;&|<>`$(){}\\\n\r]/.test(command)) return true

      const safe = ['ls', 'pwd', 'cat', 'echo', 'git status', 'git log']
      return !safe.some(s => command === s || command.startsWith(`${s} `))
    },
    execute: async (args: { command: string }) => {
      try {
        const { stdout, stderr } = await execAsync(args.command)
        return { stdout, stderr }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  },

  write_file: {
    declaration: {
      name: 'write_file',
      description: '写入文件',
      parameters: {
        type: 'object',
        properties: {
          path: { type: 'string' },
          content: { type: 'string' }
        },
        required: ['path', 'content']
      }
    },
    needsConfirmation: () => true,
    execute: async (args: { path: string; content: string }) => {
      // Report write failures like the other tools do, instead of throwing
      try {
        await fs.writeFile(args.path, args.content)
        return { success: true }
      } catch (e: unknown) {
        return { error: e instanceof Error ? e.message : String(e) }
      }
    }
  }
}

// 初始化
// Create the API client from GEMINI_API_KEY, then a model that is told about
// every tool declaration in the registry.
const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY!)
const functionDeclarations = Object.values(tools).map(({ declaration }) => declaration)
const model = genAI.getGenerativeModel({
  tools: [{ functionDeclarations }],
  model: 'gemini-2.5-flash'
})

// Agent 主循环
/**
 * Main agent loop: streams the model's reply, executes the tool calls it
 * requests (asking the user first where required) and feeds the results
 * back, for at most 15 turns.
 *
 * A call naming a tool that does not exist, or a tool that throws, is
 * reported back to the model as `{ error }` instead of crashing the run.
 *
 * @param userMessage - The task sent to the model.
 * @returns '完成' when a reply contains no tool calls, the loop-detected
 *   message when the same actions repeat, or '达到最大轮次' after 15 turns.
 */
async function runAgent(userMessage: string): Promise<string> {
  const chat = model.startChat()
  const loopDetector = new LoopDetector()

  let response = await chat.sendMessageStream(userMessage)
  let turns = 0

  while (turns < 15) {
    turns++
    const functionCalls: any[] = []

    // Stream the reply to stdout and collect its tool calls
    for await (const chunk of response.stream) {
      const text = chunk.text()
      if (text) process.stdout.write(text)

      const calls = chunk.functionCalls()
      if (calls) functionCalls.push(...calls)
    }

    if (!functionCalls.length) {
      console.log()
      return '完成'
    }

    // Execute the tools
    const results: any[] = []

    for (const call of functionCalls) {
      // Loop detection
      loopDetector.add(`${call.name}:${JSON.stringify(call.args)}`)
      if (loopDetector.isLooping()) {
        return '\n检测到循环,已停止'
      }

      console.log(`\n[${call.name}] ${JSON.stringify(call.args)}`)

      // Own-property check: the model may name a tool that does not exist,
      // and inherited keys such as "constructor" must not resolve to a tool.
      const tool = Object.prototype.hasOwnProperty.call(tools, call.name)
        ? tools[call.name as keyof typeof tools]
        : undefined

      let result: object
      if (!tool) {
        result = { error: `未知工具: ${call.name}` }
      } else {
        try {
          // Confirmation check, then execution. Both sit inside the try so
          // that malformed arguments are reported rather than thrown.
          const declined =
            tool.needsConfirmation(call.args as any) &&
            !(await confirm('执行此操作?'))
          result = declined
            ? { error: '用户取消' }
            : await tool.execute(call.args as any)
        } catch (e: unknown) {
          result = { error: e instanceof Error ? e.message : String(e) }
        }
      }

      results.push({
        functionResponse: { name: call.name, response: result }
      })
    }

    response = await chat.sendMessageStream(results)
  }

  return '达到最大轮次'
}

// 运行
// Take the task from the first CLI argument; `||` is deliberate so that an
// empty argument also falls back to the default task.
const task = process.argv[2] || '列出当前目录'
console.log(`任务: ${task}\n---`)
runAgent(task)
  .then(console.log)
  .catch((e: unknown) => {
    // Report API or network failures instead of leaving the rejection unhandled
    console.error(e instanceof Error ? e.message : e)
    process.exitCode = 1
  })

进一步学习

这个增强版 Agent 已经具备了基本功能。要继续深入,可以:

  1. 阅读 gemini-cli 源码

    • packages/core/src/core/client.ts - 完整的 Agent 实现
    • packages/core/src/services/ - 各种服务实现
  2. 添加更多功能

    • 对话压缩
    • 会话持久化
    • 更多工具
  3. 优化体验

    • 更好的错误处理
    • 进度显示
    • 取消操作支持

小结

通过这两个教程,你已经:

  • 理解了 Agent 的核心原理
  • 实现了一个能工作的 Agent
  • 添加了流式输出、确认、循环检测

现在,你可以在此基础上构建自己的 AI Agent 了。


恭喜完成学习!如有问题,可以:

通过实际源码学习 AI Agent 开发