ブログに書くつもりじゃなかった

フリーのプログラマーが綴る、裏チラ系の備忘録や雑記帳。

jackson-dataformat-xmlを使ってXMLをデシリアライズ

やりたいこと

前回ダウンロードした都道府県指定文化財データを全件ElasticSearchに登録したい。 その中で、jackson-dataformat-xmlを使ってXMLをJavaBeansにデシリアライズする。

準備

pom.xmlに以下を追記する。

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.13.3</version>
</dependency>

公式ドキュメント

https://github.com/FasterXML/jackson-dataformat-xml

Bean

  • @JsonIgnoreProperties(ignoreUnknown = true)で、XMLマッピングする際に、Bean側に対応するフィールドがなくてもエラーを無視できる。今回は自分が使用しないデータは読み捨てたかったので指定。
  • @Datalombokアノテーション
@Data
@JacksonXmlRootElement(localName = "Dataset")
@JsonIgnoreProperties(ignoreUnknown = true)
public class GisData {

    @JacksonXmlProperty(localName = "Point")
    @JacksonXmlElementWrapper(useWrapping = false)
    private List<Point> points;

    @JacksonXmlProperty(localName = "CulturalProperty")
    @JacksonXmlElementWrapper(useWrapping = false)
    private List<CulturalProperty> culturalProperties;

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Point {
        @JacksonXmlProperty(isAttribute = true)
        private String id;
        @JacksonXmlProperty
        private String pos;        
    }
        
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class CulturalProperty {
        @JacksonXmlProperty(localName = "position")
        private PositionHref positionHref;     
        @JacksonXmlProperty
        private String prefectureCode;
        @JacksonXmlProperty
        private String administrativeAreaCode;
        @JacksonXmlProperty
        private String largeClassificationCode;
        @JacksonXmlProperty
        private String smallClassificationCode;
        @JacksonXmlProperty
        private String culturalPropertyName;
        @JacksonXmlProperty
        private String address;
        @JacksonXmlProperty
        private Long specifiedDate;
        @JacksonXmlProperty
        private String pointClassificationCode;
    }

    @Data
    @JsonIgnoreProperties(ignoreUnknown=true)
    public static class PositionHref {
        @JacksonXmlProperty(isAttribute = true)
        private String href;
    }

}

シリアライズするコード

ObjectMapper xmlMapper = new XmlMapper();
GisData gisData = xmlMapper.readValue(new File(INPUT_FILE), GisData.class);

Beanの作成で分からなくなったら

いきなりBeanにマッピングしようとして期待する結果が得られず、しばらく悪戦苦闘した。そんな時は一旦readTree()JsonNodeにデシリアライズ、さらにJsonNodewriteValue()シリアライズして中身を確認すると、一気に理解が進んだ。

ObjectMapper xmlMapper = new XmlMapper();
JsonNode jsonNode = xmlMapper.readTree(new File(INPUT_FILE));

objectMapper.writeValue(new File(OUTPUT_XML_FILE), jsonNode);

ElasticSearchで位置情報を検索する

現在位置の近くにあるコンビニを探すとか、そういうのを試したくなったのでやってみる。

 

ElasticSearchの導入

お手軽に試したいだけなのでdockerを使用する。

$ docker pull elasticsearch:8.3.2

dockerイメージのelasticsearch.ymlTLSが有効になっていたが、お試しで使用するには煩わしい。-e "xpack.security.enabled=false"を付けてセキュリティ機能を無効にしてコンテナを起動する。

$ docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "xpack.security.enabled=false"elasticsearch:8.3.2

curlで死活確認。

$ curl -i http://localhost:9200/
HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 507

{
 "name" : "cba32a558fa6",
 "cluster_name" : "docker-cluster",
 "cluster_uuid" : "yjJCYYaoSTKkwO_6coZdDQ",
 "version" : {
   "number" : "8.3.2",
   "build_type" : "docker",
   "build_hash" : "8b0b1f23fbebecc3c88e4464319dea8989f374fd",
   "build_date" : "2022-07-06T15:15:15.901688194Z",
   "build_snapshot" : false,
   "lucene_version" : "9.2.0",
   "minimum_wire_compatibility_version" : "7.17.0",
   "minimum_index_compatibility_version" : "7.0.0"
},
 "tagline" : "You Know, for Search"
}

 

