基于 GPUI 实现 WebScoket 服务端之服务篇

分类:Rust     发布时间:2026-03-28     最后更新:2026-03-28     浏览数:16
这篇文章详细讲解了通过 Event Bus 来解耦 WebSocket 服务和 GPUI 的交互,希望对你有帮助。

继上一篇 基于 GPUI 实现 WebSocket 服务端之 UI 篇, 这篇聚焦于如何实现 WebSocket 服务和 GPUI 的交互。

解耦

通过 event bus 来解耦 UI服务

交互流程图

简单地说:

  1. UI服务发送命令 send_command服务接收到命令然后做对应的处理。
  2. 服务UI 发送事件 emit_eventUI 接收到事件然后做对应的处理(更新 UI)

目录结构

├─src
│      event_bus.rs
│      lib.rs
│      log.rs
│      main.rs
│      ws_server.rs
│      ws_ui.rs
│

event_bus.rs

use tokio::sync::mpsc;

#[derive(Clone, Debug)]
pub struct ClientInfo {
    pub id: String,
    pub addr: String,
}

#[derive(Clone, Debug)]
pub enum ServerCommand {
    Start,
    Stop,
    SendToClient { client_id: String, message: String },
    DisconnectClient { client_id: String },
    Broadcast { message: String },
}

#[derive(Clone, Debug)]
pub enum ServerEvent {
    Started,
    Stopped,
    Error(String),
}

#[derive(Clone)]
pub struct EventBus {
    pub command_tx: mpsc::UnboundedSender<ServerCommand>,
    pub event_tx: mpsc::UnboundedSender<ServerEvent>,
}

impl EventBus {
    pub fn new() -> (
        Self,
        mpsc::UnboundedReceiver<ServerEvent>,
        mpsc::UnboundedReceiver<ServerCommand>,
    ) {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        (
            Self {
                event_tx,
                command_tx,
            },
            event_rx,
            command_rx,
        )
    }

    pub fn emit_event(&self, event: ServerEvent) {
        let _ = self.event_tx.send(event);
    }

    pub fn send_command(&self, cmd: ServerCommand) {
        let _ = self.command_tx.send(cmd);
    }
}

lib.rs

导出各个模块

pub mod event_bus;
pub mod log;
pub mod ws_server;
pub mod ws_ui;

log.rs

自定义日志格式

use std::fs::OpenOptions;

use log::LevelFilter;
use simplelog::{
    ColorChoice, CombinedLogger, Config, ConfigBuilder, TermLogger, TerminalMode, WriteLogger,
    format_description,
};

pub fn init_logger(log_file_name: &str) {
    let mut config = ConfigBuilder::new();
    config.set_time_format_custom(format_description!(
        "[year]-[month]-[day]T[hour]:[minute]:[second][offset_hour sign:mandatory]:[offset_minute]"
    ));
    CombinedLogger::init(vec![
        TermLogger::new(
            LevelFilter::Warn,
            Config::default(),
            TerminalMode::Mixed,
            ColorChoice::Auto,
        ),
        WriteLogger::new(
            LevelFilter::Info,
            config.build(),
            OpenOptions::new()
                .create(true)
                .append(true)
                .open(log_file_name)
                .unwrap(),
        ),
    ])
    .unwrap();
}

main.rs

程序入口

#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]

use gpui::{App, AppContext, Application, Bounds, WindowBounds, WindowOptions, px, size};
use gpui_tokio_bridge::{Tokio, init};
use ws_gpui::{
    event_bus::EventBus, log::init_logger, ws_server::WebSocketServer, ws_ui::WebSocketUi,
};

fn on_finish_launching(cx: &mut App) {
    init(cx); // 非常重要,留到后面介绍。

    let (event_bus, mut event_rx, command_rx) = EventBus::new();
    // 创建 WebSocket 服务
    let ws_service = WebSocketServer::new(event_bus.clone());

    let bounds = Bounds::centered(None, size(px(500.), px(250.0)), cx);
    let window_handle = cx
        .open_window(
            WindowOptions {
                window_bounds: Some(WindowBounds::Windowed(bounds)),
                ..Default::default()
            },
            move |_win, cx| {
                cx.new(|cx| {
                    // 在 tokio 任务处理来自 UI 的命令
                    Tokio::spawn(cx, async move {
                        ws_service.start(command_rx).await;
                    })
                    .detach();

                    WebSocketUi::new(event_bus)
                })
            },
        )
        .unwrap();

    // 在独立的任务处理来自服务的 事件
    cx.spawn(async move |async_cx| {
        while let Some(event) = event_rx.recv().await {
            let event_clone = event.clone();

            // 异步更新 UI,避免阻塞
            let _ = window_handle.update(async_cx, |client, _win, cx| {
                client.handle_event(event_clone, cx);
            });
        }
    })
    .detach();
}

