uniapp在微信小程序中实现 SSE 流式响应

前言

        在微信小程序环境中,由于无法使用标准的 EventSource API,实现服务器发送事件(SSE)流式响应需要特殊的处理方案。本文将详细介绍如何在 UniApp 微信小程序中实现完整的 SSE 流式通信。

核心实现方案

1. SSE 请求类封装

export class HmSSERequest {
  private requestTask: UniApp.RequestTask | null = null
  private buffer: string = ''
  private isConnecting: boolean = false

  async connect(config: SSERequestConfig): Promise<UniApp.RequestTask> {
    return new Promise((resolve, reject) => {
      this.requestTask = uni.request({
        url: import.meta.env.VITE_API_BASE_URL + config.url,
        data: config.data,
        method: config.method || 'POST',
        header: {
          'Content-Type': 'application/json',
          Accept: 'text/event-stream',
          'X-DashScope-SSE': 'enable',
          Authorization: Bearer xjahb.cn'',  // 根据需求设定Token
          ...config.headers
        },
        enableChunked: true, // 关键:启用分块传输
        responseType: 'arraybuffer', // 关键:使用 arraybuffer 接收数据
        success: res => {
          config.onComplete?.()
          resolve(this.requestTask!)
        },
        fail: err => {
          this.isConnecting = false
          config.onError?.(err)
          reject(err)
        }
      }) as any

      // 监听数据块
      if (this.requestTask && (this.requestTask as any).onChunkReceived) {
        ;(this.requestTask as any).onChunkReceived((chunk: any) => {
          this.processChunk(chunk, config)
        })
      }
    })
  }
}

2. 数据块处理关键代码

private processChunk(chunk: any, config: SSERequestConfig) {
  let chunkText: string

  // 跨平台解码处理
  if (typeof TextDecoder !== 'undefined') {
    // 浏览器环境
    const uint8Array = new Uint8Array(chunk.data)
    const textDecoder = new TextDecoder('utf-8')
    chunkText = textDecoder.decode(uint8Array)
  } else {
    // 微信小程序环境使用自定义 UTF-8 解码
    const uint8Array = new Uint8Array(chunk.data)
    chunkText = this.decodeUtf8(uint8Array)
  }

  // 将新数据添加到缓冲区
  this.buffer += chunkText

  // 查找完整的消息边界(以空行分隔的消息)
  let lastProcessedIndex = 0
  let messageStart = 0

  while (messageStart < this.buffer.length) {
    const messageEnd = this.buffer.indexOf('\n\n', messageStart)
    if (messageEnd === -1) break

    const messageText = this.buffer.substring(messageStart, messageEnd)
    this.processSingleMessage(messageText, config)

    lastProcessedIndex = messageEnd + 2
    messageStart = lastProcessedIndex
  }

  // 更新缓冲区,保留未处理的部分
  this.buffer = this.buffer.substring(lastProcessedIndex)
}

3. SSE 消息解析

private processSingleMessage(messageText: string, config: SSERequestConfig) {
  const lines = messageText.split('\n')
  let currentEvent = ''
  const dataLines: string[] = []

  for (const line of lines) {
    if (line.startsWith('event:')) {
      currentEvent = line.substring(6).trim()
    } else if (line.startsWith('data:')) {
      const dataValue = line.substring(5)
      dataLines.push(dataValue)
    }
  }

  if (currentEvent && dataLines.length > 0) {
    let parsedData
    if (currentEvent === 'content') {
      // 对于内容消息,保留所有数据行的原始内容
      parsedData = this.processContentData(dataLines)
    } else {
      // 对于 tool_calls 类型,尝试解析 JSON
      try {
        parsedData = JSON.parse(dataLines.join(''))
      } catch {
        parsedData = dataLines.join('')
      }
    }

    const message: SSEMessage = {
      type: currentEvent as 'tool_calls' | 'content',
      data: parsedData,
      raw: messageText
    }

    config.onMessage(message)
  }
}

在 Vue 页面中的使用

1. 初始化 SSE 连接

// SSE 相关
const sseRequest = createSSERequest()
let markdownProcessor: ReturnType<typeof createMarkdownProcessor> | null = null

// 开始流式对话
const startStreamingChat = async (dataInfo:any) => {
  return new Promise<void>((resolve, reject) => {
    markdownProcessor = createMarkdownProcessor({
      onContentUpdate: content => {
        console.log('SSE流式content返回内容', content)
      },
      onToolCallsUpdate: newToolCalls => {
        // 更新工具调用信息
        console.log('SSE流式思考newToolCalls返回内容', newToolCalls)
      },
      onStreamEnd: () => {
        resolve()
      }
    })

    sseRequest.connect({
      url: '后端给的接口地址',
      data: dataInfo,
      onMessage: (message: SSEMessage) => {
        markdownProcessor!.processMessage(message)
      },
      onError: error => {
        // 忽略连接中断的错误
        if (error?.errMsg?.includes('abort') || error?.errMsg?.includes('fail')) {
          console.log('连接已安全中断')
          return
        }
        // 错误处理
      },
      onComplete: () => {
        markdownProcessor!.flush()
        resolve()
      }
    })
  })
}

2. 消息发送与停止控制

// 发送消息
const handleSend = async () => {
  await stopStreaming()

  // 添加用户消息
  msgList.value.push({
    role: 'user',
    content: userMessage,
    imgUrl: userImg
  })

  // 添加AI消息占位
  const aiMessageIndex = msgList.value.push({
    role: 'assistant',
    content: '',
    isBuffering: true,
    toolCalls: []
  }) - 1

  try {
    await startStreamingChat(dataInfo)
  } catch (error) {
    // 错误处理
  }
}