位置情報を登録する

登録するデータ

せっかくなので、ある程度まとまった件数の面白そうなデータが欲しい。なんとなく検索して見つかったのがこれ。国土交通省が提供している都道府県指定文化財データ

何やら説明文が難しいが、ダウンロードして中身を確認してみると至ってシンプルなxmlだった。これならjsonに変換してElasticSearchに登録するのも簡単そう。ElasticSearchの公式ドキュメントgeo_pointの記述方法を確認、登録するjsonデータを以下の形にする。

{
    "location": [140.714494, 41.756023],
    "prefectureCode": "01",
    "administrativeAreaCode": "01202",
    "largeClassificationCode": "1",
    "smallClassificationCode": "11",
    "culturalPropertyName": "樽岸出土の石器",
    "address": "函館市柳町17-1",
    "specifiedDate": 19571220,
    "pointClassificationCode": "1"
}

geo_pointを使用するにはマッピングを明示する必要があるらしいので、インデックスを登録する際に指定する。他のフィールドはテキトーに。

$ curl -i -XPUT -H "content-type: application/json" http://localhost:9200/property-01/ -d '
{
  "mappings": {
    "properties": {
      "location": {"type": "geo_point"},
      "prefectureCode": {"type": "keyword"},
      "administrativeAreaCode": {"type": "keyword"},  
      "largeClassificationCode": {"type": "keyword"},  
      "smallClassificationCode": {"type": "keyword"},
      "culturalPropertyName": {"type": "text"},
      "address": {"type": "text"},
      "specifiedDate": {"type": "long"},
      "pointClassificationCode": {"type": "keyword"}
    }
  }
}'

インデックスができたら、とりあえずドキュメントを1件登録。

$ curl -i -XPOST -H "content-type: application/json" http://localhost:9200/property-01/_doc/ -d '
{
  "location": [140.714494, 41.756023],
  "prefectureCode": "01",
  "administrativeAreaCode": "01202",
  "largeClassificationCode": "1",
  "smallClassificationCode": "11",
  "culturalPropertyName": "樽岸出土の石器",
  "address": "函館市柳町17-1",
  "specifiedDate": 19571220,
  "pointClassificationCode": "1"
}'

 

位置情報を検索する

さあ確認してみよう。件の場所をGoogle Mapで検索してみると、函館公園の一角にあると判明。そしてJR函館駅からの直線距離は、パッと見で1km以上3km未満といったところだ。この条件で検索してみる。

まずは函館駅から半径1km以内で検索。

$ curl -i -H "content-type: application/json" http://localhost:9200/*/_search?pretty -d '
{
  "query": {
    "geo_distance": {
      "distance": "1km",
      "location": {
        "lat": 41.773989,
        "lon": 140.726442
      }
    }
  }
}'

ヒットしない。

HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 262

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  }
}

条件を半径3kmに変更して検索。

$ curl -i -H "content-type: application/json" http://localhost:9200/*/_search?pretty -d '
{
"query": {
  "geo_distance": {
    "distance": "3km",
    "location": {
      "lat": 41.773989,
      "lon": 140.726442
    }
  }
}
}'

ヒットした。とりあえず上手くいったようだ。

HTTP/1.1 200 OK
X-elastic-product: Elasticsearch
content-type: application/json
content-length: 850

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "property-01",
        "_id" : "9e1AKYIBvFRNCcdg2ydQ",
        "_score" : 1.0,
        "_source" : {
           "location" : [
             140.714494,
             41.756023
           ],
           "prefectureCode" : "01",
           "administrativeAreaCode" : "01202",
           "largeClassificationCode" : "1",
           "smallClassificationCode" : "11",
           "culturalPropertyName" : "樽岸出土の石器",
           "address" : "函館市柳町17-1",
           "specifiedDate" : 19571220,
           "pointClassificationCode" : "1"
         }
       }
     ]
  }
}

 

今後の予定

ダウンロードしたデータを全部突っ込んで遊んでみよう。kibanaで可視化するのも良いかな。

 

Dockerコンテナ内のJavaプログラムのシャットダウン処理

やりたいこと

Dockerコンテナ上で動作するJavaプログラムをdocker stopで停止した時に、リソースの解放やその他の後始末に関する処理が実行されるようにしたい。

