搭建一个基于mqtt的协议处理框架(二.总体架构设计和emqx服务器的搭建)

mac2025-01-25  33

架构及说明

    基于业务的需求,整个流程设计如下: 设备发送的数据找到broker,broker通过桥接插件直接转给Kafka一份,kafka再将数据发给java编写的数据处理中心,在这里将解析协议报文,然后将解析后的报文通过rocketmq发送给数据存储中心,数据存储中心再将数据分别插入不同的表,然后将数据封装成能被用户看懂的数据发送给数据推送中心,数据推送中心通过友盟和Goeasy将数据推送给用户. 用户发送的下行数据会首先来到数据存储中心,再这里将数据补齐后发送到数据处理中心,数据处理中心将数据转化为协议报文,直接通过mqttclient将数据发送给mqtt broker上对应设备id的主题,订阅此主题的设备就会拿到报文

emqx安装

因为我们需要用到emqx的kafka桥接插件,这个插件在安装版中是没有的,所以需要使用源码安装emqx 首先需要安装emqx的运行环境,目前kafka的桥接插件emqx_kafka_bridge只能运行在emqx3.0版本上,所以搭建的环境都是基于emqx3.0版本的 首先是下载和安装erlang21.3

yum install gcc* glibc-devel makencurses-devel openssl-devel autoconf -y yum install unixODBC unixODBC-devel-y yum -y install gcc-c++ kernel-develm4 python-simplejson

下载并编译erlang

wget http://erlang.org/download/otp_src_21.3.tar.gz tar -zxvf otp_src_21.3.tar.gz cd otp_src_21.3 ./otp_build autoconf ./configure --enable-smp-support–enable-threads --enable-sctp --enable-kernel-poll --enable-hipe --with-ssl make && make install

完成后使用erl命令验证 如果返回

Erlang/OTP 21 [erts-10.3] [source][64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] Eshell V10.3 (abort with ^G)

则证明编译成功,使用halt().命令退出erlang命令行 安装Rebar(Erlang开发构建工具)

git clonegit://github.com/rebar/rebar.git cd rebar ./bootstrap

安装成功会显示 Congratulations! You now have aself-contained script called “rebar” in your current working directory. Placethis script anywhere in your path and you can use rebar to buildOTP-compliant apps.

配置环境变量

vim /etc/profile export PATH=/emq/rebar/:$PATH (rebar安装目录) 执行文件: source /etc/profile

测试是否安装成功 rebar -V

接下来就可以安装emqx了 首先我们选择emqx3.0.1安装

wget -c https://github.com/emqx/emqx-rel/archive/v3.0.1.tar.gz tar -zxvf v3.0.1.tar.gz cd /

由于编译时间超长,所以我们可以提前先把kafka插件安装上 修改 emqx-rel目录下的Makefile:

DEPS += ( f o r e a c h d e p , (foreachdep, (foreachdep,(MAIN_APPS), ( c a l l a p p n a m e , (call app_name, (callappname,(dep))) DEPS += emqx_kafka_bridge 【增加这行】 ( f o r e a c h d e p , (foreach dep, (foreachdep,(MAIN_APPS),KaTeX parse error: Expected group after '_' at position 9: (evaldep_̲(call app_name,$(dep)) = ( C L O N E M E T H O D ) h t t p s : / / g i t h u b . c o m / e m q x / (CLONE_METHOD) https://github.com/emqx/ (CLONEMETHOD)https://github.com/emqx/(dep) ( c a l l a p p v s n , (call app_vsn, (callappvsn,(dep)))) dep_emqx_kafka_bridge = git https://github.com/bob403/emqx_kafka_bridge.git master 【增加这行】

在relx.config里增加:

{emqx_kafka_bridge, load}

然后就可以怀着虔诚的心,在emqx的解压目录输入make,并且祈祷不要编译失败

编译成功后,需要更改kafka插件的配置

/emq/emqx-rel/_rel/emqx/etc/plugins/emqx_kafka_bridge.conf

查看插件列表有没有kafka插件 ./emqx_ctl plugins list 然后启动插件 ./emqx_ctl plugins load emqx_kafka_bridge

最后如果要使桥接成功,还需要提前在kafka上面创建一个名为Processing的主题

接下来,你的设备发送的mqtt消息就可以成功桥接到kafka上去了

kafka的配置详细见下文

最新回复(0)