跳到主要内容

Events — ua.events.*

OpcUaSession.events 是 SDK 内部事件统一通道, 把会话生命周期 / 订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.

与 OPC UA 协议事件区分
  • ua.events.* — SDK 内部事件 (本节, 与协议无关)
  • ua.subscribe_events(...) — OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅

Python 事件机制

Python 没有 C# EventHandler<T> 强类型机制, SDK 用 List[Callable] 模拟:

ua.events.connected.append(callback)        # 注册
ua.events.connected.remove(callback) # 取消
ua.events.connected.clear() # 清空

每个事件就是一个 list, 触发时遍历调用所有 callable. 单个回调抛异常被 SDK 内部 try/except 吞掉, 不影响其他回调和主线程.

完整事件清单

会话生命周期

  • connected (ConnectionEventArgs: endpoint_url, status_code, message, timestamp_utc) — 首次连接成功
  • disconnected (ConnectionEventArgs: 同上) — 主动 / 被动断开
  • reconnecting (ConnectionEventArgs: 同上) — 检测到断线, 启动自动重连
  • reconnected (ConnectionEventArgs: 同上) — 自动重连成功
  • keep_alive (KeepAliveEventArgs: server_time_utc, server_status, timestamp_utc) — KeepAlive 心跳成功
  • state_changed (SessionStateChangedEventArgs: old_state, new_state, reason, timestamp_utc) — 任何 SessionState 变化

异常

  • communication_error (CommunicationErrorEventArgs: status_code, category, message, timestamp_utc) — 通讯失败 (KeepAlive 失败 / read 异常等)
  • subscription_lost (SubscriptionLostEventArgs: subscription_id, reason, will_reconnect, timestamp_utc) — 订阅被服务端清理

统一通道 events.any

所有类别事件都会同步触发此通道, 字段类 OpcUaEventEntry:

  • category (OpcUaEventCategory) — 类别枚举
  • severity (OpcUaEventSeverity) — 严重度 (Trace/Debug/Info/Warn/Error/Fatal)
  • source (str) — 事件源 (NodeId / 子组件名)
  • message (str) — 事件文本
  • status_code (StatusCode) — 关联状态码
  • timestamp_utc (datetime) — 触发时刻 (UTC)
  • original (Any) — 强类型 EventArgs (Connected → ConnectionEventArgs 等), 可空

OpcUaEventCategory 枚举

events.any 接收的 OpcUaEventEntry.category 完整清单:

Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic,

# 节点 / 会话级新增分类 (2026-04-25)
NodeRemoved = 30 # 服务端删除某节点 (BadNodeIdUnknown)
NodeAccessDenied = 31 # 当前用户对该节点权限不足
NodeTypeChanged = 32 # 节点 DataType 变了 (BadTypeMismatch)
SessionLost = 33 # Session 失效 (BadSessionClosed/BadSessionIdInvalid)
ServerStateChanged = 34 # 服务端 Running → Failed/Shutdown 等

Typed 通道 (语法糖) — 6 个新事件

除了 any 统一通道, 6 个高频 Category 提供专属事件:

事件Category触发场景
node_removedNodeRemoved命中 BadNodeIdUnknown
node_access_deniedNodeAccessDenied命中 BadUserAccessDenied
node_type_changedNodeTypeChangedWrite 时命中 BadTypeMismatch
session_lostSessionLostSession 失效, 重连前预警
server_state_changedServerStateChanged服务端状态切换
subscription_restoredSubscriptionRestored订阅恢复成功

每个 typed 事件回调签名是 Callable[[OpcUaEventEntry], None]

def on_node_removed(e):
print(f"⚠ 节点已被服务端删除: {e.source} ({e.status_code})")
tree_view.remove_node(e.source)

ua.events.node_removed.append(on_node_removed)

ua.events.node_access_denied.append(
lambda e: print(f"⛔ 权限不足: {e.source}"))

ua.events.session_lost.append(
lambda e: status_bar.setText("⚠ 会话失效, 即将重连..."))

ua.events.server_state_changed.append(
lambda e: print(f"服务端状态: {e.message}"))

ua.events.subscription_restored.append(
lambda e: print(f"订阅已恢复: {e.source}"))

详见 自动重连订阅自恢复

OpcUaEventSeverity 枚举

Trace, Debug, Info, Warn, Error, Fatal

用法示例

通用日志钩子 (一行打印 SDK 内所有事件)

def on_any(e):
print(f"[{e.severity.name}] [{e.category.name}] {e.source}: {e.message} ({e.status_code.name})")

ua.events.any.append(on_any)

调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.

状态栏绑定 (PyQt / Tk)

def on_state_changed(e):
# PyQt 必须切回 GUI 线程: QMetaObject.invokeMethod / signal-slot
label.setText(e.new_state.name)
color = "green" if e.new_state == SessionState.Connected else "red"
label.setStyleSheet(f"color: {color}")

ua.events.state_changed.append(on_state_changed)

错误统计

import threading
lock = threading.Lock()
counter = {"err": 0}

def on_err(e):
with lock:
counter["err"] += 1

ua.events.communication_error.append(on_err)

与订阅 data_changed 的关系

订阅 sub.data_changed 仅收到该订阅的事件; ua.events.any 通道里 category == DataChange 的项收到 SDK 下所有订阅的事件 (统一通道). 两者并存:

  • 单订阅业务逻辑 → sub.data_changed
  • 全局日志 / 录波 / 调试 → ua.events.any

线程模型

GIL + Publish 线程

事件回调在 C 层 Publish 线程上调用 (Python 端会先拿 GIL). UI 操作必须自行 marshal 到主线程 (PyQt: QMetaObject.invokeMethod / Tk: root.after / asyncio: loop.call_soon_threadsafe).

不要在事件里执行长操作 (>100 ms), 否则会阻塞 Publish 队列. 把数据放队列, 让另一个线程消费:

import queue, threading
q = queue.Queue()

ua.events.any.append(lambda e: q.put(e)) # 短回调

def consumer():
while True:
e = q.get()
# 慢处理 (写文件 / 数据库 / DNN 推理)

threading.Thread(target=consumer, daemon=True).start()

下一步