历史流式读
概述
IDE 趋势图拉历史时, 一次性把万点装进 list 会让进程卡顿 + OOM。read_history_stream 返回 AsyncIterator[DataValue], 业务侧用 async for 拿一条处理一条, 边拉边渲染。
对应规范段: Part 11 §6.5 (HistoryRead)。
API
| 方法 | 类别 | 读写 | 说明 |
|---|---|---|---|
| ua.read_history_stream(node_id, t0, t1, batch_size, cancel) | 历史 | 读 | 异步流式读 |
| 参数 | 默认 | 说明 |
|---|---|---|
| node_id | — | 历史节点 |
| start_utc | — | 起始 UTC |
| end_utc | — | 结束 UTC |
| batch_size | 1000 | 每次 RPC 最多点数 |
| cancel | None | asyncio.Event 取消 |
代码示例
import asyncio
from datetime import datetime, timedelta, timezone
from opcua import DarraOpcUa
async def main():
with DarraOpcUa("opc.tcp://localhost:4840") as ua:
ua.connect()
t1 = datetime.now(timezone.utc)
t0 = t1 - timedelta(hours=24)
# 1) 边拉边渲染
async for dv in ua.read_history_stream("ns=2;s=Temp", t0, t1):
chart_series.add_point(dv.source_timestamp, dv.value.as_double)
# 2) 取消
cancel = asyncio.Event()
asyncio.get_event_loop().call_later(5.0, cancel.set)
try:
async for dv in ua.read_history_stream("ns=2;s=Temp", t0, t1, cancel=cancel):
process(dv)
except asyncio.CancelledError:
print("已取消")
# 3) 计数坏值
bad = 0
async for dv in ua.read_history_stream("ns=2;s=Temp", t0, t1):
if dv.status != StatusCode.GOOD:
bad += 1
print(f"24h 内 {bad} 个坏值")
asyncio.run(main())
实现限制
- 当前底层
read_history不暴露 ContinuationPoint, 一次 RPC 返回区间内所有点 (受 batch_size 限) - 大区间业务侧自行切片
- 取消立即返回当前已拿部分
最佳实践
- 长查询给"取消"按钮
- 区间 > 1 小时建议外层切片
- UI 渲染节流: 每 100 条 refresh
跨语言对照
| C# | Python | Java | C++ | Rust | C |
|---|---|---|---|---|---|
| ReadHistoryStreamAsync | read_history_stream | readHistoryStream | ReadHistoryStreamAsync | read_history_stream | DarraUa_Session_ReadHistoryStream |