fn main() {
    init_logger("ws-gui.log"); // 初始化日志
    let app = Application::new();
    app.run(on_finish_launching);
}

ws_server.rs

基于 tokio_tungstenite 实现 ws 服务,写得不够简洁,有时间可以再优化一下。

use crate::event_bus::{ClientInfo, EventBus, ServerCommand, ServerEvent};
use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_tungstenite::accept_async;
use uuid::Uuid;

type ClientSender = mpsc::UnboundedSender<String>;

pub struct WebSocketServer {
    clients: Arc<RwLock<HashMap<String, ClientSender>>>,
    event_bus: EventBus,
}

impl WebSocketServer {
    pub fn new(event_bus: EventBus) -> Self {
        Self {
            clients: Arc::new(RwLock::new(HashMap::new())),
            event_bus,
        }
    }

    // 新增一个带关闭信号的客户端处理函数
    async fn handle_client_with_shutdown(
        stream: tokio::net::TcpStream,
        addr: std::net::SocketAddr,
        clients: Arc<RwLock<HashMap<String, ClientSender>>>,
        event_tx: mpsc::UnboundedSender<ServerEvent>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) {
        let client_id = Uuid::new_v4().to_string();
        let client_info = ClientInfo {
            id: client_id.clone(),
            addr: addr.to_string(),
        };

        let ws_stream = match accept_async(stream).await {
            Ok(ws) => ws,
            Err(e) => {
                println!("WebSocket 握手失败: {}", e);
                return;
            }
        };

        let (mut ws_sender, mut ws_receiver) = ws_stream.split();
        let (tx, mut rx) = mpsc::unbounded_channel::<String>();

        clients.write().await.insert(client_id.clone(), tx);
        let _ = event_tx.send(ServerEvent::ClientConnected(client_info));

        let client_id_for_log = client_id.clone();

        // 使用原子布尔值来标记服务器是否正在关闭
        let server_is_shutting_down = Arc::new(std::sync::atomic::AtomicBool::new(false));

        // 启动消息发送任务,同时监听服务器关闭信号
        let server_is_shutting_down_clone = server_is_shutting_down.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        match msg {
                            Some(msg_content) => {
                                // 检查是否是特殊的关闭消息
                                if msg_content == "CLOSE" {
                                    if ws_sender
                                        .send(tokio_tungstenite::tungstenite::Message::Close(None))
                                        .await
                                        .is_err()
                                    {
                                        break;
                                    }
                                } else {
                                    if ws_sender
                                        .send(tokio_tungstenite::tungstenite::Message::Text(msg_content.into()))
                                        .await
                                        .is_err()
                                    {
                                        break;
                                    }
                                }
                            }
                            None => break, // 通道已关闭
                        }
                    }
                    res = shutdown_rx.recv() => {
                        // 检查是否收到关闭信号或发送方已关闭
                        match res {
                            Ok(()) => {
                                // 收到服务器关闭信号,设置关闭标志
                                server_is_shutting_down_clone.store(true, std::sync::atomic::Ordering::Relaxed);
                                // 发送关闭消息并退出
                                if ws_sender
                                    .send(tokio_tungstenite::tungstenite::Message::Close(None))
                                    .await
                                    .is_ok() {
                                    // 等待一小段时间让消息发送完成
                                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                                }
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                // 发送方已关闭,同样需要设置关闭标志
                                server_is_shutting_down_clone.store(true, std::sync::atomic::Ordering::Relaxed);
                                if ws_sender
                                    .send(tokio_tungstenite::tungstenite::Message::Close(None))
                                    .await
                                    .is_ok() {
                                    // 等待一小段时间让消息发送完成
                                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                                }
                            }
                            Err(broadcast::error::RecvError::Lagged(_)) => {
                                // 一些消息滞后了,但我们仍然认为应该设置关闭标志
                                server_is_shutting_down_clone.store(true, std::sync::atomic::Ordering::Relaxed);
                                if ws_sender
                                    .send(tokio_tungstenite::tungstenite::Message::Close(None))
                                    .await
                                    .is_ok() {
                                    // 等待一小段时间让消息发送完成
                                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                                }
                            }
                        }
                        break;
                    }
                }
            }
        });

        while let Some(msg) = ws_receiver.next().await {
            match msg {
                Ok(tokio_tungstenite::tungstenite::Message::Text(text)) => {
                    println!("收到来自 {} 的消息: {}", client_id_for_log, text);
                }
                Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => {
                    println!("客户端 {} 关闭连接", client_id_for_log);
                    break;
                }
                Err(_) => {
                    break;
                }
                _ => {}
            }
        }

        // 检查服务器是否正在关闭,如果是,则不发送断开事件
        let is_server_shutdown = server_is_shutting_down.load(std::sync::atomic::Ordering::Relaxed);
        clients.write().await.remove(&client_id);
        if !is_server_shutdown {
            let _ = event_tx.send(ServerEvent::ClientDisconnected(client_id));
        }
    }

    pub async fn start(&self, mut command_rx: mpsc::UnboundedReceiver<ServerCommand>) {
        let mut server_handle: Option<tokio::task::JoinHandle<()>> = None;
        let mut shutdown_tx: Option<tokio::sync::broadcast::Sender<()>> = None;

        let clients = self.clients.clone();
        let event_bus = self.event_bus.clone();

        // 主命令处理循环
        while let Some(cmd) = command_rx.recv().await {
            match cmd {
                ServerCommand::Start => {
                    if server_handle.is_none() {
                        // 创建停止通道 - 使用 broadcast 通道,可以被多个客户端订阅
                        let (server_shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(100);
                        shutdown_tx = Some(server_shutdown_tx.clone());

                        // 克隆必要的变量供异步任务使用
                        let event_bus_clone = event_bus.clone();
                        let clients_clone = clients.clone();

                        // 启动服务器任务
                        server_handle = Some(tokio::spawn(async move {
                            // 绑定TCP监听器
                            let listener = match tokio::net::TcpListener::bind("127.0.0.1:9001")
                                .await
                            {
                                Ok(l) => l,
                                Err(e) => {
                                    event_bus_clone
                                        .emit_event(ServerEvent::Error(format!("启动失败: {}", e)));
                                    println!("绑定地址失败: {}", e);
                                    return;
                                }
                            };

                            event_bus_clone.emit_event(ServerEvent::Started);
                            println!("WebSocket 服务器已启动在 127.0.0.1:9001");

                            let mut server_shutdown_rx = server_shutdown_tx.subscribe(); // 为服务器主线程创建一个订阅

                            loop {
                                tokio::select! {
                                    // 接受新连接
                                    accept_result = listener.accept() => {
                                        match accept_result {
                                            Ok((stream, addr)) => {
                                                let clients = clients_clone.clone();
                                                let event_tx = event_bus_clone.event_tx.clone();
                                                let shutdown_rx = server_shutdown_tx.subscribe(); // 为每个客户端创建一个订阅

                                                tokio::spawn(async move {
                                                    WebSocketServer::handle_client_with_shutdown(stream, addr, clients, event_tx, shutdown_rx).await;
                                                });
                                            }
                                            Err(e) => {
                                                println!("接受连接失败: {}", e);
                                            }
                                        }
                                    }
                                    // 监听停止信号
                                    _ = server_shutdown_rx.recv() => {
                                        println!("服务器停止信号已接收");
                                        break;
                                    }
                                }
                            }

                            // 服务器停止时主动断开所有客户端连接 - 异步处理以避免阻塞
                            let clients_clone_for_cleanup = clients_clone.clone();
                            tokio::spawn(async move {
                                let clients_map = clients_clone.read().await;
                                let client_ids: Vec<String> = clients_map.keys().cloned().collect();
                                drop(clients_map); // 释放读锁

                                // 并发向所有客户端发送断开连接消息
                                let mut handles = Vec::new();
                                for client_id in client_ids {
                                    let clients = clients_clone.clone();
                                    let handle = tokio::spawn(async move {
                                        let clients = clients.read().await;
                                        if let Some(tx) = clients.get(&client_id) {
                                            let _ = tx.send("CLOSE".to_string()); // 特殊消息表示关闭
                                        }
                                        drop(clients);
                                    });
                                    handles.push(handle);
                                }

                                // 等待所有发送任务完成
                                for handle in handles {
                                    let _ = handle.await;
                                }

                                // 清理客户端列表
                                {
                                    let mut clients = clients_clone_for_cleanup.write().await;
                                    clients.clear();
                                }
                            });

                            // 添加短暂延迟以确保清理完成
                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;

                            // 发送停止事件
                            event_bus_clone.emit_event(ServerEvent::Stopped);
                            println!("WebSocket 服务器已完全停止");
                        }));
                    }
                }
                ServerCommand::Stop => {
                    if let Some(handle) = server_handle.take() {
                        if let Some(tx) = shutdown_tx.take() {
                            let _ = tx.send(()); // 发送停止信号给所有订阅者
                        }

                        // 异步等待服务器任务结束,避免阻塞UI线程
                        tokio::spawn(async move {
                            let _ = handle.await;
                        });
                    } else {
                        // 如果服务器没有运行,也要发送停止事件
                        event_bus.emit_event(ServerEvent::Stopped);
                    }
                }
                ServerCommand::Broadcast { message } => {
                    if server_handle.is_some() {
                        // 仅当服务器正在运行时才处理命令
                        let clients = clients.clone();
                        let msg = message.clone();
                        tokio::spawn(async move {
                            let clients = clients.read().await;
                            for tx in clients.values() {
                                let _ = tx.send(msg.clone());
                            }
                        });
                    }
                }
            }
        }
    }
}

