使用 JavaScript 创建 PoeAI 的 服务端 bot

本文最后更新于:2024年8月21日 下午

背景

Poe 是一个 AI 聊天机器人,它支持多种 AI 模型,包括 GPT-4o、Claude 3.5 Sonnet、Gemini Pro 等。还支持各种类型的 Bot,其中 Server Bot 是最自由的,可以自己编写 Bot 的逻辑。但是,Poe 的 Server Bot 官方仅支持 Python,而吾辈更喜欢 JavaScript,所以研究了一下怎么实现。

初始化项目

一开始使用 express 实现服务端,但后面发现 express 无法部署到 edge runtime,例如 Cloudflare Workers,所以改用 hono.js 实现。

首先使用 hono.js 创建一个项目并选择 cloudflare-workers 模板

1
2
3
pnpm create hono@latest
? Target directory hono-demo
? Which template do you want to use? cloudflare-workers

src/index.ts 是项目入口,内容如下

1
2
3
4
5
6
7
import { Hono } from 'hono'

const app = new Hono()

app.get('/', (c) => c.text('Hello, world!'))

export default app

首先在 Poe 网站上创建 Server Bot,得到一个 NameAccess Key

1724235940301.jpg

协议分析

根据 Poe 协议规范,实现一个 Bot 需要实现一个特定的 post 请求,具体如下

API 接口传入的参数会有两个固定字段,type 是请求类型,version 是协议版本。

1
2
3
4
type BotRequest = {
version: string
type: 'query' | 'settings' | 'report_feedback' | 'report_error'
}

其中 query 和 settings 是必须实现的,report_feedback 和 report_error 是可选的。

下面来实现一个基本的 post 请求结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 其他代码...

app.post('/', async (c) => {
const request = await c.req.json()
const { version, type } = request
function handleQuery(c: Context) {
throw new Error('Not implemented')
}
function handleSettings(c: Context) {
throw new Error('Not implemented')
}

switch (type) {
case 'query':
return handleQuery(c)
case 'settings':
return handleSettings(c)
default:
throw new Error('Invalid request type')
}
})

Settings 请求

Settings 请求没有额外的参数,仅要求返回这个 bot 相关的一些设置。

1
2
3
4
5
6
7
8
9
interface SettingsResponse {
server_bot_dependencies?: Record<string, number> // 声明依赖的其他 bot
allow_attachments?: boolean // 是否允许附件
introduction_message?: string // 初始化消息
expand_text_attachments?: boolean // 是否扩展文本附件
enable_image_comprehension?: boolean // 是否启用图像理解,如果启用,则图片会被 Poe 解析为文本传给当前 Bot
enforce_author_role_alternation?: boolean // 是否强制交替用户/机器人角色
enable_multi_bot_chat_prompting?: boolean
}

例如,实现上面的 settings 请求,这是一个简单的响应

1
2
3
4
5
6
7
8
function handleSettings(c: Context) {
return c.json({
server_bot_dependencies: {
'GPT-4o': 1,
},
introduction_message: 'Hello, I am a server bot.',
})
}

参考 https://creator.poe.com/docs/poe-protocol-specification#settings

Query 请求

query 是关键部分,不管是请求还是响应都很复杂。

下面是请求的类型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Identifier = string // Matches the identifier format described in the spec
type ContentType = 'text/markdown' | 'text/plain'
type FeedbackType = 'like' | 'dislike'
interface MessageFeedback {
type: FeedbackType
reason?: string
}
interface Attachment {
url: string
content_type: string
name: string
parsed_content?: string
}
interface ProtocolMessage {
role: 'system' | 'user' | 'bot'
content: string
content_type: ContentType
timestamp: number
message_id: Identifier
feedback: MessageFeedback[]
attachments: Attachment[]
}
interface QueryRequest {
query: ProtocolMessage[]
user_id: Identifier
conversation_id: Identifier
message_id: Identifier
access_key: string
temperature?: number
skip_system_prompt?: boolean
logit_bias?: Record<string, number>
stop_sequences?: string[]
language_code?: string
}

响应要求返回 SSE 流式响应多条消息,具体也有很多类型

meta 类型,应该返回的第一条消息,主要是用来声明一些设置

1
2
3
4
5
6
7
interface MetaMessage {
event: 'meta'
data: {
content_type?: 'text/markdown' | 'text/plain' // 内容类型,默认为 text/markdown
suggested_replies?: boolean // Poe 是否显示建议的回复,默认为 false
}
}

