跳到主要内容

OPC UA → MQTT 桥接

OPC UA 适合本地高质量通讯, MQTT 适合公网大规模订阅. 桥接两者是 IIoT 标准方案. 用 C# + MQTTnet 示例.

配套示例

依赖

dotnet add package DarraOpcUa
dotnet add package MQTTnet

完整代码

using DarraOpcUa_Client;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

class Program
{
static async Task Main()
{
const string opcuaEndpoint = "opc.tcp://192.168.1.100:4840"; // 替换成现场 PLC 工控机 IP
const string mqttBroker = "mqtt.example.com";
const int mqttPort = 8883;
const string topicPrefix = "factory1/opcua";

var nodes = new[]
{
"ns=2;s=Boiler1.Temperature",
"ns=2;s=Boiler1.Pressure",
"ns=2;s=Line1.Counter",
};

// 1. MQTT Client 连接
var mqttFactory = new MqttFactory();
var mqtt = mqttFactory.CreateMqttClient();
var mqttOpts = new MqttClientOptionsBuilder()
.WithTcpServer(mqttBroker, mqttPort)
.WithCredentials("factory1", "secret")
.WithTls()
.WithCleanSession()
.Build();
await mqtt.ConnectAsync(mqttOpts);
Console.WriteLine($"[OK] MQTT connected {mqttBroker}:{mqttPort}");

// 2. OPC UA Client 连接
using var ua = new DarraOpcUa(opcuaEndpoint);
ua.Connect();
Console.WriteLine($"[OK] OPC UA connected {opcuaEndpoint}");

// 3. 订阅 + 转 MQTT (JSON payload)
using var sub = ua.CreateSubscription(1_000);
sub.DataChanged += async (s, e) =>
{
var topic = $"{topicPrefix}/{e.NodeId.Replace(";", "_").Replace("=", "_")}";
var payload = JsonSerializer.Serialize(new
{
nodeId = e.NodeId,
value = e.ValueString,
status = e.Status.ToString(),
ts = e.SourceTimestamp,
});

await mqtt.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());

Console.WriteLine($" -> {topic}: {e.ValueString}");
};

sub.AddMany(nodes);
Console.WriteLine($"\nBridging {nodes.Length} tags. Ctrl+C 退出...");
await Task.Delay(Timeout.Infinite);
}
}

代码要点

  • Topic 不要直接拼 NodeId — 含 ; = 分隔符; 推荐工厂级命名 <plant>/<line>/<device>/<tag>, 订阅方才能用 wildcard
  • QoS 0 (可能丢) 用于高频遥测; QoS 1 (可能重) 是默认; QoS 2 (慢) 仅计费 / 命令下发
  • Retain Flag 让 Broker 保留每个 topic 最新一条, 状态量 (开关/模式) 必开, 高频遥测可关
  • 公网带宽贵, 高频 Tag 桥接前先做死区过滤 (变化超过 X% 才推), 否则带宽爆炸
  • 反向下发 (MQTT → ua.Write) 用单独 topic 命名空间 (如 factory1/cmd/#), 与上行数据分开避免回环
  • 对接 Sparkplug B 时改 Protobuf payload, 接入 Ignition / HiveMQ 等云端工具链成本更低

注意事项

  • 公网带宽贵 — 高频 Tag 桥接前先在客户端做死区过滤 (变化超过 X% 才推), 否则带宽爆炸.
  • 反向 (MQTT 下发指令 → ua.Write) 在另一个 topic 命名空间 (factory1/cmd/#), 与上行数据分开, 避免 loop.
  • 对接 Eclipse Sparkplug B 标准时, payload 改 Protobuf 而不是 JSON, 接入云端工具链 (Ignition / HiveMQ) 成本更低.

相关链接