The source code discussed in this article is published in this repository.
There are plenty of examples on the Internet of connecting ChatGPT 3.5 to a Telegram bot without extra tooling. But when a large number of users is involved, there are no examples with load balancing across multiple processes: every tutorial on the Internet runs a monolith in a single replica.
https://github.com/telegraf/telegraf/issues/423
Besides, working with NodeJS in practice, I ran into the problem that a large number of Promises stuck in the pending state slows the application down through garbage-collector load. Once you add third-party tooling that lets ChatGPT call external functions which query a database, you have to think about replicating the monolith at the very least, so that the queue of requests waiting for database data does not balloon.
Application architecture
For load balancing, the most obvious tool is an Nginx upstream: it accepts new clients' WebSocket connections on port 80 and proxies them to ports 8081, 8082, ..., 8085 according to the number of replicas. If a client stays inactive for 15 minutes, the connection is dropped; it will be re-created when a new message arrives.
https://nginx.org/en/docs/http/ngx_http_upstream_module.html
The replicas store chat history in Redis, so the context can be re-created even if the next message is handled by a different process. The replicas themselves will be spawned by PM2, the most native approach for a NodeJS-stack application.
https://pm2.io/docs/plus/overview/
On top of that, for $40 a month PM2 offers ready-made incident notifications via Slack/email and server monitoring by the standard metrics: CPU (cores, hardware threads, virtual threads), memory capacity, network interfaces, storage devices (I/O, capacity, response time), and so on. This saves on DevOps until the project becomes self-sustaining.
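To make the Redis part concrete: the history persistence itself is handled by the chat framework further below, but the idea can be sketched in a few lines. This is an illustration only, assuming ioredis and a hypothetical chat:history:<clientId> key scheme:

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

type HistoryMessage = { role: "system" | "user" | "assistant"; content: string };

// Any of the five replicas can append to and replay the same list, so a
// reconnect that lands on another port still sees the full chat context.
export const pushMessage = async (clientId: string, message: HistoryMessage) => {
  await redis.rpush(`chat:history:${clientId}`, JSON.stringify(message));
  await redis.expire(`chat:history:${clientId}`, 15 * 60); // mirror the 15-minute idle timeout
};

export const readHistory = async (clientId: string): Promise<HistoryMessage[]> => {
  const raw = await redis.lrange(`chat:history:${clientId}`, 0, -1);
  return raw.map((item) => JSON.parse(item));
};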
Configuration files
So as not to turn the developer's machine into a Linux box configured for a single project, let's wrap Nginx in Docker. To do that, we write a docker-compose.yaml:
version: '3.8'
services:
  nginx:
    image: nginx:1.27.4
    ports:
      - "80:80"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./logs/nginx:/var/log/nginx
Then create a companion ./config/nginx.conf that enumerates the replicas:
user nginx;
worker_processes auto;

error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
    use epoll;
    multi_accept on;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    upstream local_websocket_servers {
        # Pick the replica with the fewest active connections
        least_conn;
        server host.docker.internal:8081; # host.docker.internal is shared from the host machine via extra_hosts
        server host.docker.internal:8082;
        server host.docker.internal:8083;
        server host.docker.internal:8084;
        server host.docker.internal:8085;
    }

    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }

    server {
        listen 80;
        server_name localhost;

        location / {
            proxy_pass http://local_websocket_servers$is_args$args;

            # WebSocket-specific headers
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;

            # Preserve original headers and connection details
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Don't abort the upstream request when the client disconnects
            proxy_ignore_client_abort on;

            # Long-lived connection settings
            proxy_read_timeout 86400s;
            proxy_send_timeout 86400s;

            # Buffer and performance settings
            proxy_buffer_size 128k;
            proxy_buffers 4 256k;
            proxy_busy_buffers_size 256k;
        }
    }
}
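With both files in place, the balancer comes up with docker compose up -d, and the mounted config can be syntax-checked and hot-reloaded inside the container with docker compose exec nginx nginx -t and docker compose exec nginx nginx -s reload respectively.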
To run the replicas through PM2, let's create a pm2.config.cjs. The linked repository contains a package.json with a script for launching the project:
npm start
const fs = require("fs");
const path = require("path");
const os = require("os");
const dotenv = require("dotenv");

// dotenv.parse expects file contents, not a path, so read the file first
const readConfig = () => dotenv.parse(fs.readFileSync("./.env"));

const getPath = (unixPath) => {
  return path.resolve(unixPath.replace("~", os.homedir()));
};

const createBun = (index) => ({
  name: `bun-ws-${index}`,
  script: "./src/server.ts",
  interpreter: getPath("~/.bun/bin/bun"),
  args: ["--server", `--port=808${index}`],
  out_file: `./logs/pm2/bun-ws-${index}-out.log`,
  error_file: `./logs/pm2/bun-ws-${index}-error.log`,
  log_date_format: "YYYY-MM-DD HH:mm:ss",
  merge_logs: true,
  env: readConfig(),
});

module.exports = {
  apps: [
    /*
    // Expanded form of what createBun(1) returns:
    {
      name: "bun-ws-1",
      script: "./src/server.ts",
      interpreter: getPath("~/.bun/bin/bun"),
      args: ["--server", "--port=8081"],
      out_file: "./logs/pm2/bun-ws-1-out.log",
      error_file: "./logs/pm2/bun-ws-1-error.log",
      log_date_format: "YYYY-MM-DD HH:mm:ss",
      merge_logs: true,
    },
    */
    createBun(1),
    createBun(2),
    createBun(3),
    createBun(4),
    createBun(5),
  ],
};
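The npm start script in the repository presumably wraps the standard PM2 CLI: pm2 start pm2.config.cjs launches the five bun-ws-* processes, pm2 status lists them, and pm2 logs tails the log files configured above.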
As you can see, we pass the port number for orchestration via command-line arguments. This is the most convenient approach, since the .env file stays static and nothing changes at runtime. To run the project we use Bun, an accelerated NodeJS analogue comparable in speed to Golang.
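A single replica can also be launched by hand for debugging, bypassing PM2, with the same flags the config passes: bun ./src/server.ts --server --port=8081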
Agent swarm
An agent is the analogue of a scene in telegraf: an LLM model with an isolated system prompt. The current agent in a swarm can be changed through the changeToAgent function call, similar to navigating between scenes in a telegram bot after a button click.
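As a rough illustration of that navigation (the second agent and the exact signatures here are assumptions; see the agent-swarm-kit docs for the real tool-registration API):

import { addAgent, changeToAgent, execute } from "agent-swarm-kit";
import { OPENAI_COMPLETION } from "./lib/swarm";

// A hypothetical second agent to navigate to
const SALES_AGENT = addAgent({
  agentName: "sales_agent",
  completion: OPENAI_COMPLETION,
  prompt: "You are a sales agent.",
  dependsOn: [],
});

// Called from inside a tool handler (registration omitted): one call swaps
// the active agent for this client, like ctx.scene.enter() in telegraf
export const navigateToSales = async (clientId: string) => {
  await changeToAgent(SALES_AGENT, clientId);
  await execute("Greet the user", clientId, SALES_AGENT);
};

The swarm in this project is much simpler, a single test agent: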
import { Adapter, addAgent, addCompletion, addSwarm } from "agent-swarm-kit";
import { OpenAI } from "openai";

export const OPENAI_COMPLETION = addCompletion({
  completionName: "openai_completion",
  getCompletion: Adapter.fromOpenAI(new OpenAI({ apiKey: process.env.OPENAI_API_KEY })),
});

export const TEST_AGENT = addAgent({
  docDescription: "This agent operates within the nginx-balancer-chat project as a test agent, utilizing the OpenaiCompletion to inform users about the actual server port of one of 5 chat instances running on different ports and upstreamed by Nginx to port 80, extracting the port details from the chat history's system message.",
  agentName: "test_agent",
  completion: OPENAI_COMPLETION,
  prompt: `You are a test agent for Nginx Upstream. Tell the user the server port from the chat history (system message)`,
  dependsOn: [],
});

export const TEST_SWARM = addSwarm({
  docDescription: "This swarm serves as the core structure for the nginx-balancer-chat project, managing a single TestAgent as both the sole member and default agent to handle user interactions, leveraging the OpenaiCompletion to report the specific port of one of 5 upstreamed chat instances balanced by Nginx to port 80.",
  swarmName: "test_swarm",
  agentList: [TEST_AGENT],
  defaultAgent: TEST_AGENT,
});
The code in this example is set up so that the LLM model tells you which port the WebSocket request was proxied to. Instead of OPENAI_COMPLETION, each agent can use its own completion; the repository has more details for the following providers (a sketch for LM Studio follows the list):
LMStudio
Ollama
Cohere
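For instance, LM Studio serves an OpenAI-compatible API, so swapping providers is mostly a matter of a different adapter. A sketch, assuming the Adapter.fromLMStudio helper and LM Studio's default endpoint (check the repo for the exact wiring):

import { Adapter, addCompletion } from "agent-swarm-kit";
import { OpenAI } from "openai";

// Assumption: Adapter.fromLMStudio wraps an OpenAI-compatible client;
// LM Studio exposes such an API on http://127.0.0.1:1234/v1 by default
export const LMSTUDIO_COMPLETION = addCompletion({
  completionName: "lmstudio_completion",
  getCompletion: Adapter.fromLMStudio(
    new OpenAI({ baseURL: "http://127.0.0.1:1234/v1", apiKey: "noop" })
  ),
});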
import { Chat, getAgentName, Schema, History } from "agent-swarm-kit";
import type { ServerWebSocket } from "bun";
import { parseArgs } from "util";
import { TEST_SWARM } from "./lib/swarm";

type WebSocketData = {
  clientId: string;
};

const { values } = parseArgs({
  args: process.argv,
  options: {
    server: {
      type: "boolean",
    },
    port: {
      type: "string",
    },
  },
  strict: true,
  allowPositionals: true,
});

const SERVER_PORT = parseInt(values.port ?? "", 10);

if (isNaN(SERVER_PORT)) {
  throw new Error(`Server port is not a number: ${values.port}`);
}

// Prepend the replica's port to every chat context as a system message
History.useHistoryCallbacks({
  getSystemPrompt: () => [
    `The server port is ${SERVER_PORT}. Tell the user that port ASAP`,
  ],
});

if (values.server) {
  Bun.serve<WebSocketData>({
    fetch(req, server) {
      const clientId = new URL(req.url).searchParams.get("clientId");
      if (!clientId) {
        return new Response("Invalid clientId", { status: 500 });
      }
      console.log(`Connected clientId=${clientId} port=${SERVER_PORT}`);
      server.upgrade(req, {
        data: {
          clientId,
        },
      });
    },
    websocket: {
      async open(ws: ServerWebSocket<WebSocketData>) {
        await Chat.beginChat(ws.data.clientId, TEST_SWARM);
        await Schema.writeSessionMemory(ws.data.clientId, { port: SERVER_PORT });
      },
      async message(ws: ServerWebSocket<WebSocketData>, message: string) {
        const answer = await Chat.sendMessage(ws.data.clientId, message, TEST_SWARM);
        ws.send(
          JSON.stringify({
            data: answer,
            agentName: await getAgentName(ws.data.clientId),
          })
        );
      },
      async close(ws: ServerWebSocket<WebSocketData>) {
        console.log(`Disconnected clientId=${ws.data.clientId} port=${SERVER_PORT}`);
        await Chat.dispose(ws.data.clientId, TEST_SWARM);
      },
    },
    port: SERVER_PORT,
  });

  console.log(`Server listening http://localhost:${SERVER_PORT}`);
}
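To try the whole chain end to end, you can connect a WebSocket client through the balancer on port 80; a minimal sketch (the clientId query parameter is what the fetch handler above expects):

// Minimal manual test client, e.g. bun run client.ts
const clientId = crypto.randomUUID();
const ws = new WebSocket(`ws://localhost:80/?clientId=${clientId}`);

ws.onopen = () => ws.send("Which port am I talking to?");
ws.onmessage = (event) => {
  const { data, agentName } = JSON.parse(String(event.data));
  console.log(`[${agentName}] ${data}`); // should name one of the ports 8081..8085
};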
If you are interested in building a chat with multiple agents, check out the documentation.