Javaプログラムにシグナル受信時の処理を追加する

最初にdocker stopを実行したときの動作を確認しておこう。Dockerのコマンドライン・リファレンスには、以下のように記載されている。

コンテナ内のメイン・プロセスがSIGTERMを受信し、一定期間の経過後、 SIGKILLを送信します。

ということは、Javaプログラム側はRuntime#addShutdownHookを使ってシャットダウンフックを追加し、シグナルを受信した時に呼び出されるようにすれば良さそう。早速サンプルを作って試してみよう。

public class Sample {
    public static void main(String[] args) throws Exception {

        Runtime.getRuntime().addShutdownHook(new Thread(
            () -> System.out.println("called ShutdownHook.")
        ));

        System.out.println("executing...");

        try {
            Thread.sleep(60000);      // 何もせず1分間待機
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.exit(0);
    }
}

上記プログラムをコンパイルして単体で実行してみる。実行後、一定時間経過したらCtrl + cを押す。

$ java Sample
executing...
^Ccalled ShutdownHook.

今度はSIGTERMを送ってみる。再度プログラムを実行する。

$ java Sample
executing...

別ウィンドウからkillコマンドを実行する。

$ kill -15 [PID]

シグナルを受信したJavaプログラムが、以下を出力して終了することを確認。

called ShutdownHook.

Dockerコンテナに入れて実行

Dockerfileを準備する。

FROM openjdk:17
COPY ./Sample.class .
CMD ["java", "Sample"]

イメージをビルド。

$ docker build -t shutdown-sample .

実行してみる。一定時間が経過したらCtrl + cを押して停止。

$ docker run -it --rm --name shutdown-sample shutdown-sample
executing...
^Ccalled ShutdownHook.

今度は-dオプションを付けてデタッチドモードで実行する。(コンテナ停止後にdocker logsを実行したいので、先ほど付けていた--rmオプションは外しておく。)

$ docker run -it --name shutdown-sample -d shutdown-sample

docker stopを実行する。

$ docker stop shutdown-sample
shutdown-sample

docker logsでログを確認。

$ docker logs shutdown-sample
executing...
called ShutdownHook.

ここまでは期待通り動いてくれた。

起動用スクリプトを追加してハマる

本番では起動用スクリプトからJavaプログラムを実行する。まあ結果は同じだろうけど、念のため確認しておこうか?と軽い気持ちで作業したら、これが結構ハマった。

startup.shという起動用スクリプトを用意する。

#!/bin/bash
java Sample

Dockerfileに追加。

FROM openjdk:17
COPY ./Sample.class .
COPY ./startup.sh .
RUN chmod +x startup.sh
CMD ["./startup.sh"]

ビルドして実行。一定時間経過後にCtrl + cを押す。

$ docker run -it --rm --name shutdown-sample shutdown-sample
executing...
^Ccalled ShutdownHook.

ここまではOK。次は-dを付けてデタッチドモードで実行する。

$ docker run -it --name shutdown-sample -d shutdown-sample

docker stopを実行する。

$ docker stop shutdown-sample

10秒ほど時間がかかってコンテナが終了。これはなんだか怪しい挙動だ。docker logsを覗いてみる。

$ docker logs shutdown-sample
executing...

やはりSIGTERMを受け付けていないようだ。終了するのに10秒ほどかかったのは、最終的にSIGKILLが受信されて終了したいうことか。

原因と解決策

リファレンスをもう一度読むと、docker stopSIGTERMを送信する対象はメインプロセスに対して、と記載されている。メインプロセスとは何か?それはPIDが1で実行されるプロセスとのことだ。 PID 1で実行しているのは?それはDockerfileCMDに記載しているbin/bash startup.shだ。その中でjava Sampleを実行したら?それは別プロセスになるだろう!そりゃSIGTERMが受信できる訳ないよ。何とも間抜けだな。

execを付けて同じプロセスでjavaを実行するようstartup.shを修正する。

#!/bin/bash
exec java Sample

これでイメージを作り直して再実行。今度は期待通りの動作になった。一件落着。

nginxでMQTTを負荷分散する

はじめに

nginxを使ってMQTTを負荷分散できることを確認したい。NginxとMosquittoはDockerコンテナを使用する。また、メッセージ送信確認に使用するmosquitto-clientはホスト側にインストール済み。

事前準備

Docker Hubにあるnginxの公式イメージが使用可能か確認する。コンテナを起動してコンパイルフラグを確認する。

root@a7ef8471144d:/# nginx -V 2>&1 | grep -oP '\-\-with-[a-z_]+'
--with-compat
--with-file
--with-threads
--with-http_addition_module
--with-http_auth_request_module
--with-http_dav_module
--with-http_flv_module
--with-http_gunzip_module
--with-http_gzip_static_module
--with-http_mp
--wit-http_random_index_module
--with-http_realip_module
--with-http_secure_link_module
--with-http_slice_module
--with-http_ssl_module
--with-http_stub_status_module
--with-http_sub_module
--with-http_v
--with-mail
--with-mail_ssl_module
--with-stream
--with-stream_realip_module
--with-stream_ssl_module
--with-stream_ssl_preread_module
--with-cc
--with-ld

--with-streamを確認できたので使えるみたいだ。

環境構築

docker-compose.yml
version: '3'
services:

