本文最后更新于:2024年8月21日 下午
背景 Poe 是一个 AI 聊天机器人,它支持多种 AI 模型,包括 GPT-4o、Claude 3.5 Sonnet、Gemini Pro 等。还支持各种类型的 Bot,其中 Server Bot 是最自由的,可以自己编写 Bot 的逻辑。但是,Poe 的 Server Bot 官方仅支持 Python,而吾辈更喜欢 JavaScript,所以研究了一下怎么实现。
初始化项目 一开始使用 express 实现服务端,但后面发现 express 无法部署到 edge runtime,例如 Cloudflare Workers,所以改用 hono.js 实现。
首先使用 hono.js 创建一个项目并选择 cloudflare-workers 模板
1 2 3 pnpm create hono@latest ? Target directory hono-demo ? Which template do you want to use? cloudflare-workers
src/index.ts 是项目入口,内容如下
1 2 3 4 5 6 7 import { Hono } from 'hono' const app = new Hono () app.get ('/' , (c ) => c.text ('Hello, world!' ))export default app
首先在 Poe 网站上创建 Server Bot ,得到一个 Name
和 Access Key
。
协议分析 根据 Poe 协议规范 ,实现一个 Bot 需要实现一个特定的 post 请求,具体如下
API 接口传入的参数会有两个固定字段,type 是请求类型,version 是协议版本。
1 2 3 4 type BotRequest = { version : string type : 'query' | 'settings' | 'report_feedback' | 'report_error' }
其中 query 和 settings 是必须实现的,report_feedback 和 report_error 是可选的。
下面来实现一个基本的 post 请求结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 app.post ('/' , async (c) => { const request = await c.req .json () const { version, type } = request function handleQuery (c: Context ) { throw new Error ('Not implemented' ) } function handleSettings (c: Context ) { throw new Error ('Not implemented' ) } switch (type ) { case 'query' : return handleQuery (c) case 'settings' : return handleSettings (c) default : throw new Error ('Invalid request type' ) } })
Settings 请求 Settings 请求没有额外的参数,仅要求返回这个 bot 相关的一些设置。
1 2 3 4 5 6 7 8 9 interface SettingsResponse { server_bot_dependencies?: Record <string , number > allow_attachments?: boolean introduction_message?: string expand_text_attachments?: boolean enable_image_comprehension?: boolean enforce_author_role_alternation?: boolean enable_multi_bot_chat_prompting?: boolean }
例如,实现上面的 settings 请求,这是一个简单的响应
1 2 3 4 5 6 7 8 function handleSettings (c: Context ) { return c.json ({ server_bot_dependencies : { 'GPT-4o' : 1 , }, introduction_message : 'Hello, I am a server bot.' , }) }
参考 https://creator.poe.com/docs/poe-protocol-specification#settings
Query 请求 query 是关键部分,不管是请求还是响应都很复杂。
下面是请求的类型定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 type Identifier = string type ContentType = 'text/markdown' | 'text/plain' type FeedbackType = 'like' | 'dislike' interface MessageFeedback { type : FeedbackType reason?: string }interface Attachment { url : string content_type : string name : string parsed_content?: string }interface ProtocolMessage { role : 'system' | 'user' | 'bot' content : string content_type : ContentType timestamp : number message_id : Identifier feedback : MessageFeedback [] attachments : Attachment [] }interface QueryRequest { query : ProtocolMessage [] user_id : Identifier conversation_id : Identifier message_id : Identifier access_key : string temperature?: number skip_system_prompt?: boolean logit_bias?: Record <string , number > stop_sequences?: string [] language_code?: string }
响应要求返回 SSE 流式响应多条消息,具体也有很多类型
meta 类型,应该返回的第一条消息,主要是用来声明一些设置
1 2 3 4 5 6 7 interface MetaMessage { event : 'meta' data : { content_type?: 'text/markdown' | 'text/plain' suggested_replies?: boolean } }
接下来是两种消息类型,区别只在于是否替换之前已经发送的消息
1 2 3 4 5 6 7 8 9 10 11 12 interface TextMessage { event : 'text' data : { text : string } }interface ReplaceMessage { event : 'replace_response' data : { text : string } }
还有两种特殊格式的消息,一种是用来返回 JSON 数据(通常给 OpenAI 这种支持函数调用的 Bot 使用),另一种是建议回复的消息,这会出现在回复消息的下方。
1 2 3 4 5 6 7 8 9 10 interface JsonMessage { event : 'json' data : Record <string , any > }interface SuggestedReplyMessage { event : 'suggested_reply' data : { text : string } }
最后是结束和错误消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 interface ErrorMessage { event : 'error' data : { allow_retry : boolean text : string raw_response : string error_type : string } }interface DoneMessage { event : 'done' data : {} }
现在来实现一个简单的 query 请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 function handleQuery (c: Context ) { return streamSSE (c, async (stream) => { const writeSSE = ( message: | MetaMessage | TextMessage | ReplaceMessage | JsonMessage | SuggestedReplyMessage | ErrorMessage | DoneMessage, ) => { stream.writeSSE ({ event : message.event , data : JSON .stringify (message.data ), }) } writeSSE ({ event : 'meta' , data : { content_type : 'text/markdown' , suggested_replies : true }, }) writeSSE ({ event : 'text' , data : { text : 'Hello, World!' } }) writeSSE ({ event : 'done' , data : {} }) }) }
参考:https://creator.poe.com/docs/poe-protocol-specification#query
发布到 Cloudflare Workers 现在发布到 Cloudflare Workers 上,得到一个 URL,例如 https://xxx.workers.dev
。
现在在 Poe 网站上填写 Server URL,然后点击 Run check,如果成功,继续创建 Bot 就可以在 Poe 上使用了。
验证请求 根据 Poe 官方的建议,还应该为 post 请求添加验证,确定是来自 Poe 的请求。首先添加环境变量
1 2 3 ACCESS_KEY="<YOUR_ACCESS_KEY>" echo ACCESS_KEY=\"$ACCESS_KEY \" > .dev.vars echo $ACCESS_KEY | pnpm wrangler secret put ACCESS_KEY
然后在 src/index.ts 中添加验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Bindings = { ACCESS_KEY : string }const app = new Hono <{ Bindings : Bindings }>() app.post ('/' , async (c) => { const request = await c.req .json () const authHeader = c.req .header ().authorization if (authHeader !== `Bearer ${c.env.ACCESS_KEY} ` ) { return c.text ('Unauthorized' , 401 ) } })
主动调用 Bot 除了可以被 Poe 调用,也可以主动调用 Poe 的 API 来实现一些功能,下面介绍其中两个。
刷新 Bot Settings 修改 Bot Settings 的实现后,还需要主动通知 Poe 调用接口刷新设置。
例如修改了 handleSettings 函数,更新了 server_bot_dependencies
,不再使用 GPT-4o,而是使用 Claude-3.5-Sonnet
。
1 2 3 4 5 6 7 function handleSettings (c: Context ) { return c.json ({ server_bot_dependencies : { 'Claude-3.5-Sonnet' : 1 , }, }) }
然后在项目初始化的时候主动通知 Poe 刷新设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 async function syncBotSettings ( botName: string , accessKey: string = '' , ): Promise <void > { const PROTOCOL_VERSION = '1.0' const baseUrl = 'https://api.poe.com/bot/fetch_settings' const resp = await fetch ( `${baseUrl} /${botName} /${accessKey} /${PROTOCOL_VERSION} ` , { method : 'post' }, ) const text = await resp.text () if (!resp.ok ) { throw new Error (`Error fetching settings for bot ${botName} : ${text} ` ) } console .log (text) } app.get ('/sync-bot-settings' , async (c) => { await syncBotSettings ('BotT6R4NKNGZ9' , c.env .ACCESS_KEY ) return c.text ('Synced' ) })
然后在浏览器中直接访问这个 URL 就可以通知 Poe 刷新设置了。
参考:https://creator.poe.com/docs/server-bots-functional-guides#3-make-a-post-request-to-poes-refetch-settings-endpoint-with-your-bot-name-and-access-key
调用其他 Bot 接下来,说明如何调用第三方的 Bot,这里仅以文本 => 文本的 Bot 为例(除此之外,现在的 Bot 还支持附件文件、图片、音视频等)。遗憾的是,官方文档没有记录 API 接口,只说明了使用 python 模块 fastapi_poe 来实现,所以只能分析 fastapi_poe 的源码来实现。
在其中可以找到关键接口 https://api.poe.com/bot/<botName>
,然后接口会以 SSE 流式响应多条消息,先做个测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 async function * requestStream ( request : QueryRequest , botName : string , accessKey : string , ): AsyncGenerator <{ event : 'text' data : any }> { const response = await fetch (`https://api.poe.com/bot/${botName} ` , { method : 'post' , headers : { 'Content-Type' : 'application/json' , Authorization : `Bearer ${accessKey} ` , Accept : 'text/event-stream' , 'Cache-Control' : 'no-cache' , }, body : JSON .stringify (request), }) if (!response.ok || !response.body ) { console .error (response.statusText ) throw new Error (`HTTP error! status: ${response.status} ` ) } const reader = response.body !.pipeThrough (new TextDecoderStream ()).getReader () let chunk = await reader.read () while (!chunk.done ) { console .log ('chunk: ' , chunk.value ) chunk = await reader.read () } }function handleQuery (c: Context<{ Bindings: Bindings }> ) { return streamSSE (c, async (stream) => { const writeSSE = ( message: | MetaMessage | TextMessage | ReplaceMessage | JsonMessage | SuggestedReplyMessage | ErrorMessage | DoneMessage, ) => { stream.writeSSE ({ event : message.event , data : JSON .stringify (message.data ), }) } writeSSE ({ event : 'meta' , data : { content_type : 'text/markdown' , suggested_replies : true }, }) for await (const chunk of requestStream ( request, 'GPT-4o' , c.env .ACCESS_KEY , )) { if (chunk.event === 'text' ) { writeSSE (chunk) } } writeSSE ({ event : 'done' , data : {} }) }) }
终端打印的结果。可以消息分为三种:
text: 普通文本消息,但可能会被拆分成多个 chunk 返回
done: 结束消息
空消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 chunk: event: text data: {"text" : chunk: "" } event: text data: {"text" : "Hello" } chunk: event: text data: {"text" : chunk: "!" } event: text data: {"text" : " How" } chunk: event: text data: {"text" : chunk: " can" } event: text data: {"text" : " I" } chunk: event: text data: {"text" : chunk: " assist" } event: text data: {"text" : " you" } chunk: event: text data: {"text" : chunk: " today" } event: text data: {"text" : "?" } chunk: event: text data: {"text" : chunk: "" } chunk: event: done data: {} chunk:
因此需要实现一个 TransformStream 来将 SSE 文本流转换为结构化的数据,并处理 text 多条消息合并。实现本身并不复杂,但主要的问题是多个模型拆分规则可能规则会不一致,例如 GPT-4o chunk 中可能包含一条完整消息,也可能不包含,而 Claude 3.5 Sonnet 中则总是由两个 chunk 组成一条完整消息。还有一些模型会返回 ping 消息,而且 ping 消息的格式也略有不同,像是 ping
或 : ping
等。
下面是 GPT-4o 的消息示例
1 2 3 4 ;[ 'event: text\r\ndata: {"text": ' , '""}\r\n\r\nevent: text\r\ndata: {"text": "Hi"}\r\n\r\n' , ]
Claude 3.5 Sonnet 的消息示例
1 ;['event: text\r\ndata: {"text": ' , '"Hello"}\r\n\r\n' ]
所以实现的 TransformStream 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 function sseTransformStream ( ) { let buffer = '' return new TransformStream < string , { event : 'text' data : any } >({ transform (chunk: string , controller: TransformStreamDefaultController ) { buffer += chunk const textRegexp = /^event: text[\r\n]+data: ({[\s\S]*?})/ const doneRegexp = /^event: done[\r\n]+data: ({})/ while (buffer) { buffer = buffer.trimStart () if (textRegexp.test (buffer)) { const match = buffer.match (textRegexp) if (match) { controller.enqueue ({ event : 'text' , data : JSON .parse (match[1 ]), }) buffer = buffer.replace (match[0 ], '' ).trimStart () } } else if (doneRegexp.test (buffer)) { const match = buffer.match (doneRegexp) if (match) { buffer = buffer.replace (match[0 ], '' ).trimStart () controller.terminate () return } } else if (/^ping$/m .test (buffer)) { const match = buffer.match (/^ping$/m ) if (match) { buffer = buffer.replace (match[0 ], '' ).trimStart () } } else if (buffer.trim () === ': ping' ) { buffer = buffer.replace (': ping' , '' ).trimStart () } else { return } } }, flush ( ) { if (buffer.trim ()) { console .warn ('Unprocessed data in buffer:' , buffer) } }, }) }
修改 requestStream 来使用这个 TransformStream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 async function * requestStream ( request : QueryRequest , botName : string , accessKey : string , ): AsyncGenerator <string > { const reader = response .body !.pipeThrough (new TextDecoderStream ()) .pipeThrough (sseTransformStream ()) .getReader () let chunk = await reader.read () while (!chunk.done ) { yield chunk.value chunk = await reader.read () } }
完整代码
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 import { Context , Hono } from 'hono' import { streamSSE } from 'hono/streaming' interface SettingsResponse { server_bot_dependencies?: Record <string , number > allow_attachments?: boolean introduction_message?: string expand_text_attachments?: boolean enable_image_comprehension?: boolean enforce_author_role_alternation?: boolean enable_multi_bot_chat_prompting?: boolean }type Identifier = string type ContentType = 'text/markdown' | 'text/plain' type FeedbackType = 'like' | 'dislike' interface MessageFeedback { type : FeedbackType reason?: string }interface Attachment { url : string content_type : string name : string parsed_content?: string }interface ProtocolMessage { role : 'system' | 'user' | 'bot' content : string content_type : ContentType timestamp : number message_id : Identifier feedback : MessageFeedback [] attachments : Attachment [] }interface QueryRequest { query : ProtocolMessage [] user_id : Identifier conversation_id : Identifier message_id : Identifier access_key : string temperature?: number skip_system_prompt?: boolean logit_bias?: Record <string , number > stop_sequences?: string [] language_code?: string }interface MetaMessage { event : 'meta' data : { content_type?: 'text/markdown' | 'text/plain' suggested_replies?: boolean } }interface TextMessage { event : 'text' data : { text : string } }interface ReplaceMessage { event : 'replace_response' data : { text : string } }interface JsonMessage { event : 'json' data : Record <string , any > }interface SuggestedReplyMessage { event : 'suggested_reply' data : { text : string } }interface ErrorMessage { event : 'error' data : { allow_retry : boolean text : string raw_response : string error_type : string } }interface DoneMessage { event : 'done' data : {} }type Bindings = { ACCESS_KEY : string }const app = new Hono <{ Bindings : Bindings }>() app.get ('/' , (c ) => c.text ('Hello, World!' )) app.post ('/' , async (c) => { const request = await c.req .json () const authHeader = c.req .header ().authorization if (authHeader !== `Bearer ${c.env.ACCESS_KEY} ` ) { return c.text ('Unauthorized' , 401 ) } const { type } = request function handleSettings (c: Context ) { return c.json ({ server_bot_dependencies : { 'GPT-4o' : 1 , }, introduction_message : 'Hello, I am a server bot.' , } as SettingsResponse ) } function handleQuery (c: Context<{ Bindings: Bindings }> ) { return streamSSE (c, async (stream) => { const writeSSE = ( message: | MetaMessage | TextMessage | ReplaceMessage | JsonMessage | SuggestedReplyMessage | ErrorMessage | DoneMessage, ) => { stream.writeSSE ({ event : message.event , data : JSON .stringify (message.data ), }) } writeSSE ({ event : 'meta' , data : { content_type : 'text/markdown' , suggested_replies : true }, }) for await (const chunk of requestStream ( request, 'GPT-4o' , c.env .ACCESS_KEY , )) { if (chunk.event === 'text' ) { writeSSE (chunk) } } writeSSE ({ event : 'done' , data : {} }) }) } switch (type ) { case 'query' : return handleQuery (c) case 'settings' : return handleSettings (c) default : throw new Error ('Invalid request type' ) } })function sseTransformStream ( ) { let buffer = '' return new TransformStream < string , { event : 'text' data : any } >({ transform (chunk: string , controller: TransformStreamDefaultController ) { buffer += chunk const textRegexp = /^event: text[\r\n]+data: ({[\s\S]*?})/ const doneRegexp = /^event: done[\r\n]+data: ({})/ while (buffer) { buffer = buffer.trimStart () if (textRegexp.test (buffer)) { const match = buffer.match (textRegexp) if (match) { controller.enqueue ({ event : 'text' , data : JSON .parse (match[1 ]), }) buffer = buffer.replace (match[0 ], '' ).trimStart () } } else if (doneRegexp.test (buffer)) { const match = buffer.match (doneRegexp) if (match) { buffer = buffer.replace (match[0 ], '' ).trimStart () controller.terminate () return } } else if (/^ping$/m .test (buffer)) { const match = buffer.match (/^ping$/m ) if (match) { buffer = buffer.replace (match[0 ], '' ).trimStart () } } else if (buffer.trim () === ': ping' ) { buffer = buffer.replace (': ping' , '' ).trimStart () } else { return } } }, flush ( ) { if (buffer.trim ()) { console .warn ('Unprocessed data in buffer:' , buffer) } }, }) }async function * requestStream ( request : QueryRequest , botName : string , accessKey : string , ): AsyncGenerator <{ event : 'text' data : any }> { const response = await fetch (`https://api.poe.com/bot/${botName} ` , { method : 'post' , headers : { 'Content-Type' : 'application/json' , Authorization : `Bearer ${accessKey} ` , Accept : 'text/event-stream' , 'Cache-Control' : 'no-cache' , }, body : JSON .stringify (request), }) if (!response.ok || !response.body ) { console .error (response.statusText ) throw new Error (`HTTP error! status: ${response.status} ` ) } const reader = response .body !.pipeThrough (new TextDecoderStream ()) .pipeThrough (sseTransformStream ()) .getReader () let chunk = await reader.read () while (!chunk.done ) { yield chunk.value chunk = await reader.read () } }async function syncBotSettings ( botName: string , accessKey: string = '' , ): Promise <void > { const PROTOCOL_VERSION = '1.0' const baseUrl = 'https://api.poe.com/bot/fetch_settings' const resp = await fetch ( `${baseUrl} /${botName} /${accessKey} /${PROTOCOL_VERSION} ` , { method : 'post' }, ) const text = await resp.text () if (!resp.ok ) { throw new Error (`Error fetching settings for bot ${botName} : ${text} ` ) } console .log (text) } app.get ('/sync-bot-settings' , async (c) => { await syncBotSettings ('BotT6R4NKNGZ9' , c.env .ACCESS_KEY ) return c.text ('Synced' ) })export default app
现在,重新发布至 Cloudflare Workers,然后就可以向这个 Bot 聊天,并在服务端调用 GPT-4o 模型。
总结 上面只是一个非常简单的 demo,Poe Server Bot 实际上还可以做很多事情,但对 JavaScript 缺乏官方支持,让想要尝试变得比较麻烦。吾辈发布了一个 npm 模块 fastapi-poe ,来尝试像官方的 python 模块 fastapi_poe 一样使用。