2017.05.23

MQTTでマシンデータを集めよう!

竹澤です。

先日、テック系PodcastのRebuild
「もし世界中の人工物が壊滅した場合(人々の知識レベルはそのまま)、インターネットを復活させるには何から手を付けたらいいか」
というTV(?)の話をされていました。とても面白そうだったのでググってはみたものの、その映像は見つけられませんでした。代わりに「この世界が消えたあとの 科学文明のつくりかた」という本を見つけたので、それを今読んでいます。コンセプトは似ていて、
「パンデミックで人口が1万人ぐらいに減少したときに再び元の社会に戻るためには何から手を付けていくべきか」
というような内容で、当然ですが最初は水や食料からはじめてやがて電気やラジオといった方向に進んでいくのですが、そう考えるとインターネットを復活させるためにはずいぶん長い時間が必要なはずで、インターネットがあるこの世界は膨大な過去の遺産の積み重ねであることに気づかされて改めて驚かされます。

さて最近話題のIoTはこのインターネットのさらに先にあるものなので、先ほどの思考実験からするとIoTによる変革の時代を肌で感じられることは幸せなことかもしれません。IoTについてはもう説明の必要はないと思いますが、自動車や産業機械、家電などからインターネット経由でデータを収集し、分析して、新たな知見を得たり、有益なアクションにつなげたりする仕組みのことです。弊社のSkyOnDemandもこのIoT分野ではよく使われています。

さてSkyOnDemandにデータを集めるためのプロトコルとしてはREST on HTTPSがよく使われますが、別案としてMQTTを使いたいというお話もいただくことがあります。MQTTについてはこのあたりが分かりやすいと思いますが、HTTPよりもヘッダーが小さいことと、QoSの考え方があるところがユニークなところではないでしょうか。
MQTTには、まずブローカーと呼ばれる中継サーバーが必要です。そこに対してメッセージを投げる(Publishする)クライアントと、そのメッセージを受け取る(Subscribeする)クライアントがいますので、それらの3者が主な登場人物です。

さて、今回はAmazon EC2上にブローカーを構築して、デバイスからのデータを、SkyOnDemandで受け取る場合にどんな感じになるかをコードを交えてご説明したいと思います。

ブローカーの構築

ブローカーとしては昔からある(らしい)Mosquittoを利用します。

Mosquittoをインストールする先として、EC2は Amazon Linux AMI 2015.03 (HVM), SSD Volume Type - ami-cbf90ecb から t2.micro インスタンスを起動します。セキュリティグループ(やACL)の設定で、MQTTのデフォルトポートである1883を開けておく必要があります。

次にSSHでAmazon Linuxにログインして、Mosquittoをインストールしていきましょう。まずはダウンロードして展開します。

wget http://mosquitto.org/files/source/mosquitto-1.4.2.tar.gz
tar xzf mosquitto-1.4.2.tar.gz
cd mosquitto-1.4.2

インストール方法はINSTALLというファイル名ではなくcompiling.txtというファイル名なので少し探してしまいましたが、そこに必要なライブラリなどが書かれていますので、不足しているものを追加します。

sudo yum install gcc-c++ libuuid-devel openssl-devel c-ares-devel

ビルドの設定はconfig.mkで変更ができますが、今回はいじらずそのままビルドしてインストールします。

make
sudo make install

これでMosquittoのインストールは完了です。

デバイス側(Publishクライアント)の実装

MQTTのクライアントはJavaで実装したいと思います。少し前まではIoTに使われるデバイスでJavaというのはいろいろ使いにくい面があったのですが、Java 8から登場したコンパクトプロファイルにより使いやすくなりました。たとえばサン電子さんのRooster GXにもJava 8が搭載されています。

それでMQTTのJavaの実装としてはPahoを選択しました。最新は1.0.2でした。
ここから org.eclipse.paho.client.mqttv3-1.0.2.jar をダウンロードします。

Publish側のサンプルコードは、こちらにあるほぼそのままです。
詳細はソースコード中のコメントを参照してもらえればいいと思いますが、基本的にはブローカーに接続して「こんにちは、SkyOnDemand」というメッセージを1回投稿します。

public class MqttPublishSample {
	public static void main(String[] args) {
		// トピックはPublisherとSubscriberで同一である必要があります。
		String topic        = "MQTT on SkyOnDemand";
		// 送信するデータ
		String content      = "こんにちは、SkyOnDemand";
		// QoSレベル(0〜2。2が一番確実な伝送を実現する)
		int qos             = 2;
		// ブローカー。デフォルトでポート1883を使う。
		String broker       = "tcp://xx.xx.xx.xx:1883";
		// クライアントの識別子
		String clientId     = "Java Publish Client";
		
		try {
			log("Initializing");			
			MqttClient publishClient = new MqttClient(broker, clientId, new MemoryPersistence());
			MqttConnectOptions connOpts = new MqttConnectOptions();
			// QoSに沿った耐障害性の高い配信を行うためには、falseにセット
			connOpts.setCleanSession(true);
			
			log("Connecting to broker: " + broker);
			publishClient.connect(connOpts);
			log("Connected");
			
			log("Publishing message: " + content);
			MqttMessage message = new MqttMessage(content.getBytes());
			message.setQos(qos);
			publishClient.publish(topic, message);
			log("Message published");
			
			publishClient.disconnect();
			log("Disconnected");
		} catch(MqttException me) {
			log("reason "+me.getReasonCode());
			log("msg "+me.getMessage());
			log("loc "+me.getLocalizedMessage());
			log("cause "+me.getCause());
			log("excep "+me);
			me.printStackTrace();
		}
	}
	
