Skip to content

天楚锐齿

人工智能 云计算 大数据 物联网 IT 通信 嵌入式

天楚锐齿

  • 下载
  • 物联网
  • 云计算
  • 大数据
  • 人工智能
  • Linux&Android
  • 网络
  • 通信
  • 嵌入式
  • 杂七杂八

EMQ(emqttd) 2.x 安装和使用(物联网传输控制协议的Broker)

2018-03-14
支持下国产开源。
MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf》
下载:emqttd-centos64-v2.0-rc.2-20161019.zip
安装:
$ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/
$ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019
$ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd
系统优化配置:
# ulimit -n 1048576
$ sudo sysctl -w fs.file-max=2097152
$ sudo sysctl -w fs.nr_open=2097152
$ sudo sysctl -w net.core.somaxconn=65535
$ sudo sysctl -p
修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):
$ cd /data/emqttd
$ vi etc/emq.conf
## Node name
node.name = emqttd@192.168.60.58
## Cookie for distributed node
node.cookie = emq_dist_cookie_533d99ckd9ji475
## Erlang Process Limit
node.process_limit = 2000000
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1000000
## Size of acceptor pool
mqtt.listener.tcp.acceptors = 64
## Maximum number of concurrent clients
mqtt.listener.tcp.max_clients = 1000000
## Rate Limit. Format is ‘burst,rate’, Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
## TCP Socket Options
mqtt.listener.tcp.backlog = 262144
## Distributed node port range
node.dist_listen_min = 6000
node.dist_listen_max = 6999
## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。
## Expired after 1 day:
## w – week
## d – day
## h – hour
## m – minute
## s – second
mqtt.session.expired_after = 2w
# 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。
## Console log. Enum: off, file, console, both
log.console = both
## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
log.console.level = info
## Console log file
log.console.file = log/console.log
## Error log file
log.error.file = log/error.log
## Enable the crash log. Enum: on, off
log.crash = on
log.crash.file = log/crash.log
修改下启动脚本(在stop处增加一行):
$ vi bin/emqttd
…
    stop)
        # Wait for the node to completely stop…
        PID=”$(relx_get_pid)”
        if ! relx_nodetool “stop”; then
            exit 1
        fi
        while $(kill -s 0 “$PID” 2>/dev/null);
        do
            sleep 1
        done
        killall epmd
        ;;
启动:
$ cd emqttd
直接进入控制台模式:
$ ./bin/emqttd console
后台运行模式:
$ ./bin/emqttd start
$ ./bin/emqttd_ctl status
$ ./bin/emqttd stop
开启防火墙:
端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。
sudo firewall-cmd                      –zone=public –add-port=1883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=1883/tcp
sudo firewall-cmd                      –zone=public –add-port=8883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=8883/tcp
sudo firewall-cmd                      –zone=public –add-port=8083/tcp
sudo firewall-cmd –permanent –zone=public –add-port=8083/tcp
sudo firewall-cmd                      –zone=public –add-port=18083/tcp
sudo firewall-cmd –permanent –zone=public –add-port=18083/tcp
sudo firewall-cmd                      –zone=public –add-port=4369/tcp
sudo firewall-cmd –permanent –zone=public –add-port=4369/tcp
sudo firewall-cmd                      –zone=public –add-port=6000-6999/tcp
sudo firewall-cmd –permanent –zone=public –add-port=6000-6999/tcp
sudo firewall-cmd                      –zone=public –list-all
sudo firewall-cmd –permanent –zone=public –list-all
查看各台的启动状态:
http://192.168.60.55:8083/status
http://192.168.60.55:18083/      用户名/密码: admin / public
$ vi data/configs/vm.xxx.args
$ vi data/configs/app.xxx.conf
LOG位置:
$ vi log/
把节点加入集群:
在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):
$ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55
$ ./bin/emqttd_ctl cluster status
Cluster status: [{running_nodes,[’emqttd@192.168.60.58′,
                                 ’emqttd@192.168.60.56′,
                                 ’emqttd@192.168.60.57′,
                                 ’emqttd@192.168.60.55′]}]
