前言
在微信小程序环境中,由于无法使用标准的 EventSource API,实现服务器发送事件(SSE)流式响应需要特殊的处理方案。本文将详细介绍如何在 UniApp 微信小程序中实现完整的 SSE 流式通信。
核心实现方案
1. SSE 请求类封装
export class HmSSERequest {
private requestTask: UniApp.RequestTask | null = null
private buffer: string = ''
private isConnecting: boolean = false
async connect(config: SSERequestConfig): Promise<UniApp.RequestTask> {
return new Promise((resolve, reject) => {
this.requestTask = uni.request({
url: import.meta.env.VITE_API_BASE_URL + config.url,
data: config.data,
method: config.method || 'POST',
header: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
'X-DashScope-SSE': 'enable',
Authorization: Bearer xjahb.cn'', // 根据需求设定Token
...config.headers
},
enableChunked: true, // 关键:启用分块传输
responseType: 'arraybuffer', // 关键:使用 arraybuffer 接收数据
success: res => {
config.onComplete?.()
resolve(this.requestTask!)
},
fail: err => {
this.isConnecting = false
config.onError?.(err)
reject(err)
}
}) as any
// 监听数据块
if (this.requestTask && (this.requestTask as any).onChunkReceived) {
;(this.requestTask as any).onChunkReceived((chunk: any) => {
this.processChunk(chunk, config)
})
}
})
}
}
2. 数据块处理关键代码
private processChunk(chunk: any, config: SSERequestConfig) {
let chunkText: string
// 跨平台解码处理
if (typeof TextDecoder !== 'undefined') {
// 浏览器环境
const uint8Array = new Uint8Array(chunk.data)
const textDecoder = new TextDecoder('utf-8')
chunkText = textDecoder.decode(uint8Array)
} else {
// 微信小程序环境使用自定义 UTF-8 解码
const uint8Array = new Uint8Array(chunk.data)
chunkText = this.decodeUtf8(uint8Array)
}
// 将新数据添加到缓冲区
this.buffer += chunkText
// 查找完整的消息边界(以空行分隔的消息)
let lastProcessedIndex = 0
let messageStart = 0
while (messageStart < this.buffer.length) {
const messageEnd = this.buffer.indexOf('\n\n', messageStart)
if (messageEnd === -1) break
const messageText = this.buffer.substring(messageStart, messageEnd)
this.processSingleMessage(messageText, config)
lastProcessedIndex = messageEnd + 2
messageStart = lastProcessedIndex
}
// 更新缓冲区,保留未处理的部分
this.buffer = this.buffer.substring(lastProcessedIndex)
}
3. SSE 消息解析
private processSingleMessage(messageText: string, config: SSERequestConfig) {
const lines = messageText.split('\n')
let currentEvent = ''
const dataLines: string[] = []
for (const line of lines) {
if (line.startsWith('event:')) {
currentEvent = line.substring(6).trim()
} else if (line.startsWith('data:')) {
const dataValue = line.substring(5)
dataLines.push(dataValue)
}
}
if (currentEvent && dataLines.length > 0) {
let parsedData
if (currentEvent === 'content') {
// 对于内容消息,保留所有数据行的原始内容
parsedData = this.processContentData(dataLines)
} else {
// 对于 tool_calls 类型,尝试解析 JSON
try {
parsedData = JSON.parse(dataLines.join(''))
} catch {
parsedData = dataLines.join('')
}
}
const message: SSEMessage = {
type: currentEvent as 'tool_calls' | 'content',
data: parsedData,
raw: messageText
}
config.onMessage(message)
}
}
在 Vue 页面中的使用
1. 初始化 SSE 连接
// SSE 相关
const sseRequest = createSSERequest()
let markdownProcessor: ReturnType<typeof createMarkdownProcessor> | null = null
// 开始流式对话
const startStreamingChat = async (dataInfo:any) => {
return new Promise<void>((resolve, reject) => {
markdownProcessor = createMarkdownProcessor({
onContentUpdate: content => {
console.log('SSE流式content返回内容', content)
},
onToolCallsUpdate: newToolCalls => {
// 更新工具调用信息
console.log('SSE流式思考newToolCalls返回内容', newToolCalls)
},
onStreamEnd: () => {
resolve()
}
})
sseRequest.connect({
url: '后端给的接口地址',
data: dataInfo,
onMessage: (message: SSEMessage) => {
markdownProcessor!.processMessage(message)
},
onError: error => {
// 忽略连接中断的错误
if (error?.errMsg?.includes('abort') || error?.errMsg?.includes('fail')) {
console.log('连接已安全中断')
return
}
// 错误处理
},
onComplete: () => {
markdownProcessor!.flush()
resolve()
}
})
})
}
2. 消息发送与停止控制
// 发送消息
const handleSend = async () => {
await stopStreaming()
// 添加用户消息
msgList.value.push({
role: 'user',
content: userMessage,
imgUrl: userImg
})
// 添加AI消息占位
const aiMessageIndex = msgList.value.push({
role: 'assistant',
content: '',
isBuffering: true,
toolCalls: []
}) - 1
try {
await startStreamingChat(dataInfo)
} catch (error) {
// 错误处理
}
}
// 停止当前流式生成
const stopStreaming = async () => {
// 更新所有消息的缓冲状态
msgList.value.forEach(msg => {
if (msg.role === 'assistant') {
msg.isBuffering = false
if (!msg.content || msg.content.startsWith('正在思考中')) {
msg.content = '生成已中断'
}
}
})
// 停止SSE连接
try {
sseRequest?.forceClose(100)
} catch (error) {
console.log('SSE连接已关闭')
}
clearAllCounters()
// 清理处理器
if (markdownProcessor) {
markdownProcessor.destroy()
markdownProcessor = null
}
}
关键技术要点
1. 微信小程序特有的配置
{
enableChunked: true, // 启用分块传输
responseType: 'arraybuffer', // 使用 arraybuffer
header: {
'Accept': 'text/event-stream',
'X-DashScope-SSE': 'enable' // 阿里云 SSE 特殊头部
}
}
2. 跨平台编码解码
// UTF-8 解码方法(小程序兼容)
private decodeUtf8(uint8Array: Uint8Array): string {
// 处理单字节、双字节、三字节、四字节字符
let result = ''
let i = 0
const length = uint8Array.length
while (i < length) {
const byte1 = uint8Array[i++]
if (byte1 === undefined) break
// 单字节字符 (0xxxxxxx)
if ((byte1 & 0x80) === 0) {
result += String.fromCharCode(byte1)
continue
}
// 两字节字符 (110xxxxx 10xxxxxx)
if ((byte1 & 0xe0) === 0xc0) {
if (i >= length) break
const byte2Value = uint8Array[i++]
if (byte2Value === undefined) break
const byte2 = byte2Value & 0x3f
result += String.fromCharCode(((byte1 & 0x1f) << 6) | byte2)
continue
}
// 三字节字符 (1110xxxx 10xxxxxx 10xxxxxx)
if ((byte1 & 0xf0) === 0xe0) {
if (i + 1 >= length) break
const byte2Value = uint8Array[i++]
const byte3Value = uint8Array[i++]
if (byte2Value === undefined || byte3Value === undefined) break
const byte2 = byte2Value & 0x3f
const byte3 = byte3Value & 0x3f
result += String.fromCharCode(
((byte1 & 0x0f) << 12) | (byte2 << 6) | byte3
)
continue
}
// 四字节字符 (11110xxx 10xxxxxx 10xxxxxx 10xxxxxx)
if ((byte1 & 0xf8) === 0xf0) {
if (i + 2 >= length) break
const byte2Value = uint8Array[i++]
const byte3Value = uint8Array[i++]
const byte4Value = uint8Array[i++]
if (
byte2Value === undefined ||
byte3Value === undefined ||
byte4Value === undefined
)
break
const byte2 = byte2Value & 0x3f
const byte3 = byte3Value & 0x3f
const byte4 = byte4Value & 0x3f
let codepoint =
((byte1 & 0x07) << 18) | (byte2 << 12) | (byte3 << 6) | byte4
codepoint -= 0x10000
result += String.fromCharCode(
(codepoint >> 10) + 0xd800,
(codepoint & 0x3ff) + 0xdc00
)
}
}
return result
}
3. 连接管理
// 强制关闭连接(带超时保护)
forceClose(timeoutMs: number = 5000) {
return new Promise<void>(resolve => {
const timeoutId = setTimeout(() => {
this.cleanup()
resolve()
}, timeoutMs)
try {
this.close()
clearTimeout(timeoutId)
resolve()
} catch (error) {
this.cleanup()
resolve()
}
})
}
UI 交互优化
1. 实时 Markdown 渲染
export class MarkdownStreamProcessor {
private renderTimer: number | null = null
private isStreaming: boolean = false
private currentContent: string = ''
processMessage(message: SSEMessage) {
if (!this.isStreaming) {
this.isStreaming = true
this.config.onStreamStart?.()
}
switch (message.type) {
case 'tool_calls':
this.handleToolCalls(message.data)
break
case 'content':
this.handleContent(message.data)
break
}
}
private scheduleRender(content: string) {
if (this.renderTimer) {
clearTimeout(this.renderTimer)
}
// 使用极短的防抖时间(10ms)确保实时性
this.renderTimer = setTimeout(() => {
const processedContent = content.trim()
this.config.onContentUpdate(processedContent)
}, 10) as unknown as number
}
}
总结
这个解决方案成功在 UniApp 微信小程序环境中实现了完整的 SSE 流式响应功能,主要特点包括:
-
跨平台兼容:同时支持浏览器和微信小程序环境
-
完整的连接管理:包括连接建立、消息处理、错误处理和连接关闭
-
实时性优化:通过极短的防抖时间确保内容实时更新
-
用户体验优化:智能滚动控制、加载状态显示、中断处理等
-
类型安全:完整的 TypeScript 类型定义
这套方案已经在实际项目中验证稳定可靠,可以作为在微信小程序中实现流式通信的参考实现。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END




暂无评论内容