跳到主要内容

历史流式读

概述

IDE 趋势图拉历史时, 一次性把万点装进 Vec 会让进程 OOM。read_history_stream 返回 impl Stream<Item = DataValue> (基于 futures::stream), 业务侧用 while let Some(dv) = stream.next().await 一条条拿。

对应规范段: Part 11 §6.5。

API

方法类别读写说明
ua.read_history_stream(node_id, t0, t1, batch_size)历史返回 impl Stream<Item = DataValue>

代码示例

use opcua::DarraOpcUa;
use chrono::{Utc, Duration};
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ua = DarraOpcUa::new("opc.tcp://localhost:4840")?;
ua.connect()?;

let t1 = Utc::now();
let t0 = t1 - Duration::hours(24);

// 1) 边拉边渲染
let mut s = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000);
while let Some(dv) = s.next().await {
chart_series.add_point(dv.source_timestamp, dv.value.as_double()?);
}

// 2) 取消 — 用 take_while
let mut s = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000)
.take_while(|_| futures::future::ready(!cancel_requested.load(Ordering::Relaxed)));
while let Some(dv) = s.next().await {
process(dv);
}

// 3) 计数坏值
use opcua::StatusCode;
let bad = ua.read_history_stream("ns=2;s=Temp", t0, t1, 1000)
.filter(|dv| futures::future::ready(dv.status != StatusCode::Good))
.count()
.await;
println!("24h 内 {} 个坏值", bad);

Ok(())
}

实现限制

  • 当前底层 read_history 不暴露 ContinuationPoint, 一次 RPC 返回区间内所有点 (受 batch_size 限)
  • 大区间业务侧自行切片
  • 取消用 stream combinator (take_while / take_until)

最佳实践

  • 长查询给取消令牌
  • 区间 > 1 小时建议外层切片
  • UI 渲染节流: chunks(100) 后再 send 给 UI

跨语言对照

C#PythonJavaC++RustC
ReadHistoryStreamAsyncread_history_streamreadHistoryStreamReadHistoryStreamread_history_streamDarraUa_Session_ReadHistoryStream

下一步