emq使用3.0以上版本(emq 2.X版本称为emqttd,3.X版本称为emqx),并且自己编译源码(方便做扩展性修改)。git地址:https://github.com/emqx/emqx-rel.git。
emq的编译依赖于Erlang
环境(emqx依赖于Erlang R21.2+版本,emqttd依赖于Erlang R20+版本)。Erlang 安装: http://www.erlang.org/。
源码下载完成后,进入emqx-rel主文件夹,然后直接make编译。编译完成后的文件结构如下:
由于emq文件结构过于复杂,因此只介绍需要用到的部分。可执行的命令以及配置文件位于_rel/emqx中,插件位于deps中。
进入emqx-rel/_rel/emqx/bin
文件夹下,用emqx start
命令即可运行emq,用emqx stop
课停止emq, emqx console
进入控制台运行模式,在此模式下可看见诸多类似设备连接、服务器报错等信息。
emq的配置文件是emqx-rel/_rel/emqx/etc/emqx.conf
,下面介绍配置文件中比较重要的部分。
node.name=emqx@192.168.4.31
服务器的节点名,需要emqx@host
的格式,注意host中最好使用本机真实ip,尤其是在集群配置中,否则会出现预期之外的错误。并且,emq启动之后不要修改此配置,确保emq开启和关闭的时候此配置必须相同,否则会出现无法关闭emqx的错误(就算一开始写的127.0.0.1
,然后启动之后发现错了,也要先关闭emqx再修改配置!)。
listener.tcp.external = 0.0.0.0:1885
: emq对外监听的tcp端口,默认1883。listener.ssl.external = 8883
: emq监听的ssl端口,默认8883。listener.ssl.external.keyfile = etc/certs/key.pem
: 服务器的私钥(关于ssl认证的具体介绍见:SSL认证原理、证书生成及校验)。listener.ssl.external.certfile = etc/certs/cert.pem
: 服务器的公钥。listener.ssl.external.cacertfile = etc/certs/cacert.pem
: ca证书。listener.ssl.external.verify = verify_peer
: 开启双向认证。listener.ssl.external.fail_if_no_peer_cert = true
: 如果客户端证书错误,则禁止连接。如果开启单向认证,只需要配置服务器公钥和私钥,如果开启双向认证,则需要配置ca、开启双向认证、并禁止客户端无证书连接。
另外,在emqx-rel/_rel/emqx/etc/acl.conf
中,有一处配置需要注意。
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
%%{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.
这里对emq客户端的权限作了一些配置,对某些topic禁止客户端订阅。我们需要将第三行的配置注释掉,否则客户端无法订阅$SYS/#
的topic(具体作用见下文)。
emq提供了多种去中心化的集群部署方案:
在emqx-rel/_rel/emqx/etc/emqx.conf
中可以配置集群的创建方式,在此介绍一下手动创建集群的方法。
cluster.discovery = manual
: 开启手动创建集群模式。
启动两台emqx
节点(物理节点,非逻辑节点),假设两个节点分别为emqx@node1
和emqx@node2
,在任意一台节点(以node1
为例)上,执行命令:emqx-rel/_rel/emqx/bin/emqx_ctl clsuter join emqx@node2
节点退出集群:mqx-rel/_rel/emqx/bin/emqx_ctl cluster leave
emq
提供了方便用户管理监控的控制台、api及系统消息。控制台可直接访问http://{host}:18083
,默认用户名admin
,密码public
。
此处是插件功能和emq提供的HTTP API列表,可直接在控制台中进行配置和插件的开关,emq支持插件的热加载。
除此之外,emq提供了一些用于监控broker状态的系统主题$SYS。其中,我们的上下线通知中用到了下列两个topic:
但是此处的上线通知流程存在问题,emq broker会在客户端尝试连接(鉴权)时进行上线通知,而不是连接成功之后通知。还有一种解决方案是用插件进行通知,但这样会使用http同步调用,代价较大,因此暂时还是用系统消息进行异步通知。
插件位于emqx-rel/deps中,其中有若干已经由官方写好的插件,可以直接修改配置文件或直接在控制台中进行配置。
如果需要自己写插件,emq也提供了插件模板,在emqx-rel/deps/emqx_plugin_template
中。打开emqx-rel/deps/emqx_plugin_template/src/emqx_plugin_template.erl
,其中可以看到emq提供了一些钩子:
emqx:hook('client.authenticate', fun ?MODULE:on_client_authenticate/2, [Env]),
emqx:hook('client.check_acl', fun ?MODULE:on_client_check_acl/5, [Env]),
emqx:hook('client.connected', fun ?MODULE:on_client_connected/4, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqx:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
emqx:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
emqx:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqx:hook('session.resumed', fun ?MODULE:on_session_resumed/3, [Env]),
emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqx:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqx:hook('session.terminated', fun ?MODULE:on_session_terminated/3, [Env]),
emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]),
emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, [Env]).
当达成这些条件事,会触发钩子函数,用户可以自己定义钩子函数,例如**目前鉴权所用到的:
on_client_authenticate(Credentials = #{client_id := ClientId, password := Password, username := Username}, _Env) ->
io:format("Client(~s) authenticate, Password:~p ~n, Username:~p ~n", [ClientId, Password, Username]),
inets:start(),
{ok, {{Version, 200, ReasonPhrase}, Headers, Body}} = httpc:request(string:join(["http://www.example.com/api/mqtt/authenticate/login?username=", binary_to_list(base64:encode(binary_to_list(Username))), "&password=", binary_to_list(Password), "&clientId=", binary_to_list(base64:encode(binary_to_list(ClientId)))], "")),
if
Body == "0" ->
{stop, Credentials#{auth_result => success}};
true ->
{stop, Credentials#{auth_result => fail}}
end.
插件编写完成后,需要到主目录emqx-rel中重新make。注意,make
之后配置文件会重新生成,因此make之前最好备份一下配置文件。然后启动emq,到控制台中开启插件即可。
emq提供了一些api,可以获取服务器的一些信息。
首先在系统中新增用户
然后通过http://host:8080/{api}
访问,访问需要http basic authentication
,用刚刚创建的用户可以访问。
emqx start
指令后,卡住不动,没有出现start successfully
的字样,可能是端口被其他进程占用。emq会占用的端口如下:这些端口都可以在配置文件中自己修改,但是只要有一个冲突就无法正常启动。
emqx stop
时,显示emqx@xxx no response
字样,检查配置文件中的节点名是否正确。beam.smp
,守护进程为epmd
,但是Erlang的进程结构比较奇怪,所以直接kill
掉上述进程之后,emq可能还在运行中。所以最好执行emqx stop
指令来停止emq。在官方文档中有更为全面的说明和指南,本文只列出了部分实际用到的比较重要的功能。如果后续有扩展或本文中不详细之处,可参加官方文档:https://developer.emqx.io/docs/broker/v3/cn/getstarted.html
另附:插件编写指南https://www.cnblogs.com/wunaozai/p/8067621.html
官方文档对于部分内容也并不十分详细,本文对实际中遇到的问题和注意事项作了部分补充。
“The first 90% of the code accounts for the first 90% of the development time. The remaining 10% of the code accounts for the other 90% of the development time.” – Tom Cargill
标 题:emq的部署、使用及维护