上一篇
安卓开发之mqtt协议实例代码
- 行业动态
- 2025-04-22
- 2
环境准备
添加依赖库
使用Eclipse Paho MQTT库,需在build.gradle
中添加依赖:dependencies { implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' }
配置AndroidManifest权限
添加网络权限:<uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.WAKE_LOCK" />
核心代码实现
初始化MQTT客户端
// MQTT客户端配置 String serverUri = "tcp://broker.hivemq.com:1883"; // 公共测试服务器 String clientId = MqttClient.generateClientId(); // 自动生成客户端ID MemoryPersistence persistence = new MemoryPersistence(); // 非持久化存储 // 创建MQTT客户端 MqttClient client = new MqttClient(serverUri, clientId, persistence);
设置连接选项
参数 | 说明 | 示例值 |
---|---|---|
username |
MQTT服务器用户名(可选) | "user" |
password |
MQTT服务器密码(可选) | "password" |
cleanSession |
是否清除会话 | true |
connectionTimeout |
连接超时时间(毫秒) | 10 |
keepAliveInterval |
心跳间隔(秒) | 60 |
automaticReconnect |
是否自动重连 | true |
qos |
消息质量等级(默认0) | 1 |
MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(60); options.setAutomaticReconnect(true);
连接服务器并订阅主题
client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.e("MQTT", "连接断开: " + cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) { Log.d("MQTT", "收到消息: " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { Log.d("MQTT", "消息发送完成: " + token.isComplete()); } }); try { client.connect(options); // 建立连接 client.subscribe("test/topic", 1); // 订阅主题(QoS=1) } catch (MqttException e) { Log.e("MQTT", "连接失败: " + e.getMessage()); }
发布消息
String payload = "Hello MQTT!"; MqttMessage message = new MqttMessage(payload.getBytes()); message.setQos(1); // 设置QoS等级 try { client.publish("test/topic", message); // 发布消息 } catch (MqttException e) { Log.e("MQTT", "发布失败: " + e.getMessage()); }
完整流程示例
public class MqttService extends Service { private MqttClient client; private String serverUri = "tcp://broker.hivemq.com:1883"; private String clientId = MqttClient.generateClientId(); @Override public void onCreate() { super.onCreate(); initMqtt(); } private void initMqtt() { try { client = new MqttClient(serverUri, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setAutomaticReconnect(true); client.setCallback(new MqttCallback() { // 实现回调方法(见上文) }); client.connect(options); client.subscribe("test/topic"); } catch (MqttException e) { e.printStackTrace(); } } @Override public void onDestroy() { super.onDestroy(); try { client.disconnect(); // 断开连接 } catch (MqttException e) { e.printStackTrace(); } } }
常见问题与解答
问题1:连接失败如何处理?
解答:
- 检查
serverUri
是否正确(如协议tcp
/ssl
,端口号)。 - 确认网络权限已添加。
- 若使用用户名/密码,确保
options.setUserName()
和setPassword()
正确配置。 - 查看日志中的具体错误信息(如
MqttException
的reasonCode
)。
问题2:如何保证消息可靠传输?
解答:
- 设置QoS等级:
QoS 0
:最多一次传递(不可靠)。QoS 1
:至少一次传递(消息可能重复)。QoS 2
:仅一次传递(最高可靠性,开销大)。
- 启用持久会话:设置
options.setCleanSession(false)
,断线重连后可恢复订阅状态。 - 手动确认消息:在
deliveryComplete
回调中处理消息确认