博客
关于我
Mqtt搭建代理服务器进行通信-浅析
阅读量:792 次
发布时间:2023-02-09

本文共 8297 字,大约阅读时间需要 27 分钟。

本文基于Windows系统操作

MQTT简介:

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议!复制代码

MQTT消息的主要特点:

使用(publish/subscribe)消息模式,简称p/s模式,即发布/订阅!提供一对多的发送方式!复制代码
MQTT根据QoS定义的等级来传输消息:
  • level 0:最多一次的传输
消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。复制代码
  • level 1:至少一次的传输
服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。复制代码
  • level 2: 只有一次的传输
在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。QoS level 2在消息头有Message ID。复制代码

接下来开始我们的表演:

下载代理服务器
  • 本文使用mqtt代理服务器是apache下的apollo代理服务器

  • 下载地址:http://www.apache.org/dyn/closer.cgi?path=activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz

创建代理服务器
  • 下载完成然后解压目录

  • 打开dos窗口进入到apache-apollo-1.7.1\bin目录下

  • 执行apollo create testbroker命令创建一个名称为testbroker的代理服务器

  • 下面就是我们创建的代理服务器

启动代理服务器
  • 使用dos进入testbroker目录中的bin目录下

  • 执行apollo-broker run命令启动代理服务器

通过HTTP访问代理服务器

现在我们可以打开浏览器看下我们的代理服务器 输入网址http://127.0.0.1:61680/

  • 用户名密码可到配置文件中查看

  • 进入testbroker目录下的etc目录

  • users.properties中配置的用户名和密码

  • 默认有个用户名为admin,密码为password的用户

  • 我们也可以自己配置用户

  • 现在就用默认用户登陆

OK登陆成功

接下来我们编写Android客户端

  • 首先准备mqtt jar包
  • 不容易呀,mqtt这个jar真难找
  • https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/

一定保证客户端和服务端以及代理服务器所在的电脑在同一网段下
  • 可以在电脑上生成wifi热点,手机客户端连接热点即可
  • 接下来直接贴Android客户端代码