  broker1:
    image: eclipse-mosquitto
    container_name: broker1
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

  broker2:
    image: eclipse-mosquitto
    container_name: broker2
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

  nginx:
    image: nginx
    container_name: nginx
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    ports:
      - 1883:1883
mosquitto.conf
allow_anonymous true
listener 1883
nginx.conf
user  nginx;
worker_processes  auto;

error_log  /var/log/nginx/error.log notice;
pid        /var/run/nginx.pid;


events {
    worker_connections  1024;
}

stream {
    upstream mqtt_brokers {
        server broker1:1883;
        server broker2:1883;
    }
    server {
        listen 1883;
        proxy_pass mqtt_brokers;
    }
}

動作確認その1

2つのブローカーそれぞれから自分に接続してトピックをサブスクライブ。

$ docker exec -it broker1 /bin/sh
/ # mosquitto_sub -t topic1
$ docker exec -it broker2 /bin/sh
/ # mosquitto_sub -t topic1

ホストからメッセージをパブリッシュする。

$ mosquitto_pub -t topic1 -m "It is $(date) now."

TCPコネクションの単位で負荷分散されて、ラウンドロビンでbroker1とbroker2に交互にメッセージが届くことを確認。ちなみにCONNECTしてPUBLISHしてDISCONNECTまでの一連のMQTTパケットは、HTTPのKeep-Aliveみたいに1つのTCPコネクションの中で送信されていた。MQTTパケット毎に別々のブローカーに届いたら予期しない動作をするのでは?という事は杞憂に過ぎない。はずである。そう願いたい。

動作確認その2

ブローカーが止まった時の動作も見てみよう。最後にbroker1でメッセージを受け取ったあと、broker2を落とす。

$ docker stop broker2

クライアントからメッセージをパブリッシュする。

$ mosquitto_pub -t topic1 -m "It is $(date) now."

20秒弱待ったあとでbroker1の方で受信できた。tcpdumpで通信内容を見るとこんな動きをしていた。