	private static void log(String message) {
		System.out.println(message);
	}
}


SkyOnDemand側(Subscribeクライアント)の実装

SkyOnDemandにはトリガーという機構があり、指定した時刻が来たり、データを受け取ったりしたタイミングで、処理を起動することができます。MQTTのメッセージを受け取るのもトリガーにあたるのですが、トリガーのソースコードをお見せしても仕方ないので、同じくPahoを使ってトリガーがやっていることを概念的にご説明したいと思います。
Pahoのgitリポジトリで
org.eclipse.paho.mqtt.java/org.eclipse.paho.sample.mqttv3app/src/main/java/org/eclipse/paho/sample/mqttv3app/Sample.java
にあるコードを参考にして作ってみました。
なお、Publish側はメッセージを投げるとすぐにexitしてしまいますが、Subscribe側はENTERキーを押すまではメッセージを待ち続けて受信するたびに、メッセージを表示するようになっています。実際はメッセージを表示するのではなく、そのメッセージを連携処理に引き渡して処理を実行することになります。

public class MqttSubscribeSample implements MqttCallback {

	public static void main(String[] args) {
		//QoSレベル
		int qos 			= 2;
		//ブローカー
		String broker 		= "tcp://xx.xx.xx.xx:1883";
		//クライアントの識別子
		String clientId 	= "Java Subscribe Client";
		// トピックはPublisherとSubscriberで同一である必要があります。
		String topic		= "MQTT on SkyOnDemand";

		try {
			MqttSubscribeSample sampleClient = new MqttSubscribeSample();
			sampleClient.subscribe(broker, clientId, topic, qos);
		} catch(MqttException me) {
			// Display full details of any exception that occurs
			log("reason "+me.getReasonCode());
			log("msg "+me.getMessage());
			log("loc "+me.getLocalizedMessage());
			log("cause "+me.getCause());
			log("excep "+me);
			me.printStackTrace();
		}
	}
	
    /**
     * ブローカーに接続して指定トピックのメッセージを受信します。
     * ENTERキーを押すまで待機し続けます。
     * 
     * @param broker ブローカー
     * @param clientId クライアントID
     * @param サブスクライブするトピック(ワイルドカードも利用可)
     * @param このサブスクリプションで利用するQOSの最大値(これを超えるものは下げて受信される)
     * @throws MqttException
     */
    public void subscribe(String broker, String clientId, String topic, int qos) throws MqttException {
		log("Initializing");			
    	MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
    	client.setCallback(this);
    	MqttConnectOptions connOpts = new MqttConnectOptions();
		// QoSに沿った耐障害性の高い配信を行うためには、falseにセット
    	connOpts.setCleanSession(false);
    	
    	log("Connecting to broker: " + broker);
    	client.connect(connOpts);
    	log("Connected");

    	log("Subscribing to topic");
    	client.subscribe(topic, qos);

    	log("Press <Enter> to exit");
		try {
			System.in.read();
		} catch (IOException e) {
			//do nothing
		}

		client.disconnect();
		log("Disconnected");
    }
    
    /**
     * メッセージを受信したときに呼ばれるCallback。SkyOnDemandではスクリプトにメッセージを渡す処理を行う。
     */
    @Override
	public void messageArrived(String topic, MqttMessage message) throws MqttException {
		String time = new Timestamp(System.currentTimeMillis()).toString();
		log(time + "\tMessage: " + new String(message.getPayload()));
	}

    /**
     * ブローカーとの接続が失われた時に呼ばれるCallback。本来は再接続のロジックを入れる。
     */
    @Override
	public void connectionLost(Throwable cause) {
		log("Connection lost");
		System.exit(1);
	}

    /**
     * メッセージの送信が完了したときに呼ばれるCallback。
     */
    @Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		//will not be called in this demo
		log("Delivery complete");
	}
	
	private static void log(String message) {
		System.out.println(message);
	}
}

実行してみる

では実行してみましょう。
最初にEC2上で mosquitto と打ってMosquittoを起動します。

つぎにSubscribeクライアントを実行します。
Mosquitto側には次のように表示され、接続されたことが確認できます。

1438144597: New connection from xx.xx.xx.xx on port 1883.
1438144597: New client connected from xx.xx.xx.xx as Java Subscribe Client (c0, k60).

次にPublishクライアントを実行します。
Mosquitto側には次のように表示され、接続され、メッセージが投稿後に切断されたことが分かります。

1438144733: New connection from xx.xx.xx.xx on port 1883.
1438144733: New client connected from xx.xx.xx.xx as Java Publish Client (c1, k60).
1438144733: Client Java Publish Client disconnected.

このときSubscribeクライアントのコンソールを見てみると、ちゃんとメッセージを受信していることがわかります。

Initializing
Connecting to broker: tcp://xx.xx.xx.xx:1883
Connected
Subscribing to topic
Press <Enter> to exit
2015-07-29 13:38:53.149	Message: こんにちは、SkyOnDemand

応用編

今回のケースではQoSを2に設定していますので、かならず1回配信されることが保証されます。
ためしにSubscribeクライアントを強制終了した状態で、Publishクライアントを起動しメッセージを送信します。再度Subscribe側を起動するとMosquittoに蓄積されていたメッセージがちゃんとSubscribeクライアントで受信できます。

最後に

いかがでしたでしょうか。REST on HTTPSによるIoTデータの収集とは違った面白さを感じていただけたのなら幸いです。
もしSkyOnDemandでのMQTTに興味がありましたら、是非お問い合わせください。

2 件
     
  • banner
  • banner

関連する記事