// 停止当前流式生成
const stopStreaming = async () => {
  // 更新所有消息的缓冲状态
  msgList.value.forEach(msg => {
    if (msg.role === 'assistant') {
      msg.isBuffering = false
      if (!msg.content || msg.content.startsWith('正在思考中')) {
        msg.content = '生成已中断'
      }
    }
  })

  // 停止SSE连接
  try {
    sseRequest?.forceClose(100)
  } catch (error) {
    console.log('SSE连接已关闭')
  }

  clearAllCounters()
  // 清理处理器
  if (markdownProcessor) {
    markdownProcessor.destroy()
    markdownProcessor = null
  }
}

关键技术要点

1. 微信小程序特有的配置

{
  enableChunked: true, // 启用分块传输
  responseType: 'arraybuffer', // 使用 arraybuffer
  header: {
    'Accept': 'text/event-stream',
    'X-DashScope-SSE': 'enable' // 阿里云 SSE 特殊头部
  }
}

2. 跨平台编码解码

// UTF-8 解码方法(小程序兼容)
private decodeUtf8(uint8Array: Uint8Array): string {
  // 处理单字节、双字节、三字节、四字节字符
    let result = ''
    let i = 0
    const length = uint8Array.length

    while (i < length) {
      const byte1 = uint8Array[i++]
      if (byte1 === undefined) break

      // 单字节字符 (0xxxxxxx)
      if ((byte1 & 0x80) === 0) {
        result += String.fromCharCode(byte1)
        continue
      }

      // 两字节字符 (110xxxxx 10xxxxxx)
      if ((byte1 & 0xe0) === 0xc0) {
        if (i >= length) break
        const byte2Value = uint8Array[i++]
        if (byte2Value === undefined) break
        const byte2 = byte2Value & 0x3f
        result += String.fromCharCode(((byte1 & 0x1f) << 6) | byte2)
        continue
      }

      // 三字节字符 (1110xxxx 10xxxxxx 10xxxxxx)
      if ((byte1 & 0xf0) === 0xe0) {
        if (i + 1 >= length) break
        const byte2Value = uint8Array[i++]
        const byte3Value = uint8Array[i++]
        if (byte2Value === undefined || byte3Value === undefined) break
        const byte2 = byte2Value & 0x3f
        const byte3 = byte3Value & 0x3f
        result += String.fromCharCode(
          ((byte1 & 0x0f) << 12) | (byte2 << 6) | byte3
        )
        continue
      }

      // 四字节字符 (11110xxx 10xxxxxx 10xxxxxx 10xxxxxx)
      if ((byte1 & 0xf8) === 0xf0) {
        if (i + 2 >= length) break
        const byte2Value = uint8Array[i++]
        const byte3Value = uint8Array[i++]
        const byte4Value = uint8Array[i++]
        if (
          byte2Value === undefined ||
          byte3Value === undefined ||
          byte4Value === undefined
        )
          break
        const byte2 = byte2Value & 0x3f
        const byte3 = byte3Value & 0x3f
        const byte4 = byte4Value & 0x3f
        let codepoint =
          ((byte1 & 0x07) << 18) | (byte2 << 12) | (byte3 << 6) | byte4
        codepoint -= 0x10000
        result += String.fromCharCode(
          (codepoint >> 10) + 0xd800,
          (codepoint & 0x3ff) + 0xdc00
        )
      }
    }

    return result
}

3. 连接管理

// 强制关闭连接(带超时保护)
forceClose(timeoutMs: number = 5000) {
  return new Promise<void>(resolve => {
    const timeoutId = setTimeout(() => {
      this.cleanup()
      resolve()
    }, timeoutMs)

    try {
      this.close()
      clearTimeout(timeoutId)
      resolve()
    } catch (error) {
      this.cleanup()
      resolve()
    }
  })
}

UI 交互优化

1. 实时 Markdown 渲染

export class MarkdownStreamProcessor {
  private renderTimer: number | null = null
  private isStreaming: boolean = false
  private currentContent: string = ''

  processMessage(message: SSEMessage) {
    if (!this.isStreaming) {
      this.isStreaming = true
      this.config.onStreamStart?.()
    }

    switch (message.type) {
      case 'tool_calls':
        this.handleToolCalls(message.data)
        break
      case 'content':
        this.handleContent(message.data)
        break
    }
  }

  private scheduleRender(content: string) {
    if (this.renderTimer) {
      clearTimeout(this.renderTimer)
    }

    // 使用极短的防抖时间(10ms)确保实时性
    this.renderTimer = setTimeout(() => {
      const processedContent = content.trim()
      this.config.onContentUpdate(processedContent)
    }, 10) as unknown as number
  }
}

总结

这个解决方案成功在 UniApp 微信小程序环境中实现了完整的 SSE 流式响应功能,主要特点包括:

  1. 跨平台兼容:同时支持浏览器和微信小程序环境

  2. 完整的连接管理:包括连接建立、消息处理、错误处理和连接关闭

  3. 实时性优化:通过极短的防抖时间确保内容实时更新

  4. 用户体验优化:智能滚动控制、加载状态显示、中断处理等

  5. 类型安全:完整的 TypeScript 类型定义

这套方案已经在实际项目中验证稳定可靠,可以作为在微信小程序中实现流式通信的参考实现。

 

© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容