接下来是两种消息类型,区别只在于是否替换之前已经发送的消息

1
2
3
4
5
6
7
8
9
10
11
12
interface TextMessage {
event: 'text'
data: {
text: string
}
}
interface ReplaceMessage {
event: 'replace_response'
data: {
text: string
}
}

还有两种特殊格式的消息,一种是用来返回 JSON 数据(通常给 OpenAI 这种支持函数调用的 Bot 使用),另一种是建议回复的消息,这会出现在回复消息的下方。

1724233538152.jpg

1
2
3
4
5
6
7
8
9
10
interface JsonMessage {
event: 'json'
data: Record<string, any>
}
interface SuggestedReplyMessage {
event: 'suggested_reply'
data: {
text: string
}
}

最后是结束和错误消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
interface ErrorMessage {
event: 'error'
data: {
allow_retry: boolean
text: string
raw_response: string
error_type: string
}
}
interface DoneMessage {
event: 'done'
data: {}
}

现在来实现一个简单的 query 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
function handleQuery(c: Context) {
return streamSSE(c, async (stream) => {
const writeSSE = (
message:
| MetaMessage
| TextMessage
| ReplaceMessage
| JsonMessage
| SuggestedReplyMessage
| ErrorMessage
| DoneMessage,
) => {
stream.writeSSE({
event: message.event,
data: JSON.stringify(message.data),
})
}
writeSSE({
event: 'meta',
data: { content_type: 'text/markdown', suggested_replies: true },
})
writeSSE({ event: 'text', data: { text: 'Hello, World!' } })
writeSSE({ event: 'done', data: {} })
})
}

参考:https://creator.poe.com/docs/poe-protocol-specification#query

发布到 Cloudflare Workers

现在发布到 Cloudflare Workers 上,得到一个 URL,例如 https://xxx.workers.dev

1
pnpm run deploy

现在在 Poe 网站上填写 Server URL,然后点击 Run check,如果成功,继续创建 Bot 就可以在 Poe 上使用了。

1724236097284.jpg
1724236179345.jpg

验证请求

根据 Poe 官方的建议,还应该为 post 请求添加验证,确定是来自 Poe 的请求。首先添加环境变量

1
2
3
ACCESS_KEY="<YOUR_ACCESS_KEY>"
echo ACCESS_KEY=\"$ACCESS_KEY\" > .dev.vars # 在本地添加
echo $ACCESS_KEY | pnpm wrangler secret put ACCESS_KEY # 在生产环境中添加

然后在 src/index.ts 中添加验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Bindings = {
ACCESS_KEY: string
}
// 声明环境变量绑定
const app = new Hono<{ Bindings: Bindings }>()

app.post('/', async (c) => {
const request = await c.req.json()
const authHeader = c.req.header().authorization
if (authHeader !== `Bearer ${c.env.ACCESS_KEY}`) {
return c.text('Unauthorized', 401)
}
// 其他代码...
})

主动调用

Bot 除了可以被 Poe 调用,也可以主动调用 Poe 的 API 来实现一些功能,下面介绍其中两个。

刷新 Bot Settings

修改 Bot Settings 的实现后,还需要主动通知 Poe 调用接口刷新设置。

例如修改了 handleSettings 函数,更新了 server_bot_dependencies,不再使用 GPT-4o,而是使用 Claude-3.5-Sonnet

1
2
3
4
5
6
7
function handleSettings(c: Context) {
return c.json({
server_bot_dependencies: {
'Claude-3.5-Sonnet': 1,
},
})
}

然后在项目初始化的时候主动通知 Poe 刷新设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async function syncBotSettings(
botName: string,
accessKey: string = '',
): Promise<void> {
const PROTOCOL_VERSION = '1.0'
const baseUrl = 'https://api.poe.com/bot/fetch_settings'
const resp = await fetch(
`${baseUrl}/${botName}/${accessKey}/${PROTOCOL_VERSION}`,
{ method: 'post' },
)
const text = await resp.text()
if (!resp.ok) {
throw new Error(`Error fetching settings for bot ${botName}: ${text}`)
}
console.log(text)
}

app.get('/sync-bot-settings', async (c) => {
await syncBotSettings('BotT6R4NKNGZ9', c.env.ACCESS_KEY)
return c.text('Synced')
})

然后在浏览器中直接访问这个 URL 就可以通知 Poe 刷新设置了。

参考:https://creator.poe.com/docs/server-bots-functional-guides#3-make-a-post-request-to-poes-refetch-settings-endpoint-with-your-bot-name-and-access-key

