Events — ua.events.*
OpcUaSession.events 是 SDK 内部事件统一通道, 把会话生命周期 / 订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.
ua.events.*— SDK 内部事件 (本节, 与协议无关)ua.subscribe_events(...)— OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅
Python 事件机制
Python 没有 C# EventHandler<T> 强类型机制, SDK 用 List[Callable] 模拟:
ua.events.connected.append(callback) # 注册
ua.events.connected.remove(callback) # 取消
ua.events.connected.clear() # 清空
每个事件就是一个 list, 触发时遍历调用所有 callable. 单个回调抛异常被 SDK 内部 try/except 吞掉, 不影响其他回调和主线程.
完整事件清单
会话生命周期
connected(ConnectionEventArgs: endpoint_url, status_code, message, timestamp_utc)— 首次连接成功disconnected(ConnectionEventArgs: 同上)— 主动 / 被动断开reconnecting(ConnectionEventArgs: 同上)— 检测到断线, 启动自动重连reconnected(ConnectionEventArgs: 同上)— 自动重连成功keep_alive(KeepAliveEventArgs: server_time_utc, server_status, timestamp_utc)— KeepAlive 心跳成功state_changed(SessionStateChangedEventArgs: old_state, new_state, reason, timestamp_utc)— 任何 SessionState 变化
异常
communication_error(CommunicationErrorEventArgs: status_code, category, message, timestamp_utc)— 通讯失败 (KeepAlive 失败 / read 异常等)subscription_lost(SubscriptionLostEventArgs: subscription_id, reason, will_reconnect, timestamp_utc)— 订阅被服务端清理
统一通道 events.any
所有类别事件都会同步触发此通道, 字段类 OpcUaEventEntry:
category(OpcUaEventCategory)— 类别枚举severity(OpcUaEventSeverity)— 严重度 (Trace/Debug/Info/Warn/Error/Fatal)source(str)— 事件源 (NodeId / 子组件名)message(str)— 事件文本status_code(StatusCode)— 关联状态码timestamp_utc(datetime)— 触发时刻 (UTC)original(Any)— 强类型 EventArgs (Connected → ConnectionEventArgs 等), 可空
OpcUaEventCategory 枚举
events.any 接收的 OpcUaEventEntry.category 完整清单:
Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic,
# 节点 / 会话级新增分类 (2026-04-25)
NodeRemoved = 30 # 服务端删除某节点 (BadNodeIdUnknown)
NodeAccessDenied = 31 # 当前用户对该节点权限不足
NodeTypeChanged = 32 # 节点 DataType 变了 (BadTypeMismatch)
SessionLost = 33 # Session 失效 (BadSessionClosed/BadSessionIdInvalid)
ServerStateChanged = 34 # 服务端 Running → Failed/Shutdown 等
Typed 通道 (语法糖) — 6 个新事件
除了 any 统一通道, 6 个高频 Category 提供专属事件:
| 事件 | Category | 触发场景 |
|---|---|---|
| node_removed | NodeRemoved | 命中 BadNodeIdUnknown |
| node_access_denied | NodeAccessDenied | 命中 BadUserAccessDenied |
| node_type_changed | NodeTypeChanged | Write 时命中 BadTypeMismatch |
| session_lost | SessionLost | Session 失效, 重连前预警 |
| server_state_changed | ServerStateChanged | 服务端状态切换 |
| subscription_restored | SubscriptionRestored | 订阅恢复成功 |
每个 typed 事件回调签名是 Callable[[OpcUaEventEntry], None]。
def on_node_removed(e):
print(f"⚠ 节点已被服务端删除: {e.source} ({e.status_code})")
tree_view.remove_node(e.source)
ua.events.node_removed.append(on_node_removed)
ua.events.node_access_denied.append(
lambda e: print(f"⛔ 权限不足: {e.source}"))
ua.events.session_lost.append(
lambda e: status_bar.setText("⚠ 会话失效, 即将重连..."))
ua.events.server_state_changed.append(
lambda e: print(f"服务端状态: {e.message}"))
ua.events.subscription_restored.append(
lambda e: print(f"订阅已恢复: {e.source}"))
OpcUaEventSeverity 枚举
Trace, Debug, Info, Warn, Error, Fatal
用法示例
通用日志钩子 (一行打印 SDK 内所有事件)
def on_any(e):
print(f"[{e.severity.name}] [{e.category.name}] {e.source}: {e.message} ({e.status_code.name})")
ua.events.any.append(on_any)
调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.
状态栏绑定 (PyQt / Tk)
def on_state_changed(e):
# PyQt 必须切回 GUI 线程: QMetaObject.invokeMethod / signal-slot
label.setText(e.new_state.name)
color = "green" if e.new_state == SessionState.Connected else "red"
label.setStyleSheet(f"color: {color}")
ua.events.state_changed.append(on_state_changed)
错误统计
import threading
lock = threading.Lock()
counter = {"err": 0}
def on_err(e):
with lock:
counter["err"] += 1
ua.events.communication_error.append(on_err)
与订阅 data_changed 的关系
订阅 sub.data_changed 仅收到该订阅的事件; ua.events.any 通道里 category == DataChange 的项收到 SDK 下所有订阅的事件 (统一通道). 两者并存:
- 单订阅业务逻辑 →
sub.data_changed - 全局日志 / 录波 / 调试 →
ua.events.any
线程模型
事件回调在 C 层 Publish 线程上调用 (Python 端会先拿 GIL). UI 操作必须自行 marshal 到主线程 (PyQt: QMetaObject.invokeMethod / Tk: root.after / asyncio: loop.call_soon_threadsafe).
不要在事件里执行长操作 (>100 ms), 否则会阻塞 Publish 队列. 把数据放队列, 让另一个线程消费:
import queue, threading
q = queue.Queue()
ua.events.any.append(lambda e: q.put(e)) # 短回调
def consumer():
while True:
e = q.get()
# 慢处理 (写文件 / 数据库 / DNN 推理)
threading.Thread(target=consumer, daemon=True).start()