继上一篇《基于 GPUI 实现 WebSocket 服务端之 UI 篇》之后，这篇聚焦于如何实现 WebSocket 服务和 GPUI 的交互。
解耦
通过 event bus 来解耦 UI 和 服务
交互流程图
简单地说:
UI 通过 send_command 向服务发送命令，服务接收到命令后做对应的处理；服务通过 emit_event 向 UI 发送事件，UI 接收到事件后做对应的处理（更新 UI）。

目录结构
├─src
│ event_bus.rs
│ lib.rs
│ log.rs
│ main.rs
│ ws_server.rs
│ ws_ui.rs
│
event_bus.rs
use tokio::sync::mpsc;
/// Identity of a connected WebSocket client, as carried in [`ServerEvent::ClientConnected`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClientInfo {
    /// Identifier of the client; also the value carried by
    /// [`ServerEvent::ClientDisconnected`] and by the `client_id` command fields.
    pub id: String,
    /// Remote address of the client, rendered as text.
    pub addr: String,
}

/// Commands travelling from the UI to the WebSocket service.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServerCommand {
    /// Start listening for WebSocket connections.
    Start,
    /// Stop listening and disconnect every client.
    Stop,
    /// Deliver `message` to the single client identified by `client_id`.
    SendToClient { client_id: String, message: String },
    /// Close the connection of the client identified by `client_id`.
    DisconnectClient { client_id: String },
    /// Deliver `message` to every connected client.
    Broadcast { message: String },
}

/// Events travelling from the WebSocket service to the UI.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServerEvent {
    /// The listener is bound and accepting connections.
    Started,
    /// The server has stopped.
    Stopped,
    /// A client completed the WebSocket handshake.
    ///
    /// This variant was missing although both the server and the UI modules
    /// reference it, which made the crate fail to compile.
    ClientConnected(ClientInfo),
    /// The client with the given id disconnected while the server kept running.
    ///
    /// This variant was missing although both the server and the UI modules
    /// reference it, which made the crate fail to compile.
    ClientDisconnected(String),
    /// A failure described by a human-readable message.
    Error(String),
}
/// Cloneable handle holding the sending halves of the two channels that
/// decouple the UI from the WebSocket service.
#[derive(Clone)]
pub struct EventBus {
    /// Sending half of the command channel (UI → service).
    pub command_tx: mpsc::UnboundedSender<ServerCommand>,
    /// Sending half of the event channel (service → UI).
    pub event_tx: mpsc::UnboundedSender<ServerEvent>,
}

impl EventBus {
    /// Creates both unbounded channels.
    ///
    /// Returns the bus itself (the two senders) followed by the event
    /// receiver and the command receiver, in that order.
    pub fn new() -> (
        Self,
        mpsc::UnboundedReceiver<ServerEvent>,
        mpsc::UnboundedReceiver<ServerCommand>,
    ) {
        let (event_tx, event_rx) = mpsc::unbounded_channel::<ServerEvent>();
        let (command_tx, command_rx) = mpsc::unbounded_channel::<ServerCommand>();
        let bus = Self {
            command_tx,
            event_tx,
        };
        (bus, event_rx, command_rx)
    }

    /// Queues `event` on the event channel.
    ///
    /// Best effort: a send failure is discarded.
    pub fn emit_event(&self, event: ServerEvent) {
        drop(self.event_tx.send(event));
    }

    /// Queues `cmd` on the command channel.
    ///
    /// Best effort: a send failure is discarded.
    pub fn send_command(&self, cmd: ServerCommand) {
        drop(self.command_tx.send(cmd));
    }
}
lib.rs
导出各个模块
pub mod event_bus;
pub mod log;
pub mod ws_server;
pub mod ws_ui;
log.rs
自定义日志格式
use std::fs::OpenOptions;
use log::LevelFilter;
use simplelog::{
ColorChoice, CombinedLogger, Config, ConfigBuilder, TermLogger, TerminalMode, WriteLogger,
format_description,
};
pub fn init_logger(log_file_name: &str) {
let mut config = ConfigBuilder::new();
config.set_time_format_custom(format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second][offset_hour sign:mandatory]:[offset_minute]"
));
CombinedLogger::init(vec![
TermLogger::new(
LevelFilter::Warn,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
),
WriteLogger::new(
LevelFilter::Info,
config.build(),
OpenOptions::new()
.create(true)
.append(true)
.open(log_file_name)
.unwrap(),
),
])
.unwrap();
}
main.rs
程序入口
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use gpui::{App, AppContext, Application, Bounds, WindowBounds, WindowOptions, px, size};
use gpui_tokio_bridge::{Tokio, init};
use ws_gpui::{
event_bus::EventBus, log::init_logger, ws_server::WebSocketServer, ws_ui::WebSocketUi,
};
/// Application start-up callback: wires the event bus, opens the main
/// window, starts the WebSocket service and forwards service events to the UI.
fn on_finish_launching(cx: &mut App) {
    // Must run before `Tokio::spawn` is used below.
    // NOTE(review): presumably this installs the Tokio runtime bridge that the
    // article's "UI not responding" section refers to — confirm against
    // gpui_tokio_bridge.
    init(cx);

    let (event_bus, mut event_rx, command_rx) = EventBus::new();
    // The service keeps its own clone of the bus so it can emit events.
    let ws_service = WebSocketServer::new(event_bus.clone());

    let bounds = Bounds::centered(None, size(px(500.), px(250.0)), cx);
    let window_handle = cx
        .open_window(
            WindowOptions {
                window_bounds: Some(WindowBounds::Windowed(bounds)),
                ..Default::default()
            },
            move |_win, cx| {
                cx.new(|cx| {
                    // Process UI commands on a Tokio task.
                    Tokio::spawn(cx, async move {
                        ws_service.start(command_rx).await;
                    })
                    .detach();
                    WebSocketUi::new(event_bus)
                })
            },
        )
        .expect("failed to open the main window");

    // Forward service events to the UI on a separate task.
    cx.spawn(async move |async_cx| {
        while let Some(event) = event_rx.recv().await {
            // The event is owned here, so it is moved into the update closure
            // without cloning.
            let delivered = window_handle.update(async_cx, |client, _win, cx| {
                client.handle_event(event, cx);
            });
            // A failed update means the view can no longer be reached;
            // stop forwarding instead of draining events into nothing.
            if delivered.is_err() {
                break;
            }
        }
    })
    .detach();
}
/// Process entry point: set up logging, then hand control to the GPUI
/// application, which calls `on_finish_launching` once it is ready.
fn main() {
    // Initialise logging first.
    init_logger("ws-gui.log");
    Application::new().run(on_finish_launching);
}
ws_server.rs
基于 tokio_tungstenite 实现 ws 服务,写得不够简洁,有时间可以再优化一下。
use crate::event_bus::{ClientInfo, EventBus, ServerCommand, ServerEvent};
use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_tungstenite::accept_async;
use uuid::Uuid;
/// Sending half of a per-client queue of outgoing text messages.
type ClientSender = mpsc::UnboundedSender<String>;
/// WebSocket service driven by `ServerCommand`s and reporting back through
/// the event bus.
pub struct WebSocketServer {
// Outgoing-message senders of the connected clients, keyed by client id;
// shared with the per-connection tasks.
clients: Arc<RwLock<HashMap<String, ClientSender>>>,
// Bus used to emit `ServerEvent`s towards the UI.
event_bus: EventBus,
}
impl WebSocketServer {
/// Creates a server with an empty client registry that reports through
/// `event_bus`. Nothing is started until `start` receives a command.
pub fn new(event_bus: EventBus) -> Self {
    let clients = Arc::new(RwLock::new(HashMap::new()));
    Self { clients, event_bus }
}
/// Serves one client connection until the peer closes it, an error occurs or
/// the server shuts down.
///
/// The function performs the WebSocket handshake, registers an outgoing
/// message queue for the client in `clients`, emits
/// `ServerEvent::ClientConnected`, and then runs two halves:
///
/// * a spawned writer task that forwards queued strings to the socket and
///   reacts to the shutdown broadcast by sending a Close frame;
/// * the reader loop in this function, which logs incoming text and ends on
///   Close or on a transport error.
///
/// When the reader ends the client is removed from `clients`.
/// `ServerEvent::ClientDisconnected` is emitted only if the server is not
/// shutting down.
///
/// The queued string `"CLOSE"` is a sentinel meaning "send a Close frame".
/// NOTE(review): because the sentinel is in-band, a text message equal to
/// `CLOSE` cannot be delivered as text.
async fn handle_client_with_shutdown(
    stream: tokio::net::TcpStream,
    addr: std::net::SocketAddr,
    clients: Arc<RwLock<HashMap<String, ClientSender>>>,
    event_tx: mpsc::UnboundedSender<ServerEvent>,
    mut shutdown_rx: broadcast::Receiver<()>,
) {
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio_tungstenite::tungstenite::Message;

    let client_id = Uuid::new_v4().to_string();
    let client_info = ClientInfo {
        id: client_id.clone(),
        addr: addr.to_string(),
    };

    let ws_stream = match accept_async(stream).await {
        Ok(ws) => ws,
        Err(e) => {
            println!("WebSocket 握手失败: {}", e);
            return;
        }
    };
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    // Register the outgoing queue before announcing the client.
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();
    clients.write().await.insert(client_id.clone(), tx);
    let _ = event_tx.send(ServerEvent::ClientConnected(client_info));

    // Set by the writer task once it has observed the server shutdown.
    let server_is_shutting_down = Arc::new(AtomicBool::new(false));
    let shutdown_flag = Arc::clone(&server_is_shutting_down);

    // Writer task: forwards queued messages and watches for server shutdown.
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // Poll the shutdown signal first. Without `biased`, a ready
                // "queue closed" branch could be chosen over a pending
                // shutdown signal, leaving the flag unset and producing a
                // spurious ClientDisconnected event while the server stops.
                biased;

                // Ok, Closed and Lagged all mean the same thing here: the
                // server is going away.
                _ = shutdown_rx.recv() => {
                    shutdown_flag.store(true, Ordering::SeqCst);
                    if ws_sender.send(Message::Close(None)).await.is_ok() {
                        // Short grace period after the Close frame, kept from
                        // the original implementation.
                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                    }
                    break;
                }

                queued = rx.recv() => {
                    // `None`: the queue's sender was dropped.
                    let Some(content) = queued else { break };
                    let frame = if content == "CLOSE" {
                        Message::Close(None)
                    } else {
                        Message::Text(content.into())
                    };
                    if ws_sender.send(frame).await.is_err() {
                        break;
                    }
                }
            }
        }
    });

    // Reader loop: runs until the peer closes or the transport fails.
    while let Some(msg) = ws_receiver.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                println!("收到来自 {} 的消息: {}", client_id, text);
            }
            Ok(Message::Close(_)) => {
                println!("客户端 {} 关闭连接", client_id);
                break;
            }
            Err(_) => break,
            _ => {}
        }
    }

    // Dropping the registered sender also lets the writer task finish.
    clients.write().await.remove(&client_id);

    // During a server shutdown the UI is not told about each client going away.
    if !server_is_shutting_down.load(Ordering::SeqCst) {
        let _ = event_tx.send(ServerEvent::ClientDisconnected(client_id));
    }
}
pub async fn start(&self, mut command_rx: mpsc::UnboundedReceiver<ServerCommand>) {
let mut server_handle: Option<tokio::task::JoinHandle<()>> = None;
let mut shutdown_tx: Option<tokio::sync::broadcast::Sender<()>> = None;
let clients = self.clients.clone();
let event_bus = self.event_bus.clone();
// 主命令处理循环
while let Some(cmd) = command_rx.recv().await {
match cmd {
ServerCommand::Start => {
if server_handle.is_none() {
// 创建停止通道 - 使用 broadcast 通道,可以被多个客户端订阅
let (server_shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(100);
shutdown_tx = Some(server_shutdown_tx.clone());
// 克隆必要的变量供异步任务使用
let event_bus_clone = event_bus.clone();
let clients_clone = clients.clone();
// 启动服务器任务
server_handle = Some(tokio::spawn(async move {
// 绑定TCP监听器
let listener = match tokio::net::TcpListener::bind("127.0.0.1:9001")
.await
{
Ok(l) => l,
Err(e) => {
event_bus_clone
.emit_event(ServerEvent::Error(format!("启动失败: {}", e)));
println!("绑定地址失败: {}", e);
return;
}
};
event_bus_clone.emit_event(ServerEvent::Started);
println!("WebSocket 服务器已启动在 127.0.0.1:9001");
let mut server_shutdown_rx = server_shutdown_tx.subscribe(); // 为服务器主线程创建一个订阅
loop {
tokio::select! {
// 接受新连接
accept_result = listener.accept() => {
match accept_result {
Ok((stream, addr)) => {
let clients = clients_clone.clone();
let event_tx = event_bus_clone.event_tx.clone();
let shutdown_rx = server_shutdown_tx.subscribe(); // 为每个客户端创建一个订阅
tokio::spawn(async move {
WebSocketServer::handle_client_with_shutdown(stream, addr, clients, event_tx, shutdown_rx).await;
});
}
Err(e) => {
println!("接受连接失败: {}", e);
}
}
}
// 监听停止信号
_ = server_shutdown_rx.recv() => {
println!("服务器停止信号已接收");
break;
}
}
}
// 服务器停止时主动断开所有客户端连接 - 异步处理以避免阻塞
let clients_clone_for_cleanup = clients_clone.clone();
tokio::spawn(async move {
let clients_map = clients_clone.read().await;
let client_ids: Vec<String> = clients_map.keys().cloned().collect();
drop(clients_map); // 释放读锁
// 并发向所有客户端发送断开连接消息
let mut handles = Vec::new();
for client_id in client_ids {
let clients = clients_clone.clone();
let handle = tokio::spawn(async move {
let clients = clients.read().await;
if let Some(tx) = clients.get(&client_id) {
let _ = tx.send("CLOSE".to_string()); // 特殊消息表示关闭
}
drop(clients);
});
handles.push(handle);
}
// 等待所有发送任务完成
for handle in handles {
let _ = handle.await;
}
// 清理客户端列表
{
let mut clients = clients_clone_for_cleanup.write().await;
clients.clear();
}
});
// 添加短暂延迟以确保清理完成
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// 发送停止事件
event_bus_clone.emit_event(ServerEvent::Stopped);
println!("WebSocket 服务器已完全停止");
}));
}
}
ServerCommand::Stop => {
if let Some(handle) = server_handle.take() {
if let Some(tx) = shutdown_tx.take() {
let _ = tx.send(()); // 发送停止信号给所有订阅者
}
// 异步等待服务器任务结束,避免阻塞UI线程
tokio::spawn(async move {
let _ = handle.await;
});
} else {
// 如果服务器没有运行,也要发送停止事件
event_bus.emit_event(ServerEvent::Stopped);
}
}
ServerCommand::Broadcast { message } => {
if server_handle.is_some() {
// 仅当服务器正在运行时才处理命令
let clients = clients.clone();
let msg = message.clone();
tokio::spawn(async move {
let clients = clients.read().await;
for tx in clients.values() {
let _ = tx.send(msg.clone());
}
});
}
}
}
}
}
}
ws_ui.rs
UI 部分,完整代码可以参考 基于 GPUI 实现 WebSocket 服务端之 UI 篇
use gpui::{ClickEvent, CursorStyle, Window, div, prelude::*, rgb};
use crate::event_bus::{ClientInfo, EventBus, ServerCommand, ServerEvent};
/// View state of the WebSocket server window.
pub struct WebSocketUi {
// True between a `ServerEvent::Started` and the next `ServerEvent::Stopped`.
is_running: bool,
// Bus used to send `ServerCommand`s to the service.
event_bus: EventBus,
// Clients currently shown as connected, maintained by `handle_event`.
clients: Vec<ClientInfo>,
}
impl WebSocketUi {
/// Creates the view in the "stopped" state with no clients; commands are
/// sent through `event_bus`.
pub fn new(event_bus: EventBus) -> Self {
    let clients = Vec::new();
    Self {
        is_running: false,
        event_bus,
        clients,
    }
}
/// Click handler: requests a server start unless the server is already
/// reported as running.
fn start(&mut self, _evt: &ClickEvent, _win: &mut Window, _cx: &mut Context<Self>) {
    if self.is_running {
        return;
    }
    self.event_bus.send_command(ServerCommand::Start);
}
/// Click handler: requests a server stop, but only while the server is
/// reported as running.
fn disconnect(&mut self, _evt: &ClickEvent, _win: &mut Window, _cx: &mut Context<Self>) {
    if !self.is_running {
        return;
    }
    self.event_bus.send_command(ServerCommand::Stop);
}
/// Click handler: broadcasts a fixed test message to all clients, but only
/// while the server is reported as running.
fn send_test_message(
    &mut self,
    _evt: &ClickEvent,
    _win: &mut Window,
    _cx: &mut Context<Self>,
) {
    if !self.is_running {
        return;
    }
    let command = ServerCommand::Broadcast {
        message: String::from("Hello, World from ws server!"),
    };
    self.event_bus.send_command(command);
}
... 省略一些代码
pub fn handle_event(&mut self, event: ServerEvent, cx: &mut Context<Self>) {
match event {
ServerEvent::Started => {
self.is_running = true;
}
ServerEvent::Stopped => {
self.is_running = false;
self.clients.clear();
}
ServerEvent::ClientConnected(info) => {
self.clients.push(info);
}
ServerEvent::ClientDisconnected(id) => {
self.clients.retain(|c| c.id != id);
}
ServerEvent::Error(msg) => {
println!("错误: {}", msg);
}
}
cx.notify();
}
}
...省略一些代码
踩的坑
UI 没有响应
现象:此时,UI 已经没有响应。

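根本原因是：存在两个异步运行时——GPUI 自带的执行器和 tokio 运行时。ws_server.rs 中用到的 tokio::net、tokio::spawn 等必须运行在 tokio 运行时里，不能直接放到 GPUI 的执行器上执行。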
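解决方法：引入 gpui-tokio-bridge（参考 gpui_tokio），也就是 main.rs 中的 init(cx) 和 Tokio::spawn：先调用 init(cx) 完成初始化，再用 Tokio::spawn 把 WebSocket 服务任务交给 tokio 运行时执行。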
复现:

测试
网页测试