调用其他 Bot

接下来,说明如何调用第三方的 Bot,这里仅以文本 => 文本的 Bot 为例(除此之外,现在的 Bot 还支持附件文件、图片、音视频等)。遗憾的是,官方文档没有记录 API 接口,只说明了使用 python 模块 fastapi_poe 来实现,所以只能分析 fastapi_poe 的源码来实现。

在其中可以找到关键接口 https://api.poe.com/bot/<botName>,然后接口会以 SSE 流式响应多条消息,先做个测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async function* requestStream(
request: QueryRequest,
botName: string,
accessKey: string,
): AsyncGenerator<{
event: 'text'
data: any
}> {
// region 解析 SSE 流
const response = await fetch(`https://api.poe.com/bot/${botName}`, {
method: 'post',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessKey}`,
Accept: 'text/event-stream',
'Cache-Control': 'no-cache',
},
body: JSON.stringify(request),
})
if (!response.ok || !response.body) {
console.error(response.statusText)
throw new Error(`HTTP error! status: ${response.status}`)
}
const reader = response.body!.pipeThrough(new TextDecoderStream()).getReader()
let chunk = await reader.read()
while (!chunk.done) {
console.log('chunk: ', chunk.value)
chunk = await reader.read()
}
// endregion
}

function handleQuery(c: Context<{ Bindings: Bindings }>) {
return streamSSE(c, async (stream) => {
const writeSSE = (
message:
| MetaMessage
| TextMessage
| ReplaceMessage
| JsonMessage
| SuggestedReplyMessage
| ErrorMessage
| DoneMessage,
) => {
stream.writeSSE({
event: message.event,
data: JSON.stringify(message.data),
})
}
writeSSE({
event: 'meta',
data: { content_type: 'text/markdown', suggested_replies: true },
})
for await (const chunk of requestStream(
request,
'GPT-4o',
c.env.ACCESS_KEY,
)) {
if (chunk.event === 'text') {
writeSSE(chunk)
}
}
writeSSE({ event: 'done', data: {} })
})
}

终端打印的结果。可以消息分为三种:

  • text: 普通文本消息,但可能会被拆分成多个 chunk 返回
  • done: 结束消息
  • 空消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
chunk:  event: text
data: {"text":
chunk: ""}

event: text
data: {"text": "Hello"}
chunk: event: text
data: {"text":
chunk: "!"}

event: text
data: {"text": " How"}
chunk: event: text
data: {"text":
chunk: " can"}

event: text
data: {"text": " I"}
chunk: event: text
data: {"text":
chunk: " assist"}

event: text
data: {"text": " you"}
chunk: event: text
data: {"text":
chunk: " today"}

event: text
data: {"text": "?"}
chunk: event: text
data: {"text":
chunk: ""}
chunk: event: done
data: {}
chunk:

因此需要实现一个 TransformStream 来将 SSE 文本流转换为结构化的数据,并处理 text 多条消息合并。实现本身并不复杂,但主要的问题是多个模型拆分规则可能规则会不一致,例如 GPT-4o chunk 中可能包含一条完整消息,也可能不包含,而 Claude 3.5 Sonnet 中则总是由两个 chunk 组成一条完整消息。还有一些模型会返回 ping 消息,而且 ping 消息的格式也略有不同,像是 ping: ping 等。

下面是 GPT-4o 的消息示例

1
2
3
4
;[
'event: text\r\ndata: {"text": ',
'""}\r\n\r\nevent: text\r\ndata: {"text": "Hi"}\r\n\r\n',
]

Claude 3.5 Sonnet 的消息示例

1
;['event: text\r\ndata: {"text": ', '"Hello"}\r\n\r\n']

所以实现的 TransformStream 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
function sseTransformStream() {
let buffer = ''
return new TransformStream<
string,
{
event: 'text'
data: any
}
>({
transform(chunk: string, controller: TransformStreamDefaultController) {
buffer += chunk
const textRegexp = /^event: text[\r\n]+data: ({[\s\S]*?})/
const doneRegexp = /^event: done[\r\n]+data: ({})/
while (buffer) {
// console.log(
// !/^ping$/m.test(buffer) &&
// !doneRegexp.test(buffer) &&
// !textRegexp.test(buffer) &&
// buffer !== '' &&
// buffer !== 'event: text\r\ndata: {"text": ' &&
// buffer.trim() !== ': ping',
// )
buffer = buffer.trimStart()
if (textRegexp.test(buffer)) {
const match = buffer.match(textRegexp)
if (match) {
controller.enqueue({
event: 'text',
data: JSON.parse(match[1]),
})
buffer = buffer.replace(match[0], '').trimStart()
}
} else if (doneRegexp.test(buffer)) {
const match = buffer.match(doneRegexp)
if (match) {
// controller.enqueue({
// event: 'done',
// data: JSON.parse(match[1]),
// })
buffer = buffer.replace(match[0], '').trimStart()
controller.terminate()
return
}
// ignore ping
} else if (/^ping$/m.test(buffer)) {
const match = buffer.match(/^ping$/m)
if (match) {
buffer = buffer.replace(match[0], '').trimStart()
}
} else if (buffer.trim() === ': ping') {
buffer = buffer.replace(': ping', '').trimStart()
} else {
return
}
}
},
flush() {
if (buffer.trim()) {
console.warn('Unprocessed data in buffer:', buffer)
}
},
})
}

修改 requestStream 来使用这个 TransformStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async function* requestStream(
request: QueryRequest,
botName: string,
accessKey: string,
): AsyncGenerator<string> {
// 其他代码...
const reader = response
.body!.pipeThrough(new TextDecoderStream())
.pipeThrough(sseTransformStream())
.getReader()
let chunk = await reader.read()
while (!chunk.done) {
yield chunk.value
chunk = await reader.read()
}
}

完整代码

完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import { Context, Hono } from 'hono'
import { streamSSE } from 'hono/streaming'

interface SettingsResponse {
server_bot_dependencies?: Record<string, number> // 声明依赖的其他 bot
allow_attachments?: boolean // 是否允许附件
introduction_message?: string // 初始化消息
expand_text_attachments?: boolean // 是否扩展文本附件
enable_image_comprehension?: boolean // 是否启用图像理解,如果启用,则图片会被 Poe 解析为文本传给当前 Bot
enforce_author_role_alternation?: boolean // 是否强制交替用户/机器人角色
enable_multi_bot_chat_prompting?: boolean
}

type Identifier = string // Matches the identifier format described in the spec
type ContentType = 'text/markdown' | 'text/plain'
type FeedbackType = 'like' | 'dislike'
interface MessageFeedback {
type: FeedbackType
reason?: string
}
interface Attachment {
url: string
content_type: string
name: string
parsed_content?: string
}
interface ProtocolMessage {
role: 'system' | 'user' | 'bot'
content: string
content_type: ContentType
timestamp: number
message_id: Identifier
feedback: MessageFeedback[]
attachments: Attachment[]
}
interface QueryRequest {
query: ProtocolMessage[]
user_id: Identifier
conversation_id: Identifier
message_id: Identifier
access_key: string
temperature?: number
skip_system_prompt?: boolean
logit_bias?: Record<string, number>
stop_sequences?: string[]
language_code?: string
}
interface MetaMessage {
event: 'meta'
data: {
content_type?: 'text/markdown' | 'text/plain' // 内容类型,默认为 text/markdown
suggested_replies?: boolean // Poe 是否显示建议的回复,默认为 false
}
}
interface TextMessage {
event: 'text'
data: {
text: string
}
}
interface ReplaceMessage {
event: 'replace_response'
data: {
text: string
}
}
interface JsonMessage {
event: 'json'
data: Record<string, any>
}
interface SuggestedReplyMessage {
event: 'suggested_reply'
data: {
text: string
}
}
interface ErrorMessage {
event: 'error'
data: {
allow_retry: boolean
text: string
raw_response: string
error_type: string
}
}
interface DoneMessage {
event: 'done'
data: {}
}

type Bindings = {
ACCESS_KEY: string
}

const app = new Hono<{ Bindings: Bindings }>()

app.get('/', (c) => c.text('Hello, World!'))
app.post('/', async (c) => {
const request = await c.req.json()
const authHeader = c.req.header().authorization
if (authHeader !== `Bearer ${c.env.ACCESS_KEY}`) {
return c.text('Unauthorized', 401)
}
const { type } = request
function handleSettings(c: Context) {
return c.json({
server_bot_dependencies: {
'GPT-4o': 1,
},
introduction_message: 'Hello, I am a server bot.',
} as SettingsResponse)
}
function handleQuery(c: Context<{ Bindings: Bindings }>) {
return streamSSE(c, async (stream) => {
const writeSSE = (
message:
| MetaMessage
| TextMessage
| ReplaceMessage
| JsonMessage
| SuggestedReplyMessage
| ErrorMessage
| DoneMessage,
) => {
stream.writeSSE({
event: message.event,
data: JSON.stringify(message.data),
})
}
writeSSE({
event: 'meta',
data: { content_type: 'text/markdown', suggested_replies: true },
})
for await (const chunk of requestStream(
request,
'GPT-4o',
c.env.ACCESS_KEY,
)) {
if (chunk.event === 'text') {
writeSSE(chunk)
}
}
// writeSSE({ event: 'text', data: { text: 'Hello, World!' } })
writeSSE({ event: 'done', data: {} })
})
}
switch (type) {
case 'query':
return handleQuery(c)
case 'settings':
return handleSettings(c)
default:
throw new Error('Invalid request type')
}
})

function sseTransformStream() {
let buffer = ''
return new TransformStream<
string,
{
event: 'text'
data: any
}
>({
transform(chunk: string, controller: TransformStreamDefaultController) {
buffer += chunk
const textRegexp = /^event: text[\r\n]+data: ({[\s\S]*?})/
const doneRegexp = /^event: done[\r\n]+data: ({})/
while (buffer) {
// console.log(
// !/^ping$/m.test(buffer) &&
// !doneRegexp.test(buffer) &&
// !textRegexp.test(buffer) &&
// buffer !== '' &&
// buffer !== 'event: text\r\ndata: {"text": ' &&
// buffer.trim() !== ': ping',
// )
buffer = buffer.trimStart()
if (textRegexp.test(buffer)) {
const match = buffer.match(textRegexp)
if (match) {
controller.enqueue({
event: 'text',
data: JSON.parse(match[1]),
})
buffer = buffer.replace(match[0], '').trimStart()
}
} else if (doneRegexp.test(buffer)) {
const match = buffer.match(doneRegexp)
if (match) {
// controller.enqueue({
// event: 'done',
// data: JSON.parse(match[1]),
// })
buffer = buffer.replace(match[0], '').trimStart()
controller.terminate()
return
}
// ignore ping
} else if (/^ping$/m.test(buffer)) {
const match = buffer.match(/^ping$/m)
if (match) {
buffer = buffer.replace(match[0], '').trimStart()
}
} else if (buffer.trim() === ': ping') {
buffer = buffer.replace(': ping', '').trimStart()
} else {
return
}
}
},
flush() {
if (buffer.trim()) {
console.warn('Unprocessed data in buffer:', buffer)
}
},
})
}

async function* requestStream(
request: QueryRequest,
botName: string,
accessKey: string,
): AsyncGenerator<{
event: 'text'
data: any
}> {
const response = await fetch(`https://api.poe.com/bot/${botName}`, {
method: 'post',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessKey}`,
Accept: 'text/event-stream',
'Cache-Control': 'no-cache',
},
body: JSON.stringify(request),
})
if (!response.ok || !response.body) {
console.error(response.statusText)
throw new Error(`HTTP error! status: ${response.status}`)
}
const reader = response
.body!.pipeThrough(new TextDecoderStream())
.pipeThrough(sseTransformStream())
.getReader()
let chunk = await reader.read()
while (!chunk.done) {
yield chunk.value
chunk = await reader.read()
}
}

async function syncBotSettings(
botName: string,
accessKey: string = '',
): Promise<void> {
const PROTOCOL_VERSION = '1.0'
const baseUrl = 'https://api.poe.com/bot/fetch_settings'
const resp = await fetch(
`${baseUrl}/${botName}/${accessKey}/${PROTOCOL_VERSION}`,
{ method: 'post' },
)
const text = await resp.text()
if (!resp.ok) {
throw new Error(`Error fetching settings for bot ${botName}: ${text}`)
}
console.log(text)
}

app.get('/sync-bot-settings', async (c) => {
await syncBotSettings('BotT6R4NKNGZ9', c.env.ACCESS_KEY)
return c.text('Synced')
})

export default app

现在,重新发布至 Cloudflare Workers,然后就可以向这个 Bot 聊天,并在服务端调用 GPT-4o 模型。

1724255995710.jpg

总结

上面只是一个非常简单的 demo,Poe Server Bot 实际上还可以做很多事情,但对 JavaScript 缺乏官方支持,让想要尝试变得比较麻烦。吾辈发布了一个 npm 模块 fastapi-poe,来尝试像官方的 python 模块 fastapi_poe 一样使用。


使用 JavaScript 创建 PoeAI 的 服务端 bot
https://blog.rxliuli.com/p/958e1a550f804fb8b755040bbd040bd2/
作者
rxliuli
发布于
2024年8月19日
许可协议