把节点退出集群:
本机退出集群:
$ ./bin/emqttd_ctl cluster leave
把某节点退出集群:
$ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56
测试一下:
下载安装MTQQ协议的客户端:https://mosquitto.org/download/
$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm
$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm
订阅:
$ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111
发布:另开一个ssh或者另外机器上执行:
$ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m “hello mqtt 7”
参数:
-h: 连接到哪台broker。
-p: 连接端口。
-t: topic名字,topic类似文件系统的组织方式,以”/”分隔符来分层,订阅者订阅时可以使用通配符”+”和”#”,发布者不能使用通配符。
-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。
-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。
-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。
-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n  -u user -P 123456。
-m: 该topic的一条消息内容。
加入MariaDB的认证:
修改配置:
$ vi ./etc/emq.conf
## Allow Anonymous authentication
mqtt.allow_anonymous = false
## Default ACL File
mqtt.acl_file = etc/acl.conf
$ vi ./data/loaded_plugins
emq_dashboard.
emq_auth_mysql.
修改plugins配置:
$ vi etc/plugins/emq_auth_mysql.conf
## Mysql Server
auth.mysql.server = 192.168.60.60:3306
## Mysql Pool Size
auth.mysql.pool = 8
## Mysql Username
auth.mysql.username = root
## Mysql Password
auth.mysql.password = 123456
## Mysql Database
auth.mysql.database = mqtt
## Variables: %u = username, %c = clientid
## Authentication Query: select password only
auth.mysql.auth_query = select password from mqtt_user where username = ‘%u’ limit 1
## Password hash: plain, md5, sha, sha256, pbkdf2
auth.mysql.password_hash = plain
## %% Superuser Query
auth.mysql.super_query = select is_superuser from mqtt_user where username = ‘%u’ limit 1
## ACL Query Command
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = ‘%a’ or username = ‘%u’ or username = ‘$all’ or clientid = ‘%c’
## ACL nomatch
auth.mysql.acl_nomatch = deny
往MariaDB插入初始化数据库和表(引擎需要改成InnDB):
认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:
建立mqtt数据库:
$ mysql -uroot -p123456 -hxxxx
MariaDB [test]> create database mqtt;
用户表:
MariaDB [test]> CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password` varchar(100) DEFAULT NULL,
  `salt` varchar(20) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) DEFAULT CHARSET=utf8;
用户表插入测试数据:
MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)
VALUES
    (1,’superuser’,’123456′,’123456′,True,’2016-10-26 10:00:00′),
    (2,’user’,’123456′,’123456′,False,’2016-10-26 10:01:00′);
acl表:
MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `allow` int(1) DEFAULT NULL COMMENT ‘0: deny, 1: allow’,
  `ipaddr` varchar(60) DEFAULT NULL COMMENT ‘IpAddress’,
  `username` varchar(100) DEFAULT NULL COMMENT ‘Username’,
  `clientid` varchar(100) DEFAULT NULL COMMENT ‘ClientId’,
  `access` int(2) NOT NULL COMMENT ‘1: subscribe, 2: publish, 3: pubsub’,
  `topic` varchar(100) NOT NULL DEFAULT ” COMMENT ‘Topic Filter’,
  PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;
acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):
MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
VALUES
    (1,1,NULL,’$all’,NULL,2,’#’),
    (2,1,NULL,’$all’,NULL,1,’#’),
    (3,1,NULL,’$all’,NULL,3,’#’),
    (4,1,’127.0.0.1′,NULL,NULL,2,’$SYS/#’),
    (5,1,’127.0.0.1′,NULL,NULL,2,’#’),
    (6,1,NULL,’dashboard’,NULL,1,’$SYS/#’);
测试,重启每台服务器后执行:
$ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112  -u user -P 123456
$ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m “hello mqtt 1” -u user -P 123456
重新初始化emq:
$ bin/emqttd stop
$ rm -rf data/mnesia/*
$ rm -rf data/configs/*
$ rm -rf log/*
$ bin/emqttd start
$ bin/emqttd_ctl cluster join emqttd@192.168.60.55
使用Haproxy来实现负载分担:
因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。
$ vi /etc/haproxy/haproxy.cfg
################## EMQ ######################
listen emq_emqttd *:1883
    mode tcp
    balance source
    log global
    timeout connect         25s
    timeout client          0
    timeout server          0
    option tcpka
    option tcplog
    option tcp-check
    server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
    server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
    server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
    server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
开启haproxy上的防火墙:
sudo firewall-cmd                      –zone=public –add-port=1883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=1883/tcp
sudo firewall-cmd                      –zone=public –list-all
sudo firewall-cmd –permanent –zone=public –list-all
使用:
broker操作:
显示broker状态:
$ ./bin/emqttd_ctl status
显示broker版本、启用时间等:
$ ./bin/emqttd_ctl broker
显示broker的pubsub进程状态、内存、队列长度、规约数等:
$ ./bin/emqttd_ctl broker  pubsub
显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:
$ ./bin/emqttd_ctl broker stats
显示broker的测量信息:底层流量、MQTT报文数、消息数等:
$ ./bin/emqttd_ctl broker metrics
Cluster操作:
$ ./bin/emqttd_ctl cluster
cluster join <Node>                             # Join the cluster
cluster leave                                   # Leave the cluster
cluster remove <Node>                           # Remove the node from cluster
cluster status                                  # Cluster status
Client操作:
$ ./bin/emqttd_ctl clients
clients list                                    # List all clients
clients show <ClientId>                         # Show a client
clients kick <ClientId>                         # Kick out a client
Sessions操作:
$ ./bin/emqttd_ctl sessions
sessions list                                   # List all sessions
sessions list persistent                        # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)
sessions list transient                         # List all transient sessions
sessions show <ClientId>                        # Show a session
Routes操作:
$ ./bin/emqttd_ctl routes
routes list                                     # List all routes
routes show <Topic>                             # Show a route
Topics操作:
$ ./bin/emqttd_ctl topics
topics list                                     # List all topics
topics show <Topic>                             # Show a topic
Subscription订阅者操作:
$ ./bin/emqttd_ctl subscriptions
subscriptions list                              # List all subscriptions
subscriptions show <ClientId>                   # Show subscriptions of a client
subscriptions add <ClientId> <Topic> <QoS>      # Add a static subscription manually
subscriptions del <ClientId>                    # Delete static subscriptions manually
subscriptions del <ClientId> <Topic>            # Delete a static subscription manually
Plugins插件操作:
$ ./bin/emqttd_ctl plugins
plugins list                                    # Show loaded plugins
plugins load <Plugin>                           # Load plugin
plugins unload <Plugin>                         # Unload plugin
$ ./bin/emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)
Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)
Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)
Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)
Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)
Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)
Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)
Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)
Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)
Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)
Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)
Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)
Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)
Bridges桥接操作:
$ ./bin/emqttd_ctl bridges
bridges list                                    # List bridges
bridges options                                 # Bridge options
bridges start <Node> <Topic>                    # Start a bridge
bridges start <Node> <Topic> <Options>          # Start a bridge with options
bridges stop <Node> <Topic>                     # Stop a bridge
vm虚机(erlang虚机)性能查看:
$ ./bin/emqttd_ctl vm all
cpu/load1               : 0.05
cpu/load5               : 0.05
cpu/load15              : 0.07
memory/total            : 166404232
memory/processes        : 32564328
memory/processes_used   : 32563952
memory/system           : 133839904
memory/atom             : 959633
memory/atom_used        : 954350
memory/binary           : 46088
memory/code             : 28250535
memory/ets              : 5741416
process/limit           : 2097152
process/count           : 288
io/max_fds              : 1000000
io/active_fds           : 1
ports/count           : 25
ports/limit           : 1048576
端口使用情况查看:
$ ./bin/emqttd_ctl listeners
mnesia数据库信息查看:
$ bin/emqttd_ctl mnesia
管理dashboard用户:
$ ./bin/emqttd_ctl admins
admins add <Username> <Password> <Tags>         # Add dashboard user
admins passwd <Username> <Password>             # Reset dashboard user password
admins del <Username>                           # Delete dashboard user
追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。
追踪客户端(Client):
./bin/emqttd_ctl trace client “clientid(mosqsub/23058-vm6)” “trace_clientid.log”
追踪主题(Topic):
./bin/emqttd_ctl trace topic “topic1” “trace_topic.log”
查询追踪:
./bin/emqttd_ctl trace list
停止追踪:
./bin/emqttd_ctl trace client “clientid(mosqsub/23058-vm6)” off
./bin/emqttd_ctl trace topic “topic1” off
钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):
钩子                                说明
client.connected             客户端上线
client.subscribe               客户端订阅主题前
client.unsubscribe           客户端取消订阅主题
session.subscribed          客户端订阅主题后
session.unsubscribed      客户端取消订阅主题后
message.publish             MQTT消息发布
message.delivered          MQTT消息送达
message.acked               MQTT消息回执
client.disconnected        客户端连接断开
钩子使用例子:
-module(emqttd_plugin_template).
-export([load/1, unload/0]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
load(Env) ->
    emqttd:hook(‘message.publish’, fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook(‘message.delivered’, fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook(‘message.acked’, fun ?MODULE:on_message_acked/4, [Env]).
on_message_publish(Message, _Env) ->
    io:format(“publish ~s~n”, [emqttd_message:format(Message)]),
    {ok, Message}.
on_message_delivered(ClientId, _Username, Message, _Env) ->
    io:format(“delivered to client ~s: ~s~n”, [ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
on_message_acked(ClientId, _Username, Message, _Env) ->
    io:format(“client ~s acked: ~s~n”, [ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
unload() ->
    emqttd:unhook(‘message.publish’, fun ?MODULE:on_message_publish/2),
    emqttd:unhook(‘message.acked’, fun ?MODULE:on_message_acked/4),
    emqttd:unhook(‘message.delivered’, fun ?MODULE:on_message_delivered/4).
MQTT协议的客户端库:
https://github.com/mqtt/mqtt.github.io/wiki/libraries
http://www.eclipse.org/paho/downloads.php
做IM即时通信协议时的考虑:
MQTT作为发布、订阅系统,作为消息推送是很好的。
如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如”USER/1111″),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如”ADMIN/broadcast”,”ADMIN/1111″),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如”GROUP/1111″)。
1,540次阅读

Post navigation

前一篇:

需要写一个基于SCTP的类netty、mina组件

后一篇:

使用VMWare虚机Centos7.x实现网关、DHCP、修改HTTP User-Agent字段(绕过深信服、绿盟等对该字段的检测)

发表回复 取消回复

要发表评论,您必须先登录。

个人介绍

需要么,有事情这里找联系方式:关于天楚锐齿

=== 美女同欣赏,好酒共品尝 ===

微信扫描二维码赞赏该文章:

扫描二维码分享该文章:

分类

  • Linux&Android (81)
  • Uncategorized (1)
  • 下载 (28)
  • 云计算 (38)
  • 人工智能 (9)
  • 大数据 (35)
  • 嵌入式 (34)
  • 杂七杂八 (35)
  • 物联网 (65)
  • 网络 (25)
  • 通信 (22)

归档

近期文章

  • 飞书机器人发送卡片interactive消息
  • Springboot JPA实现对数据库表统一的增删改查
  • WEB的内容安全策略CSP(Content-Security-Policy)
  • CSS利用@media和viewport实现响应式布局自动适配手机电脑等
  • VUE前端增加国际化支持

近期评论

  • linux爱好者 发表在《Linux策略路由及iptables mangle、ip rule、ip route关系及一种Network is unreachable错误》
  • maxshu 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition 发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • maxshu 发表在《Android9下用ethernet 的Tether模式来做路由器功能》

阅读量

  • 使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序 - 23,810次阅读
  • 卸载深信服Ingress、SecurityDesktop客户端 - 18,519次阅读
  • 车机技术之车规级Linux-Automotive Grade Linux(AGL) - 10,571次阅读
  • linux下的unbound DNS服务器设置详解 - 9,323次阅读
  • 在Android9下用ndk编译vSomeIP和CommonAPI以及使用例子 - 9,136次阅读
  • linux的tee命令导致ssh客户端下的shell卡住不动 - 8,639次阅读
  • Linux策略路由及iptables mangle、ip rule、ip route关系及一种Network is unreachable错误 - 8,126次阅读
  • 车机技术之360°全景影像(环视)系统 - 8,088次阅读
  • 车机技术之Android Automotive - 7,941次阅读
  • Windows下安装QEMU并在qemu上安装ubuntu和debian - 7,840次阅读

其他操作

  • 注册
  • 登录
  • 条目 feed
  • 评论 feed
  • WordPress.org

联系方式

地址
深圳市科技园

时间
周一至周五:  9:00~12:00,14:00~18:00
周六和周日:10:00~12:00

标签

android AT命令 CAN centos docker Hadoop hdfs ip java kickstart linux mapreduce mini6410 modem nova OAuth openstack os python socket ssh uboot 内核 协议 安装 嵌入式 性能 报表 授权 操作系统 数据 数据库 月报 模型 汽车 深信服 源代码 统计 编译 脚本 虚拟机 调制解调器 车机 金融 鉴权
© 2025 天楚锐齿