概述
EventBus(事件总线)是一种基于发布-订阅模式的组件间通信机制。它通过一个全局的事件中心,实现模块间松耦合的消息传递,避免组件间的直接依赖。
核心思想
- 发布者(Publisher):触发事件,不关心谁在监听
- 订阅者(Subscriber):注册监听特定事件,不关心谁触发
- 事件总线(Bus):中间调度层,管理所有事件的注册和分发
架构设计
核心组成
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Publisher │────▶│ EventBus │────▶│ Subscriber │
│ (发布者) │ │ (事件总线) │ │ (订阅者) │
└──────────────┘ └──────────────┘ └──────────────┘
│
┌─────┴─────┐
│ Event Map │
│ 事件注册表 │
└───────────┘
数据结构
事件总线内部维护一个事件注册表(Event Map),结构如下:
interface EventBus {
// 事件名 -> 回调函数列表
events: Map<string, Set<CallbackFunction>>;
}
执行流程
1. 订阅阶段(Subscribe)
Subscriber ──▶ EventBus.on("eventName", callback)
│
▼
events["eventName"].add(callback)
订阅者向事件总线注册一个回调函数,绑定到指定事件名称。
2. 发布阶段(Publish / Emit)
Publisher ──▶ EventBus.emit("eventName", payload)
│
▼
events["eventName"].forEach(cb => cb(payload))
发布者通过事件总线触发事件,总线遍历该事件的所有回调依次执行。
3. 取消订阅(Unsubscribe)
Subscriber ──▶ EventBus.off("eventName", callback)
│
▼
events["eventName"].delete(callback)
实现细节
TypeScript 完整实现
type EventHandler = (...args: any[]) => void;
class EventBus {
private events: Map<string, Set<EventHandler>> = new Map();
/**
* 订阅事件
*/
on(event: string, handler: EventHandler): void {
if (!this.events.has(event)) {
this.events.set(event, new Set());
}
this.events.get(event)!.add(handler);
}
/**
* 一次性订阅(触发一次后自动移除)
*/
once(event: string, handler: EventHandler): void {
const wrapper: EventHandler = (...args) => {
handler(...args);
this.off(event, wrapper);
};
this.on(event, wrapper);
}
/**
* 发布事件
*/
emit(event: string, ...args: any[]): void {
const handlers = this.events.get(event);
if (handlers) {
handlers.forEach(handler => {
try {
handler(...args);
} catch (error) {
console.error(`EventBus: Error in handler for "${event}"`, error);
}
});
}
}
/**
* 取消订阅
*/
off(event: string, handler: EventHandler): void {
const handlers = this.events.get(event);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this.events.delete(event);
}
}
}
/**
* 移除某事件的所有监听器
*/
offAll(event?: string): void {
if (event) {
this.events.delete(event);
} else {
this.events.clear();
}
}
}
// 单例导出
export const eventBus = new EventBus();
C# 实现(Unity 适用)
using System;
using System.Collections.Generic;
public class EventBus
{
private static EventBus _instance;
public static EventBus Instance => _instance ??= new EventBus();
private readonly Dictionary<string, List<Delegate>> _events = new();
/// <summary>
/// 订阅事件
/// </summary>
public void On<T>(string eventName, Action<T> handler)
{
if (!_events.ContainsKey(eventName))
_events[eventName] = new List<Delegate>();
_events[eventName].Add(handler);
}
/// <summary>
/// 发布事件
/// </summary>
public void Emit<T>(string eventName, T data)
{
if (_events.TryGetValue(eventName, out var handlers))
{
foreach (var handler in handlers.ToArray())
{
try
{
((Action<T>)handler)?.Invoke(data);
}
catch (Exception ex)
{
UnityEngine.Debug.LogError(
$"EventBus: Error handling '{eventName}': {ex.Message}");
}
}
}
}
/// <summary>
/// 取消订阅
/// </summary>
public void Off<T>(string eventName, Action<T> handler)
{
if (_events.TryGetValue(eventName, out var handlers))
{
handlers.Remove(handler);
if (handlers.Count == 0)
_events.Remove(eventName);
}
}
/// <summary>
/// 清除所有事件
/// </summary>
public void Clear() => _events.Clear();
}
执行时序
完整的事件从注册到触发的时序流程:
时间线 ──────────────────────────────────────────────▶
ModuleA EventBus ModuleB
│ │ │
│ on("login", cb) │ │
│──────────────────▶ │ │
│ │ on("login", cb2) │
│ │ ◀────────────────── │
│ │ │
│ │ │
│ emit("login", user) │
│──────────────────▶ │ │
│ │── cb(user) ────────▶ │
│ │── cb2(user) ───────▶ │
│ │ │
│ │ off("login", cb2) │
│ │ ◀────────────────── │
│ │ │
使用场景
| 场景 | 说明 |
|---|---|
| 跨组件通信 | UI 组件间无父子关系时的消息传递 |
| 模块解耦 | 业务模块间避免直接 import 依赖 |
| 插件系统 | 主程序与插件之间通过事件交互 |
| 游戏事件 | 角色死亡、关卡完成等全局事件通知 |
| 状态同步 | 数据变化通知多个视图更新 |
MMO 游戏业务系统网状拓扑
在大型 MMO 游戏中,数十个业务系统通过 EventBus 形成复杂的网状通信拓扑。以下展示典型 MMO 架构中各子系统的事件依赖关系:
核心事件网络
┌─────────────┐
┌─────────▶│ 成就系统 │◀─────────┐
│ └──────┬──────┘ │
│ │ │
│ ▼ │
┌─────┴─────┐ ┌─────────────┐ ┌──────┴──────┐
│ 任务系统 │◀─▶│ EventBus │◀─▶│ 关卡系统 │
└─────┬─────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ ┌──────┼──────┐ │
▼ ▼ ▼ ▼ ▼
┌───────────┐ ┌─────┐┌─────┐┌─────┐┌───────────┐
│ 背包/道具 │ │角色 ││战斗 ││社交 ││ 副本系统 │
└─────┬─────┘ └──┬──┘└──┬──┘└──┬──┘└──────┬────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌───────────┐┌──────┐┌─────┐┌──────┐┌─────────┐
│ 交易系统 ││装备 ││技能 ││公会 ││排行榜 │
└───────────┘└──────┘└─────┘└──────┘└─────────┘
│ │ │ │ │
└──────────┴──────┴──────┴──────────┘
│
┌──────┴──────┐
│ 日志/统计 │
└─────────────┘
事件流向详解
以下是 MMO 中各系统通过 EventBus 传递的关键事件:
┌──────────────────────────────────────────────────────────────────────┐
│ EventBus 事件注册表 │
├──────────────────┬───────────────────────────────────────────────────┤
│ 事件名称 │ 订阅者 │
├──────────────────┼───────────────────────────────────────────────────┤
│ monster:killed │ 任务、成就、关卡、背包(掉落)、经验、排行榜 │
│ quest:completed │ 成就、背包(奖励)、关卡(解锁)、角色(经验)、日志 │
│ level:up │ 技能(解锁)、装备(解锁)、成就、社交(通知)、副本 │
│ item:acquired │ 任务(检查)、成就、背包、交易、日志 │
│ dungeon:cleared │ 成就、排行榜、关卡、任务、奖励、公会 │
│ player:login │ 社交、公会、邮件、活动、日志、每日任务 │
│ player:death │ 战斗、副本、统计、成就(连续存活)、日志 │
│ skill:used │ 战斗、冷却、统计、成就(技能使用次数) │
│ trade:completed │ 背包、日志、成就(交易次数)、社交 │
│ guild:event │ 社交、成就、排行榜、副本(公会本)、奖励 │
│ pvp:result │ 排行榜、成就、赛季、奖励、统计 │
│ activity:trigger │ 任务、成就、UI、奖励、日志 │
└──────────────────┴───────────────────────────────────────────────────┘
典型事件链路(击杀 Boss)
一次 Boss 击杀如何触发多系统级联响应:
玩家击杀 Boss
│
├──▶ emit("monster:killed", { id, type:"boss", zone })
│ │
│ ├──▶ [任务系统] 检查击杀任务进度 → emit("quest:progress")
│ ├──▶ [成就系统] 累计击杀数 +1 → 达标则 emit("achievement:unlock")
│ ├──▶ [背包系统] 生成掉落物品 → emit("item:acquired")
│ ├──▶ [关卡系统] 检查区域清除条件
│ ├──▶ [排行榜] 更新 DPS/击杀榜
│ ├──▶ [副本系统] 检查副本通关条件 → emit("dungeon:cleared")
│ └──▶ [日志系统] 记录战斗日志
│
├──▶ emit("item:acquired", { items: [...] })
│ ├──▶ [任务系统] 检查收集任务
│ ├──▶ [成就系统] 稀有物品收集进度
│ └──▶ [交易系统] 更新市场参考价
│
└──▶ emit("dungeon:cleared", { dungeonId, time, team })
├──▶ [成就系统] 首次通关成就
├──▶ [排行榜] 副本速通排名
├──▶ [公会系统] 公会贡献 +N
└──▶ [关卡系统] 解锁下一难度
系统间依赖矩阵
| 发布者 \ 订阅者 | 任务 | 成就 | 关卡 | 背包 | 战斗 | 社交 | 排行 | 副本 |
|---|---|---|---|---|---|---|---|---|
| 战斗系统 | ✓ | ✓ | ✓ | ✓ | - | · | ✓ | ✓ |
| 任务系统 | - | ✓ | ✓ | ✓ | · | · | · | · |
| 关卡系统 | ✓ | ✓ | - | ✓ | · | · | ✓ | ✓ |
| 角色系统 | ✓ | ✓ | ✓ | · | ✓ | ✓ | ✓ | · |
| 社交系统 | · | ✓ | · | · | · | - | · | · |
| 副本系统 | ✓ | ✓ | ✓ | ✓ | · | ✓ | ✓ | - |
| 交易系统 | · | ✓ | · | ✓ | · | · | · | · |
✓ = 有事件订阅关系 · = 无直接关系 - = 自身
设计要点
在 MMO 这种复杂网状结构中使用 EventBus 需要注意:
- 事件分层:按优先级分为 System / Game / UI 三层,避免跨层级联
- 防止循环:A→B→C→A 的事件循环会导致死锁,需设计单向事件流
- 批量合并:高频事件(如伤害计算)使用节流,每帧只处理一次
- 事件溯源:记录事件链 ID,方便定位级联 Bug
- 模块隔离:每个子系统只监听自己关心的事件,不越权处理
优缺点对比
优势
- 解耦性强:发布者和订阅者互不感知
- 灵活性高:可动态增删订阅
- 易于扩展:新模块只需订阅感兴趣的事件
劣势
- 调试困难:事件流向不直观,难以追踪
- 内存泄漏:忘记取消订阅会导致引用残留
- 执行顺序:多个订阅者的执行顺序不可控
同步回调 vs 异步回调
EventBus 最核心的设计决策之一:回调函数应该同步执行还是异步执行?两种模式在执行时序、数据一致性、性能表现上有本质差异。
执行模型对比
【同步模型】emit 时立即逐个执行回调,完成后才返回
emit("kill") handler_A() handler_B() handler_C() emit返回
│──────────▶│ │ │ │
│ │─────执行───▶│ │ │
│ │ │─────执行───▶│ │
│ │ │ │─────执行───▶│
│◀──────────────────────────────────────────────────── │
│ 总耗时 = A + B + C │
【异步模型】emit 时将回调推入队列/微任务,emit 立即返回
emit("kill") 主线程继续 事件循环处理队列
│──push queue──▶│ │
│◀──立即返回─────│ │
│ │──后续逻辑──▶ │
│ │ │──handler_A()──▶
│ │ │──handler_B()──▶
│ │ │──handler_C()──▶
同步回调详解
工作原理
// 同步 EventBus — emit 阻塞式执行
class SyncEventBus {
private events = new Map<string, Set<Function>>();
emit(event: string, ...args: any[]): void {
const handlers = this.events.get(event);
if (handlers) {
// 逐个同步调用,任何一个慢都会阻塞后续
for (const handler of handlers) {
handler(...args); // 阻塞在这里直到 handler 返回
}
}
// 所有 handler 执行完毕后才到达这里
}
}
优势
- 数据一致性强:emit 返回时所有副作用已完成,状态已更新
- 执行顺序确定:回调按注册顺序依次执行
- 调试友好:调用栈完整,断点可从 emit 一路追踪到每个 handler
- 事务性:可以在 emit 之后立即读取被 handler 修改的状态
致命问题:主线程阻塞
【问题场景】击杀 Boss 触发 7 个同步回调
帧时间预算: 16.6ms (60fps)
emit("monster:killed")
├── 任务系统检查 2ms
├── 成就系统累计 1ms
├── 背包掉落计算 3ms ← 包含随机数+权重表查询
├── 关卡状态更新 1ms
├── 排行榜写入 5ms ← 排序操作
├── 副本条件判断 2ms
└── 日志写入 4ms ← I/O 操作
────────
总计 18ms ← 超出帧预算!导致掉帧
同步分线策略
针对同步阻塞问题,有以下处理方案:
方案一:优先级分层执行
class PrioritySyncBus {
// 按优先级分桶:critical 必须当帧完成,normal 可延迟
private buckets = {
critical: new Map<string, Set<Function>>(), // 立即执行
normal: new Map<string, Set<Function>>(), // 当帧执行但在 critical 之后
deferred: new Map<string, Set<Function>>(), // 延迟到下一帧
};
emit(event: string, ...args: any[]): void {
// 1. 立即执行关键回调(如战斗伤害结算)
this.buckets.critical.get(event)?.forEach(h => h(...args));
// 2. 执行普通回调
this.buckets.normal.get(event)?.forEach(h => h(...args));
// 3. 延迟回调推入下一帧队列
const deferred = this.buckets.deferred.get(event);
if (deferred) {
requestAnimationFrame(() => {
deferred.forEach(h => h(...args));
});
}
}
}
方案二:时间片切割
class TimeBudgetBus {
private readonly BUDGET_MS = 4; // 每次 emit 最多占用 4ms
emit(event: string, ...args: any[]): void {
const handlers = [...(this.events.get(event) || [])];
const start = performance.now();
let i = 0;
// 在时间预算内尽可能多执行
while (i < handlers.length) {
handlers[i](...args);
i++;
if (performance.now() - start > this.BUDGET_MS && i < handlers.length) {
// 超时,剩余回调推迟到下一帧
const remaining = handlers.slice(i);
requestAnimationFrame(() => {
remaining.forEach(h => h(...args));
});
break;
}
}
}
}
方案三:C# 协程分帧(Unity)
public class CoroutineEventBus : MonoBehaviour
{
public void Emit(string eventName, object data)
{
StartCoroutine(EmitCoroutine(eventName, data));
}
private IEnumerator EmitCoroutine(string eventName, object data)
{
var handlers = GetHandlers(eventName);
float frameStart = Time.realtimeSinceStartup;
foreach (var handler in handlers)
{
handler.Invoke(data);
// 每个 handler 执行后检查是否超出帧预算
if (Time.realtimeSinceStartup - frameStart > 0.004f) // 4ms
{
yield return null; // 让出到下一帧
frameStart = Time.realtimeSinceStartup;
}
}
}
}
异步回调详解
工作原理
// 异步 EventBus — emit 非阻塞
class AsyncEventBus {
private events = new Map<string, Set<Function>>();
emit(event: string, ...args: any[]): void {
const handlers = this.events.get(event);
if (handlers) {
for (const handler of handlers) {
// 推入微任务队列,不阻塞当前执行
queueMicrotask(() => handler(...args));
}
}
// 立即返回,handler 尚未执行
}
// 或使用 Promise 包装
async emitAsync(event: string, ...args: any[]): Promise<void> {
const handlers = this.events.get(event);
if (handlers) {
await Promise.all(
[...handlers].map(h => Promise.resolve().then(() => h(...args)))
);
}
}
}
优势
- 不阻塞主线程:emit 立即返回,游戏帧率不受影响
- 天然并发:多个 handler 可并行处理(如果运行在 Worker 中)
- 容错性好:单个 handler 异常不影响其他 handler 的调度
致命问题:数据镜像与一致性
【问题场景】异步回调读到的数据可能已经过时
帧 N: 玩家血量 = 100
emit("player:damaged", { hp: 100, damage: 30 })
│── handler 推入异步队列
│── 主线程继续执行
│── 又收到一次伤害 → 血量变为 40
帧 N+1: 异步 handler 开始执行
handler 拿到的数据: { hp: 100, damage: 30 }
但此时玩家实际血量 = 40 ← 数据已过期!
handler 按 hp=100 计算 → 逻辑错误
数据镜像问题的解决方案
方案一:深拷贝快照(Snapshot)
class SnapshotEventBus {
emit(event: string, data: any): void {
// 在 emit 时刻冻结数据快照
const snapshot = structuredClone(data);
const handlers = this.events.get(event);
if (handlers) {
for (const handler of handlers) {
queueMicrotask(() => handler(snapshot));
}
}
}
}
// 使用:handler 拿到的永远是 emit 时刻的一致数据
eventBus.emit("player:damaged", {
hp: player.hp, // 发射时的值
damage: 30,
timestamp: Date.now() // 附带时间戳供 handler 判断时效性
});
方案二:版本号校验
class VersionedEventBus {
private version = new Map<string, number>();
emit(event: string, data: any): void {
const ver = (this.version.get(event) || 0) + 1;
this.version.set(event, ver);
const handlers = this.events.get(event);
if (handlers) {
for (const handler of handlers) {
const capturedVer = ver;
queueMicrotask(() => {
// handler 执行时检查:如果版本已更新,说明有更新的事件
if (this.version.get(event) === capturedVer) {
handler(data); // 仍是最新 → 执行
} else {
// 已过期 → 跳过或合并处理
console.log(`Skipped stale ${event} v${capturedVer}`);
}
});
}
}
}
}
方案三:不可变数据 + 事件溯源
interface GameEvent<T = any> {
readonly type: string;
readonly payload: Readonly<T>; // 不可变
readonly eventId: string; // 唯一 ID
readonly timestamp: number; // 发生时间
readonly frameId: number; // 发生的帧号
}
class ImmutableEventBus {
private frameCounter = 0;
emit<T>(type: string, payload: T): void {
// 创建不可变事件对象
const event: GameEvent<T> = Object.freeze({
type,
payload: Object.freeze({ ...payload }),
eventId: crypto.randomUUID(),
timestamp: performance.now(),
frameId: this.frameCounter,
});
// handler 拿到的是冻结对象,无法被任何人修改
queueMicrotask(() => {
this.events.get(type)?.forEach(h => h(event));
});
}
}
同步 vs 异步 全面对比
| 维度 | 同步回调 | 异步回调 |
|---|---|---|
| 执行时机 | emit 内部立即执行 | 推入队列/微任务,延迟执行 |
| emit 返回时 | 所有 handler 已完成 | handler 尚未开始 |
| 帧率影响 | handler 越多/越慢,帧率越低 | 不影响当前帧(但可能影响下一帧) |
| 数据一致性 | 强一致:读到的就是 emit 时刻的状态 | 弱一致:数据可能已被后续操作修改 |
| 调用栈 | 完整可追踪 | 被截断,需额外工具追踪 |
| 异常传播 | handler 异常会中断后续 handler | handler 异常互不影响 |
| 执行顺序 | 确定(注册顺序) | 不确定(取决于调度器) |
| 调试难度 | 低 | 高(时序问题难复现) |
| 吞吐量 | 低(串行阻塞) | 高(可并行调度) |
| 适用场景 | 状态查询、数据校验、事务操作 | UI 更新、日志、统计、网络请求 |
游戏中的选型判断依据
根据业务特征选择同步或异步:
是否需要 handler 的返回值/副作用?
│
┌─── YES ─┴── NO ───┐
│ │
结果是否影响 对帧率是否敏感?
当前帧的逻辑? │
│ ┌─ YES ┴─ NO ─┐
┌─YES─┴─NO──┐ │ │
│ │ 【异步】 【都行】
【同步】 handler 优先同步
必须同步 是否耗时? (简单直观)
│
┌─YES─┴─ NO ─┐
│ │
【异步】 【同步】
推入下帧 当帧完成
按业务系统推荐
| 系统 | 推荐模式 | 理由 |
|---|---|---|
| 伤害结算 | 同步 | 后续逻辑依赖结果(死亡判定、护盾计算) |
| 任务进度检查 | 同步 | 需要立即知道是否达成完成条件 |
| 技能冷却触发 | 同步 | 影响当前帧的输入响应 |
| 背包物品增删 | 同步 | UI 需要立即刷新,避免闪烁 |
| 成就累计统计 | 异步 | 纯累加操作,不影响游戏逻辑 |
| 排行榜更新 | 异步 | 排序耗时,不影响当前帧 |
| 日志/埋点上报 | 异步 | 纯记录,零游戏逻辑依赖 |
| 社交通知推送 | 异步 | 网络 I/O,必须异步 |
| 副本通关判定 | 同步 | 需要立即触发结算流程和奖励发放 |
| 音效/特效播放 | 异步 | 容忍 1-2 帧延迟,减轻主线程压力 |
混合模式最佳实践
class HybridEventBus {
private syncHandlers = new Map<string, Set<Function>>();
private asyncHandlers = new Map<string, Set<Function>>();
// 注册时指定同步/异步
on(event: string, handler: Function, mode: 'sync' | 'async' = 'sync') {
const map = mode === 'sync' ? this.syncHandlers : this.asyncHandlers;
if (!map.has(event)) map.set(event, new Set());
map.get(event)!.add(handler);
}
emit(event: string, ...args: any[]): void {
// 第一阶段:同步回调立即执行(保证数据一致性)
this.syncHandlers.get(event)?.forEach(h => h(...args));
// 第二阶段:异步回调推入微任务(不阻塞主线程)
const snapshot = structuredClone(args); // 快照数据
this.asyncHandlers.get(event)?.forEach(h => {
queueMicrotask(() => h(...snapshot));
});
}
}
// 使用示例
const bus = new HybridEventBus();
// 伤害结算必须同步 —— 后续需要知道目标是否死亡
bus.on('monster:killed', handleQuestProgress, 'sync');
bus.on('monster:killed', handleLootDrop, 'sync');
// 统计和日志可以异步 —— 不影响游戏逻辑
bus.on('monster:killed', handleAchievementCount, 'async');
bus.on('monster:killed', handleBattleLog, 'async');
bus.on('monster:killed', handleLeaderboard, 'async');
最佳实践
1. 使用常量定义事件名
// events.ts
export const EVENTS = {
USER_LOGIN: 'user:login',
USER_LOGOUT: 'user:logout',
DATA_LOADED: 'data:loaded',
THEME_CHANGED: 'theme:changed',
} as const;
2. 组件销毁时必须取消订阅
class MyComponent {
private handler = (data: any) => { /* ... */ };
mount() {
eventBus.on(EVENTS.DATA_LOADED, this.handler);
}
unmount() {
eventBus.off(EVENTS.DATA_LOADED, this.handler);
}
}
3. 带类型约束的事件定义
interface EventMap {
'user:login': { userId: string; timestamp: number };
'user:logout': void;
'data:loaded': { items: any[]; total: number };
}
class TypedEventBus {
on<K extends keyof EventMap>(
event: K,
handler: (payload: EventMap[K]) => void
): void { /* ... */ }
emit<K extends keyof EventMap>(
event: K,
payload: EventMap[K]
): void { /* ... */ }
}
与其他模式对比
| 模式 | 耦合度 | 通信方向 | 适用规模 |
|---|---|---|---|
| 直接调用 | 高 | 一对一 | 小型 |
| 回调函数 | 中 | 一对一 | 小型 |
| EventBus | 低 | 多对多 | 中型 |
| Redux/Vuex | 低 | 单向流 | 大型 |
| RxJS/响应式 | 低 | 流式 | 复杂异步 |