MqttService.java
package com.example.jingwc.mqtt_demo;import android.app.Service;import android.content.Intent;import android.os.Binder;import android.os.IBinder;import android.util.Log;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttService extends Service {    /**     * 代理服务器ip地址     */    public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613";    /**     * 客户端唯一标识     */    public static final String MQTT_CLIENT_ID = "android-jingwc";    /**     * 订阅标识     */    public static final String MQTT_TOPIC = "jingwc";    /**     * 用户名     */    public static final String USERNAME = "admin";    /**     *  密码     */    public static final String PASSWORD = "password";    private MqttClient mqttClient;    public MqttService() {    }    @Override    public IBinder onBind(Intent intent) {        return binder;    }    /**     * 连接mqtt     */    public void connect(){        try {            // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,            // MemoryPersistence设置clientid的保存形式,默认为以内存保存            mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence());            // 配置参数信息            MqttConnectOptions options = new MqttConnectOptions();            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,            // 这里设置为true表示每次连接到服务器都以新的身份连接            options.setCleanSession(true);            // 设置用户名            options.setUserName(USERNAME);            // 设置密码            options.setPassword(PASSWORD.toCharArray());            // 设置超时时间 单位为秒            options.setConnectionTimeout(10);            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制            options.setKeepAliveInterval(20);            // 连接            mqttClient.connect(options);            // 订阅            mqttClient.subscribe(MQTT_TOPIC);            // 设置回调            mqttClient.setCallback(new MqttCallback() {                //连接丢失后,一般在这里面进行重连                @Override                public void connectionLost(Throwable throwable) {                    Log.d("test","connectionLost");                }                //subscribe后得到的消息会执行到这里面                @Override                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {                    Log.d("test","messageArrived"+mqttMessage.toString());                }                //publish后会执行到这里                @Override                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {                    Log.d("test","deliveryComplete");                }            });        } catch (MqttException e) {            e.printStackTrace();        }catch (Exception e) {            e.printStackTrace();        }    }    /**     * 断开连接     */    public void disconnect(){        if(mqttClient != null){            if(mqttClient.isConnected()){                try {                    mqttClient.disconnect();                    mqttClient = null;                } catch (MqttException e) {                    e.printStackTrace();                }            }        }    }    private final Binder binder = new MyBinder();    class MyBinder extends Binder{        public MqttService getService(){            return MqttService.this;        }    }}复制代码
MainActivity.java
package com.example.jingwc.mqtt_demo;import android.content.ComponentName;import android.content.Intent;import android.content.ServiceConnection;import android.os.IBinder;import android.support.v7.app.AppCompatActivity;import android.os.Bundle;import android.view.View;import android.widget.Button;import android.widget.EditText;public class MainActivity extends AppCompatActivity {    MqttService service = null;    private ServiceConnection mConnection = new ServiceConnection() {        @Override        public void onServiceConnected(ComponentName componentName, IBinder iBinder) {            service = ((MqttService.MyBinder)iBinder).getService();        }        @Override        public void onServiceDisconnected(ComponentName componentName) {            service = null;        }    };    @Override    protected void onCreate(Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_main);        Button bt_connect = (Button) findViewById(R.id.bt_connect);        Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect);        bt_connect.setOnClickListener(new View.OnClickListener() {            @Override            public void onClick(View view) {                // 连接                service.connect();            }        });        bt_disconnect.setOnClickListener(new View.OnClickListener() {            @Override            public void onClick(View view) {                // 断开连接                service.disconnect();            }        });        bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE);    }}复制代码

服务端代码

  • 也可以在写一个android程序当作服务端
  • 我这里写的是java项目
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttServer {		// 代理服务器ip地址	private static String host = "tcp://192.168.1.107:61613";		private static String userName = "admin";	private static String password = "password";		private static MqttClient client;		// 主题	private static MqttTopic topic;		private static MqttMessage message;		// 订阅标识	private static String topicStr = "jingwc";		public static void main(String[] args) throws MqttException{		client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence());		MqttConnectOptions options = new MqttConnectOptions();		options.setCleanSession(true);		options.setUserName(userName);		options.setPassword(password.toCharArray());		options.setConnectionTimeout(10);		options.setKeepAliveInterval(20);				topic = client.getTopic(topicStr);				message = new MqttMessage();		message.setQos(1);		message.setRetained(true);		message.setPayload("from server message".getBytes());		client.connect(options);		MqttDeliveryToken token = topic.publish(message);		token.waitForCompletion();		System.out.println("token:"+token.isComplete());			}}复制代码
服务端通过代理服务器发送客户端订阅的消息图 ( 个人理解 )

转载地址:http://sxffk.baihongyu.com/

你可能感兴趣的文章
mysql 写入慢优化
查看>>
mysql 分组统计SQL语句
查看>>
Mysql 分页
查看>>
Mysql 分页语句 Limit原理
查看>>
MySql 创建函数 Error Code : 1418
查看>>
MySQL 创建新用户及授予权限的完整流程
查看>>
mysql 创建表,不能包含关键字values 以及 表id自增问题
查看>>
mysql 删除日志文件详解
查看>>
mysql 判断表字段是否存在,然后修改
查看>>
MySQL 到底能不能放到 Docker 里跑?
查看>>
mysql 前缀索引 命令_11 | Mysql怎么给字符串字段加索引?
查看>>
MySQL 加锁处理分析
查看>>
mysql 协议的退出命令包及解析
查看>>
mysql 参数 innodb_flush_log_at_trx_commit
查看>>
mysql 取表中分组之后最新一条数据 分组最新数据 分组取最新数据 分组数据 获取每个分类的最新数据
查看>>
MySql 同一个列中的内容进行批量改动
查看>>
MySQL 命令和内置函数
查看>>
MySQL 和 PostgreSQL,我到底选择哪个?
查看>>
mysql 四种存储引擎
查看>>
mysql 在windons下的备份命令
查看>>