  • broker2に対して接続を実行。
  • 約1秒後にbroker2に対して接続リトライ(1回目)を実行。
  • 約2秒後にbroker2に対して接続リトライ(2回目)を実行。
  • 約4秒後にbroker2に対して接続リトライ(3回目)を実行。
  • 約12秒後にbroker1に対して接続リトライ(4回目)を実行。

この辺りのリトライの上限や間隔は設定値で変えられるはず。今後の調査課題としよう。また、この状態でしばらく放置したが、ヘルスチェックみたいな動きはなかった。ドキュメントを見る限り、この辺は別モジュールの機能っぽいな。

broker2を復活させる。

$ docker start broker2

broker2自身に接続してトピックをサブスクライブ。

$ docker exec -it broker2 /bin/sh
/ # mosquitto_sub -t topic1

ホストからnginxに向けてメッセージをパブリッシュする。3回目の実行でbroker2の方で受信できた。

とりあえず今回はここまでか。

Azure IoT Hub X.509証明書によるデバイス認証

やりたいこと

前回の続き。今度はAzure IoT HubでX.509証明書を使ってデバイスを認証したい。

Azure IoT Hub側

バイスは登録後に認証方法を変えられないみたいなので、新しいデバイス登録を登録する。認証の種類は「X.509 CA署名済み」を選択。

次にCA証明書を登録する。前々回作ったオレオレ認証局の鍵と証明書を使い回そう。が、拡張子チェックがあるので.crtは通らない。

ここは.pemに直して登録しよう。ついでに「アップロード時に証明書の状態を確認済みに設定する」をチェックして、アップロード後の証明書の確認手順をスキップ。

クライアント側

keytoolコマンドでデバイス秘密鍵自己署名証明書を生成し、新しいキーストアを作成する。

$ keytool -keystore mykeystore -genkey -keyalg RSA -keysize 2048 -alias client
キーストアのパスワードを入力してください:  
新規パスワードを再入力してください: 
姓名は何ですか。
  [Unknown]:  device0002
組織単位名は何ですか。
  [Unknown]:  
組織名は何ですか。
  [Unknown]:  
都市名または地域名は何ですか。
  [Unknown]:  Sendai
都道府県名または州名は何ですか。
  [Unknown]:  Miyagi
この単位に該当する2文字の国コードは何ですか。
  [Unknown]:  JP
CN=device0002, OU=Unknown, O=Unknown, L=Sendai, ST=Miyagi, C=JPでよろしいですか。
  [いいえ]:  y

90日間有効な2,048ビットのRSAのキー・ペアと自己署名型証明書(SHA256withRSA)を生成しています
    ディレクトリ名: CN=device0002, OU=Unknown, O=Unknown, L=Sendai, ST=Miyagi, C=JP

続いて証明書署名要求を作成する。

$ keytool -keystore mykeystore -certreq -alias client -keyalg RSA -file client.csr
キーストアのパスワードを入力してください:

証明書署名要求にopensslコマンドで署名する。

$ openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -sha256
Signature ok
subject=/C=JP/ST=Miyagi/L=Sendai/O=Unknown/OU=Unknown/CN=device0002
Getting CA Private Key
Enter pass phrase for ca.key:

署名された証明書をキーストアにインポートする。前もってCAの証明書をインポートしないとkeytoolエラー: java.lang.Exception: 応答から連鎖を確立できませんでしたが出るよ。

$ keytool -import -keystore mykeystore -file ca.crt -alias rootCA
キーストアのパスワードを入力してください:  
所有者: CN=rootCA
発行者: CN=rootCA
シリアル番号: fad999777ea4f00a
有効期間の開始日: Fri Apr 01 14:01:33 JST 2022終了日: Fri Apr 08 14:01:33 JST 2022
証明書のフィンガプリント:
     SHA1: AC:34:65:2F:17:15:F2:A4:6A:96:05:31:B8:B6:6D:20:A6:6F:95:5B
     SHA256: AD:C6:DE:C2:F4:8E:1C:83:8D:92:EB:49:EE:1A:12:D2:87:8A:20:A3:6D:E1:A7:6E:07:CD:E6:F7:F1:A5:58:D1
署名アルゴリズム名: SHA256withRSA
サブジェクト公開キー・アルゴリズム: 2048ビットRSAキー
バージョン: 1
この証明書を信頼しますか。 [いいえ]:  y
証明書がキーストアに追加されました

CAの証明書に続いて自身の証明書をキーストアにインポートする。

$ keytool -import -keystore mykeystore -file client.crt -alias client
キーストアのパスワードを入力してください:  
証明書応答がキーストアにインストールされました

実行

前回のクライアントプログラムのデバイスIDを変更して、パスワード(SASトークン)はコメントアウトする。最後に以下のシステムプロパティを付けて実行。

-Djavax.net.ssl.keyStore=mykeystore -Djavax.net.ssl.keyStorePassword=topsecret

上手く動いたようだ。

Published message.
Delivered message.

ついでに確認

  • 証明書で認証をする場合は、MQTT ConnectパケットのクライアントIDと証明書のCommon Nameが一致するかまでチェックするみたい。デバイスIDを変えて実行するとエラーになったよ。
  • 証明書で認証するデバイスは、共有アクセスポリシーのキー(以下)を使ってSASトークン認証することはできない。

Azure IoT Hubにjava+MQTTでテレメトリを送信

Azure側

まずは無料アカウントを作る

自動で課金されることはないって言うけど、クレジットカード番号を要求されるのは心理的ハードルが高いよなぁ...と長いこと保留にしていたが、本業に差し障るので覚悟を決める。

IoT Hubを作成する

ポータルからサクッと作る。

