MQTT用戶端入門 - 五、.NET/C# 用戶端程式設計
.NET 平台可以使用套件 System.Net.Mqtt 設計 MQTT 用戶端程式。從套件的名稱空間可知,這是 .NET 平台正式的 MQTT 套件。由微軟旗下的 xamarin 團隊負責開發,專案托管於 System.Net.Mqtt 專案。
MQTT 概觀請看 一、在 Debian 8 安裝 mosquitto。
本文範例使用 C# 語言撰寫。並且說明三點使用須知:
- client ID
- 同步方法 Connect
- 分別主題處理方法
本文範例完整程式碼:
因為 System.Net.Mqtt 並不包含在 .NET SDK 預裝內容之中,所以想在 .NET 專案中使用 MQTT 套件時,首先要從套件庫取得並加入專案檔。我習慣使用 dotnet 命令列工具操作。操作指令如下:
dotnet add package System.Net.Mqtt --version 0.6.3-beta
client ID
使用 System.Net.Mqtt 時,如果目標 MQTT broker 要求連線認證的話,則我們需要先配置 MqttClientCredentials 實例。 MqttClientCredentials 建構方法有三個參數:
- clientId - 連線時,在 MQTT broker 註冊的用戶端識別代號。
- username - MQTT broker 的連線用戶名稱。
- password - MQTT broker 的連線用戶密碼。
在這三個參數之中,需要特別說明 clientId 的用途。 MQTT 規範 clientId 的主要用途是避免我的程式收到自己發佈的訊息。換句話說,如果有多個 MQTT 用戶端使用了同樣的 clientId ,那麼他們不會收到彼此發佈的訊息。
基本上,clientId 是選擇性參數,如果 clientId 為 null 或空字串,則套件庫會隨機產生一個。但是,如果我們要指定一個具規律性的識別代號的話,System.Net.Mqtt 就不太方便了。
MQTT 規格對 clientId 的用字有些限制。 Python, JavaScript 的 MQTT 套件庫會對我們傳入的 clientId 字串做一些處理,以免違背 MQTT 規格。但是 .NET 的 System.Net.Mqtt 套件庫不會這麼做。如果我們使用了不合規格的 clientId ,則 System.Net.Mqtt 會擲出錯誤。
System.Net.Mqtt 按照 MQTT 規格, clientId 有下列限制:
- 只允許使用 a-z, A-Z, 0-9 字元。
- MQTT 規格早期版本限制 clientId 長度 23 字元。
基於 Python 和 JavaScript 的使用經驗,我在配置 MqttClientCredentials 前,會將 clientId 用 MD5 編碼後取固定長度內容,免除使用者限制。我的第一個範例 mqtt-publish 設計的 Publisher 類別,就是這麼做。
class Publisher
{
public Publisher(
string broker,
int port,
string username,
string password,
string clientId=null)
: this(broker, port)
{
if (!string.IsNullOrEmpty(clientId)) {
var data = MD5.Create().ComputeHash(Encoding.UTF8.GetBytes(clientId));
// 用 MD5 將使用者輸入的 client id 轉成可用字元。
StringBuilder sbuf = new StringBuilder();
// 從雜湊結果取22字元(11位元組)。
for (var i = 0; i < 11; i++) {
sbuf.Append(data[i].ToString("x2"));
}
clientId = sbuf.ToString();
}
// 若省略 clientId (null or ""), MqttClientCredentials 會自己編一個
this.credentials = new MqttClientCredentials(clientId, username, password);
}
public async void PublishAsync(
string topic,
byte[] payload,
MqttQualityOfService qos = MqttQualityOfService.AtMostOnce)
{
var sessionState = await Client.ConnectAsync(Credentials);
var message = new MqttApplicationMessage(topic, payload);
await Client.PublishAsync(message, qos);
await Client.DisconnectAsync();
return;
}
}
class Program
{
static void Main(string[] args)
{
var broker = "localhost";
var clientId = "mqtt-publisher_test.pid";
// 我呼叫前會把 client id 用 MD5 編碼後取固定長度內容,不受規格限制。
var username = "test";
var password = "testtest";
var pub = new Publisher(
broker,
1883,
username,
password,
clientId
);
pub.PublishAsync("/test", Encoding.UTF8.GetBytes("hello"));
}
}
Publisher 類別不連續發佈訊息,也不訂閱主題。它適用於偶爾發佈一次訊息的情境。例如使用電池的 IoT 裝置,通常就用此方法發佈裝置狀態。 完整內容在 github 上:mqtt-publish。
同步方法 Connect
System.Net.Mqtt 設計的服務連接方法 ConnectAsync()
不太好用。
顧名思義,ConnectAsync()
是非同步方法。在和 MQTT broker 完成連線交握前,方法就返回了。然而:
- 在完成連線前,我們呼叫
PublishAsync()
發佈的內容成了空包彈。 - 在完成連線前,不能訂閱主題。如果在
ConnectAsync()
之後立即呼叫SubscribeAsync()
,會擲出錯誤。
基於以上兩點,所以我們實際上得要把 ConnectAsync()
變成同步工作才行。
在非同步方法中呼叫 ConnectAsync()
時,固定寫法如下:
var sessionState = await mqClient.ConnectAsync();
在同步方法中呼叫 ConnectAsync()
時,寫法如下:
var sessionState = Client.ConnectAsync().Result;
在同樣以非同步模式設計的 JavaScript MQTT 庫中,提供了 Connected 事件。我們可以利用此事件訂閱主題。請看 MQTT用戶端入門 - 二、JavaScript 用戶端程式設計。但我不知道為何 System.Net.Mqtt 雖有 Disconnected 事件,但從 v0.5 一直到 v0.6.3,都不提供 Connected 事件。
因為 ConnectAsync()
並不實用。所以我寫的 SimpleMqttClient 直接提供了同步方法。
public class SimpleMqttClient
{
internal static string GetAnonymousClientId() =>
string.Format(
"anonymous{0}",
Guid.NewGuid().ToString().Replace("-", string.Empty).Substring(0, 10)
);
/*
我不知道為何 System.Net.Mqtt (~v0.6.3) 沒有 Connected 事件。
*/
public SessionState Connect()
{
// Task.Run(()=>this.Client.ConnectAsync(Credentials)).Wait();
return Client.ConnectAsync(Credentials, cleanSession: true).Result;
}
}
除了同步方法 Connect()
之外, SimpleMqttClient 也改良了產生隨機 clientId 的方式。當參數省略 clientId 時,其內部方法 GetAnonymousClientId()
使用 Guid 作為隨機 clientId 的基底內容。
區分不同主題的處理方法
System.Net.Mqtt 採用 observer 設計模式處理訊息接收工作。先用 client 的 SubscribeAsync()
方法訂閱主題。再用 client 的 MessageStream.Subscribe()
指派訊息處理方法。我們可以在 client 串好幾個方法負責處理收到的訊息。
注意,SubscribeAsync()
的 qos 參數必須小於或等於 MqttConfiguration 的 MaximumQualityOfService
。
詳情請看 .NET MQTT 用戶端訂閱方法使用時的陷阱,關於 MaximumQualityOfService。
但 MQTT 客戶端有時會同時訂閱好幾個主題,而附加訊息處理方法的 Subscribe()
方法並沒有指定主題的參數。那麼我們的訊息處理方法要如何區分訊息主題呢?實務上,訊息處理方法會按照主題切割成好幾個小方法。每個方法各自處理自己感興趣的主題的訊息。
第一個直接做法,就是在處理方法中判斷訊息的 Topic 屬性。如下:
var mqClient = new SimpleMqttClient();
// 假設此處理方法只負責 tw/test/1 主題的訊息
mqClient.Client.MessageStream
.Subscribe(msg => {
if (msg.Topic != "tw/test/1")
return;
Console.WriteLine(String.Format("topic {0}\n{1}",
msg.Topic,
Encoding.UTF8.GetString(msg.Payload)));
return;
});
第二個做法是引入 System.Reactive.Linq ,使用 LINQ 提供的擴充方法 Where()
。
using System.Reactive.Linq;
// 加上這個 Linq 套件, MessageStream 才能接 Where
var mqClient = new SimpleMqttClient();
// 用 Where 過濾其他主題的訊息,所以處理方法只會看到 tw/test/2 主題的訊息
mqClient.Client.MessageStream
.Where(msg => msg.Topic == "tw/test/2")
.Subscribe(msg => {
Console.WriteLine(Encoding.UTF8.GetString(msg.Payload));
});
完整內容在 github 上:SimpleMqttClient。