当前位置:首页 > 行业动态 > 正文

安卓开发之mqtt协议实例代码

环境准备

  1. 添加依赖库
    使用Eclipse Paho MQTT库,需在build.gradle中添加依赖:

    安卓开发之mqtt协议实例代码  第1张

    dependencies {
        implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
        implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    }
  2. 配置AndroidManifest权限
    添加网络权限:

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

核心代码实现

初始化MQTT客户端

// MQTT客户端配置
String serverUri = "tcp://broker.hivemq.com:1883"; // 公共测试服务器
String clientId = MqttClient.generateClientId(); // 自动生成客户端ID
MemoryPersistence persistence = new MemoryPersistence(); // 非持久化存储
// 创建MQTT客户端
MqttClient client = new MqttClient(serverUri, clientId, persistence);

设置连接选项

参数 说明 示例值
username MQTT服务器用户名(可选) "user"
password MQTT服务器密码(可选) "password"
cleanSession 是否清除会话 true
connectionTimeout 连接超时时间(毫秒) 10
keepAliveInterval 心跳间隔(秒) 60
automaticReconnect 是否自动重连 true
qos 消息质量等级(默认0) 1
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(10);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);

连接服务器并订阅主题

client.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        Log.e("MQTT", "连接断开: " + cause.getMessage());
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        Log.d("MQTT", "收到消息: " + new String(message.getPayload()));
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.d("MQTT", "消息发送完成: " + token.isComplete());
    }
});
try {
    client.connect(options); // 建立连接
    client.subscribe("test/topic", 1); // 订阅主题(QoS=1)
} catch (MqttException e) {
    Log.e("MQTT", "连接失败: " + e.getMessage());
}

发布消息

String payload = "Hello MQTT!";
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1); // 设置QoS等级
try {
    client.publish("test/topic", message); // 发布消息
} catch (MqttException e) {
    Log.e("MQTT", "发布失败: " + e.getMessage());
}

完整流程示例

public class MqttService extends Service {
    private MqttClient client;
    private String serverUri = "tcp://broker.hivemq.com:1883";
    private String clientId = MqttClient.generateClientId();
    @Override
    public void onCreate() {
        super.onCreate();
        initMqtt();
    }
    private void initMqtt() {
        try {
            client = new MqttClient(serverUri, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);
            client.setCallback(new MqttCallback() {
                // 实现回调方法(见上文)
            });
            client.connect(options);
            client.subscribe("test/topic");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void onDestroy() {
        super.onDestroy();
        try {
            client.disconnect(); // 断开连接
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

常见问题与解答

问题1:连接失败如何处理?

解答

  • 检查serverUri是否正确(如协议tcp/ssl,端口号)。
  • 确认网络权限已添加。
  • 若使用用户名/密码,确保options.setUserName()setPassword()正确配置。
  • 查看日志中的具体错误信息(如MqttExceptionreasonCode)。

问题2:如何保证消息可靠传输?

解答

  • 设置QoS等级
    • QoS 0:最多一次传递(不可靠)。
    • QoS 1:至少一次传递(消息可能重复)。
    • QoS 2:仅一次传递(最高可靠性,开销大)。
  • 启用持久会话:设置options.setCleanSession(false),断线重连后可恢复订阅状态。
  • 手动确认消息:在deliveryComplete回调中处理消息确认
0