2019.11.18
こんにちは、R&Mグループの内田です。
最近、IoT向けといわれているプロトコル「MQTT」を使い始めてみました。
様々なモノとモノがつながるIoTでは各デバイス・サーバの間でデータの受け渡しが発生します。
MQTTは軽量であること、ネットワーク環境が不安定でも動作すること、1対多通信などの特徴があり、IoT向けといわれているようです。
QoS(通信保障),Retain(最終メッセージ保存),will(遺言)など様々な機能が備わっています。
MQTTでは中継サーバである「Broker(ブローカ)」とクライアントである「Publisher(パブリッシャ)」、「Subscriber(サブスクライバ)」という3種類の役割が登場します。
主なメッセージ配信の流れは
といった形です。

さて、タイトルにあるPahoについてです。
※今回MQTTの各環境立ち上げについては割愛します。気になった方はネットで検索などしてみてください。
Pahoは、MQTTのクライアント(Publisher/Subscriber)を実装するためのライブラリです。
対応言語は複数あるようですがPythonを使いました。
試しに以下のスクリプトを作ってみます。
Publisher:topic名="Topic1"に "test1"というメッセージを送信する
Subscriber :topic名="Topic1"のメッセージを受け取る
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt # ライブラリのimport
# MQTT Broker
MQTT_HOST = "MQTT_broker" # brokerのアドレス
MQTT_PORT = 1883 # brokerのport
MQTT_KEEP_ALIVE = 60 # keep alive
# broker接続時
def on_connect(mqttc, obj, flags, rc):
print("rc: " + str(rc)) # 接続結果表示
mqttc = mqtt.Client() #clientオブジェクト作成
mqttc.on_connect = on_connect # 接続時に実行するコールバック関数設定
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE) # MQTT broker接続
mqttc.loop_start() # 処理開始
mqttc.publish("topic1", "test1") # topic名="Topic1"に "test1"というメッセージを送信
import paho.mqtt.client as mqtt
# MQTT Broker
MQTT_HOST = "MQTT_broker" # brokerのアドレス
MQTT_PORT = 1883 # brokerのport
MQTT_KEEP_ALIVE = 60 # keep alive
# broker接続時
def on_connect(mqttc, obj, flags, rc):
print("rc: " + str(rc))
#メッセージ受信時
def on_message(mqttc, obj, msg):
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
mqttc = mqtt.Client()
mqttc.on_message = on_message # メッセージ受信時に実行するコールバック関数設定
mqttc.on_connect = on_connect
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)
mqttc.subscribe("topic1") # Topic名:"topic1"を購読
mqttc.loop_forever() # 永久ループ
実行結果
# python pub_test1.py
rc: 0
# python sub_test1.py
rc: 0
topic1 0 b'test1'
無事メッセージを受け取ることができました。
次に、受け取ったメッセージ次第でメインのループの処理を変えたいなと思ったのですが
on_message側から、mainループに値を返す or 共有する方法はないのか調べてみました。
以下サイトを読み解いてみると
コールバック関数は別スレッドで動いて非同期なので
on_messageのコールバック関数で変数をglobal宣言をして値を共有するのが正解みたいです。
Subscriberを改造してみます。
Publisher:topic名="Topic1"に "test1"というメッセージを送信する
Subscriber :topic名="Topic1"のメッセージを受け取ったらフラグ有効化し、ループを抜ける
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
# MQTT Broker
MQTT_HOST = "MQTT_broker" # brokerのアドレス
MQTT_PORT = 1883 # brokerのport
MQTT_KEEP_ALIVE = 60 # keep alive
# broker接続時
def on_connect(mqttc, obj, flags, rc):
print("rc: " + str(rc))
#メッセージ受信時
def on_message(mqttc, obj, msg):
global on_message_Flag # メインループと共有するグローバル変数
print(msg.topic + " " + str(msg.qos) + " " + str(msg.payload))
on_message_Flag=True # flagを有効化
mqttc = mqtt.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEP_ALIVE)
mqttc.subscribe("topic1")
mqttc.loop_start()
on_message_Flag=False # 共有するグローバル変数
i=0
while True:
print(i)
i+=1
time.sleep(1)
if on_message_Flag: # フラグがTrueになるとループEND
print("END!")
break
実行結果
# python pub_test1.py
rc: 0
# python sub_test2.py
0
rc: 0
1
2
3
4
5
topic1 0 b'test1'
END!
メッセージを受け取ったのち無事ループを抜けました。
いろいろインターネットで検索をかけたり、公式のexamplesを参考にすると、もっと色々なことができそうですね!
サンビット株式会社では、開発技術者を募集しています!
興味のある方はぜひお問い合わせください。
2026.01.26
10年使った炊飯器が大きな故障はないものの、水漏れや炊き上がりの変化などから寿命を意識し、買い替えを検討。ネット検索では情報過多に戸惑い、家電量販店で最新事情を確認する流れを選びました。メーカーは保温性能と使い慣れた考え方を重視し、象印マホービンを継続候補に。価格帯や世代差、型落ち上位モデルの魅力を整理し、価格推移を見ながら最適な購入タイミングを探っています。
2026.01.23
ダッシュボードの目標管理を、もっと効率的に行いたい方におすすめの記事です。Salesforceの「動的ゲージグラフ」は、ユーザーやレコードの項目を目標値として参照できるため、複数のダッシュボードで同じ目標を使っている場合でも、元データを一か所更新するだけで反映される点が特長です。具体的な仕組みから具体的な作成手順、活用時のポイントまでを分かりやすく解説しています。KPI管理や目標達成率の可視化に役立つ内容となっております。
2026.01.19
疲れたときは「寝る」だけで十分だと思っていませんか?本記事では、科学的な視点から休養を7つのタイプに分類し、運動や人との交流、環境の変化など、行動することで回復につながる方法を紹介しています。自分に合った休養の取り方を知り、日々の疲労と上手に付き合うためのヒントをお届けします。
2026.01.14
2025年10月に開催されたさがねんりんピック2025 ラージボール卓球交流大会に混合ダブルスA級で参加した体験を振り返ります。予選リーグを2位で通過し、決勝トーナメントでは準決勝まで進出。苦手とするカットマンペアとの対戦を通じて、戦術理解や練習の重要性を実感しました。結果だけでなく、試合を通して感じた反省や今後の反省、健康と向き合いながら競技を続けていく思いについても触れています。