ws_ui.rs

UI 部分,完整代码可以参考 基于 GPUI 实现 WebSocket 服务端之 UI 篇

use gpui::{ClickEvent, CursorStyle, Window, div, prelude::*, rgb};

use crate::event_bus::{ClientInfo, EventBus, ServerCommand, ServerEvent};

pub struct WebSocketUi {
    is_running: bool,
    event_bus: EventBus,
    clients: Vec<ClientInfo>,
}

impl WebSocketUi {
    pub fn new(event_bus: EventBus) -> Self {
        Self {
            is_running: false,
            event_bus,
            clients: vec![],
        }
    }

    fn start(
        self: &mut WebSocketUi,
        _evt: &ClickEvent,
        _win: &mut Window,
        _cx: &mut Context<Self>,
    ) {
        if !self.is_running {
            self.event_bus.send_command(ServerCommand::Start);
        }
    }

    fn disconnect(
        self: &mut WebSocketUi,
        _evt: &ClickEvent,
        _win: &mut Window,
        _cx: &mut Context<Self>,
    ) {
        if self.is_running {
            self.event_bus.send_command(ServerCommand::Stop);
        }
    }

    fn send_test_message(
        self: &mut WebSocketUi,
        _evt: &ClickEvent,
        _win: &mut Window,
        _cx: &mut Context<Self>,
    ) {
        if self.is_running {
            let message = "Hello, World from ws server!".into();
            self.event_bus
                .send_command(ServerCommand::Broadcast { message });
        }
    }

    ... 省略一些代码

    pub fn handle_event(&mut self, event: ServerEvent, cx: &mut Context<Self>) {
        match event {
            ServerEvent::Started => {
                self.is_running = true;
            }
            ServerEvent::Stopped => {
                self.is_running = false;
                self.clients.clear();
            }
            ServerEvent::ClientConnected(info) => {
                self.clients.push(info);
            }
            ServerEvent::ClientDisconnected(id) => {
                self.clients.retain(|c| c.id != id);
            }
            ServerEvent::Error(msg) => {
                println!("错误: {}", msg);
            }
        }
        cx.notify();
    }
}

...省略一些代码

踩的坑

UI 没有响应

现象:此时,UI 已经没有响应。

根本原因是:存在两个异步运行时。

解决方法:引入 gpui-tokio-bridge,参考 gpui_tokio

复现:

测试

网页测试

参考

上一篇: 基于 GPUI 实现 WebSocket 服务端之 UI 篇 下一篇: 没有了