历史流式读
概述
IDE 趋势图拉历史时, 一次性把万点装进 Vec 会让进程 OOM。read_history_stream 返回 impl Stream<Item = DataValue> (基于 futures::stream), 业务侧用 while let Some(dv) = stream.next().await 一条条拿。
对应规范段: Part 11 §6.5。
API
| 方法 | 类别 | 读写 | 说明 |
|---|---|---|---|
| ua.read_history_stream(node_id, t0, t1, batch_size) | 历史 | 读 | 返回 impl Stream<Item = DataValue> |
代码示例
use opcua::DarraOpcUa;
use chrono::{Utc, Duration};
use futures::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ua = DarraOpcUa::new("opc.tcp://localhost:4840")?;
ua.connect()?;
let t1 = Utc::now();
let t0 = t1 - Duration::hours(24);
// 1) 边拉边渲染
let mut s = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000);
while let Some(dv) = s.next().await {
chart_series.add_point(dv.source_timestamp, dv.value.as_double()?);
}
// 2) 取消 — 用 take_while
let mut s = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000)
.take_while(|_| futures::future::ready(!cancel_requested.load(Ordering::Relaxed)));
while let Some(dv) = s.next().await {
process(dv);
}
// 3) 计数坏值
use opcua::StatusCode;
let bad = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000)
.filter(|dv| futures::future::ready(dv.status != StatusCode::Good))
.count()
.await;
println!("24h 内 {} 个坏值", bad);
Ok(())
}
实现限制
- 当前底层 read_history 不暴露 ContinuationPoint, 一次 RPC 返回区间内所有点 (受 batch_size 限)
- 大区间业务侧自行切片
- 取消用 stream combinator (take_while / take_until)
最佳实践
- 长查询给取消令牌
- 区间 > 1 小时建议外层切片
- UI 渲染节流: chunks(100) 后再 send 给 UI
跨语言对照
| C# | Python | Java | C++ | Rust | C |
|---|---|---|---|---|---|
| ReadHistoryStreamAsync | read_history_stream | readHistoryStream | ReadHistoryStream | read_history_stream | DarraUa_Session_ReadHistoryStream |