  • IoT Hub名はグローバルでユニークにしないといけないみたい。
  • 領域は自分に近い場所として「East Asia」を選択する。「Japan East」が選択肢にあることにはデプロイしてから気づいた。
  • 学習用なので、価格とスケールティアは「F1: Freeレベル」で良いはずだ。
IoT Hubにデバイスを登録する

こちらもポータルからサクッと作る。

  • バイスIDを適当に入れる。
  • その他の項目は画面の通りにする。

クライアント側

Azure SDKを使えば手っ取り早いのだけど、MQTTも勉強しなくちゃいけないので、Eclipse Paho Java Clientを使って作る。

pom.xml

以下をdependenciesに追加。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
 </dependency>
App.java

SHARED_ACCESS_KEYは自動生成されたデバイスの主キーを指定。ポータルで登録したデバイスを参照すると取得できる。

package com.example.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class App implements MqttCallback {

    private final static String IOT_HUB_NAME = "your-own-iot-hub.azure-devices.net";
    private final static String DEVICE_ID = "device0001";
    private final static String SHARED_ACCESS_KEY = "c2hhcmVkX2FjY2Vzc19rZXk=";
    
    public static void main(String[] args) {
        try {
            new App().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws MqttException, InterruptedException {

        final String endpoint = String.format("ssl://%s:8883", IOT_HUB_NAME);
        final MqttClient client = new MqttClient(endpoint, DEVICE_ID);
        final MqttConnectOptions connOpts = new MqttConnectOptions();
    
        final String username = String.format("%s/%s/?api-version=2020-09-30", IOT_HUB_NAME, DEVICE_ID);
        final long expiryTime = (System.currentTimeMillis() / 1000L) + 30;        
        final String sasToken = SASToken.getSASToken(String.format("%s/devices/%s", IOT_HUB_NAME, DEVICE_ID) , SHARED_ACCESS_KEY, expiryTime);
        connOpts.setUserName(username);
        connOpts.setPassword(sasToken.toCharArray());

        client.connect(connOpts);
        client.setCallback(this);
    
        final MqttMessage message = new MqttMessage("{\"test\":\"value\"}".getBytes());
    
        client.publish("devices/" + DEVICE_ID + "/messages/events/" , message);
        System.out.println("Published message.");
    
        Thread.sleep(1000L);
    
        client.disconnect();
        
        System.exit(0);
    }
    
    public void connectionLost(Throwable cause) {
        System.out.println("Lost connection.");
    }
    
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Received message from topic " + topic + ": " + message);
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Delivered message.");
    }
}
SASToken.java

Azure IoT Hubのドキュメントに載ってたサンプルを流用。この選択が後に悲劇を生むことになる...

package com.example.mqtt;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Base64.Encoder;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SASToken {

    public static String getSASToken(String resourceUri, String key, long expiryTime) {

        String expiry = Long.toString(expiryTime);

        String sasToken = null;
        try {
//         String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
            String stringToSign = resourceUri + "\n" + expiry;
            String signature = getHMAC256(key, stringToSign);
//         sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig="
            sasToken = "SharedAccessSignature sr=" + resourceUri + "&sig="
                    + URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return sasToken;
    }

    private static String getHMAC256(String key, String input) {
        Mac sha256_HMAC = null;
        String hash = null;
        try {
            sha256_HMAC = Mac.getInstance("HmacSHA256");
//         SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
            byte[] decodedDeviceKey = Base64.getDecoder().decode(key.getBytes());
            SecretKeySpec secret_key = new SecretKeySpec(decodedDeviceKey, "HmacSHA256");
            sha256_HMAC.init(secret_key);
            Encoder encoder = Base64.getEncoder();

            hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

        } catch (Exception e) {
            e.printStackTrace();
        }

        return hash;
    }
}

動作確認

プログラムを実行する。上手くいったようだが出力結果が地味すぎる。

Published message.
Delivered message.

ポータルで受け取ったメッセージを簡単に見れないの?とあちこち探したが、見つけられず。この辺はあまり興味もないし、次回の課題にしよう。とりあえずダッシュボードのグラフにメッセージがカウントされたので、良しとするか。

ハマったポイント

SASトークンの仕様と生成サンプルについては、こちらの公式ドキュメントを参考にした。

docs.microsoft.com

「リソースURIをURLエンコードした文字列」を得るのにサンプルではURLEncoder.encode()を使っているが、これで「/」を「%2F」にエンコードするのがNGだったようだ。とりあえずURLエンコードが必要な文字は他にないので、暫定策として処理を省いて上手く動いた。原因を突き止めるのに、結局SDKを使ったクライアントを作って動かす羽目になったよ。やれやれ。

MosquittoでTLS通信

はじめに

昨日作ったMQTTのお試し環境TLS通信できるようにする。

DockerとEclipse MosquittoでMQTT - ブログに書くつもりじゃなかった

証明書の作成

この辺はググればいくらでも出てくる情報だが、俺自信が覚えるために。

認証局秘密鍵を作成
$ openssl genrsa -des3 -out ca.key 2048
Generating RSA private key, 2048 bit long modulus
.......+++
....+++
e is 65537 (0x10001)
Enter pass phrase for ca.key:
Verifying - Enter pass phrase for ca.key:
自己署名証明書を作成
$ openssl req -new -x509 -days 1826 -key ca.key -out ca.crt
Enter pass phrase for ca.key:
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) []:
State or Province Name (full name) []:
Locality Name (eg, city) []:
Organization Name (eg, company) []:
Organizational Unit Name (eg, section) []:
Common Name (eg, fully qualified host name) []:root-ca        
Email Address []:
サーバーの秘密鍵を作成
$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus
...............................+++
...............+++
e is 65537 (0x10001)
サーバーの証明書要求を作成
$ openssl req -new -out server.csr -key server.key
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) []:JP
State or Province Name (full name) []:Miyagi
Locality Name (eg, city) []:Sendai
Organization Name (eg, company) []:
Organizational Unit Name (eg, section) []:
Common Name (eg, fully qualified host name) []:mosquitto-broker
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:

Common Nameはホスト名に合わせる。

証明書要求に署名する
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 360 -sha256
Signature ok
subject=/C=JP/ST=Miyagi/L=Sendai/CN=mosquitto-broker
Getting CA Private Key
Enter pass phrase for ca.key:

後述するけど、自分の環境では-sha256オプションが必要だった。

環境構築

mosquitto.conf

リッスンするポート番号を8883に変更。証明書と秘密鍵のパスを指定。

allow_anonymous true
listener 8883
cafile /mosquitto/certs/ca.crt
keyfile /mosquitto/certs/server.key
certfile /mosquitto/certs/server.crt
docker-compose.yml

証明書と秘密鍵のファイルをバインドする。

version: '3'
services:

  mosquitto-publisher:
    build: .
    container_name: mosquitto-publisher
    tty: true
    volumes:
      - ./ca.crt:/ca.crt

  mosquitto-subscriber:
    build: .
    container_name: mosquitto-subscriber
    tty: true
    volumes:
      - ./ca.crt:/ca.crt

  mosquitto-broker:
    image: eclipse-mosquitto
    container_name: mosquitto-broker
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./ca.crt:/mosquitto/certs/ca.crt
      - ./server.key:/mosquitto/certs/server.key
      - ./server.crt:/mosquitto/certs/server.crt

実践

Subscriber側。--cafileオプションでCA証明書を指定する。

$ mosquitto_sub -h mosquitto-broker -t mytopic --cafile /ca.crt

Publisher側。同じく--cafileオプションでCA証明書を指定する。

$ mosquitto_pub -h mosquitto-broker -t mytopic -m "This is a test." --cafile /ca.crt

つまずいた箇所

当初mosquitto_submosquitto_pubを実行したときにTLSエラーが出た。

Error: A TLS error occurred.

証明書を作り直す、dockerコンテナの設定を変えるなどして何度も試したが、なかなか解決せず。結構な時間を使った後で、証明書の中身を覗くと、署名アルゴリズムがまさかのSHA1になってたのを発見。

$ openssl x509 -text -noout -in server.crt

どうやらこれが原因で、サーバー証明書の危険が危ない的なエラーになっていたようだ。証明書要求を認証する際に-sha256オプションを付けることで、無事解消。やれやれ。