Let's start from the demo:
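import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        // Const.NAMESRV_ADDR is the demo's own constant holding the NameServer address
        producer.setNamesrvAddr(Const.NAMESRV_ADDR);
        producer.start();
        for (int i = 0; i < 5; i++) {
            // Create the message
            Message message = new Message("test_quick_topic", "TagA" + i, "keyA" + i, ("Hello RocketMQ" + i).getBytes());
            // Send the message (the synchronous variant is kept for reference)
            // SendResult sr = producer.send(message);
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // getMsgId() returns a String, so format it with %s rather than %d
                    System.out.printf("%-10s OK %n", sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    // A Throwable cannot be formatted with %d either
                    System.out.printf("%-10s Exception %n", e);
                    e.printStackTrace();
                }
            });
            // Very long sleep kept from the original demo; presumably it keeps the JVM alive while stepping through the async path
            Thread.sleep(1000000L);
            System.out.println("Message sent:");
        }
        producer.shutdown();
    }
}

DefaultMQProducer is defined as follows: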
public class DefaultMQProducer extends ClientConfig implements MQProducer {
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    private String producerGroup;
    private String createTopicKey;
    private volatile int defaultTopicQueueNums;
    private int sendMsgTimeout;
    private int compressMsgBodyOverHowmuch;
    private int retryTimesWhenSendFailed;
    private int retryTimesWhenSendAsyncFailed;
    private boolean retryAnotherBrokerWhenNotStoreOK;
    private int maxMessageSize;

    public DefaultMQProducer() {
        this("DEFAULT_PRODUCER", (RPCHook)null);
    }

    public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {
        this.createTopicKey = "AUTO_CREATE_TOPIC_KEY";
        this.defaultTopicQueueNums = 4;
        this.sendMsgTimeout = 3000;                 // milliseconds
        this.compressMsgBodyOverHowmuch = 4096;     // bytes (4 KB)
        this.retryTimesWhenSendFailed = 2;
        this.retryTimesWhenSendAsyncFailed = 2;
        this.retryAnotherBrokerWhenNotStoreOK = false;
        this.maxMessageSize = 4194304;              // bytes (4 MB)
        this.producerGroup = producerGroup;
        this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }
    ...
}

The defaultMQProducerImpl member is the actual Producer implementation. The remaining members are configuration parameters:

createTopicKey: a topic name that is used when a topic has to be created
defaultTopicQueueNums: the default number of queues per topic
sendMsgTimeout: the timeout for sending a message
compressMsgBodyOverHowmuch: the message body size threshold; bodies larger than this are compressed
retryTimesWhenSendFailed: how many times a failed synchronous send may be retried
retryTimesWhenSendAsyncFailed: how many times a failed asynchronous send may be retried
retryAnotherBrokerWhenNotStoreOK: whether to pick another Broker and resend after a send to one Broker fails
maxMessageSize: the maximum message size

All of these properties can be read and changed through the getters and setters that DefaultMQProducer provides.
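To make the role of compressMsgBodyOverHowmuch concrete, here is a minimal sketch of a size-threshold compression step. It is not RocketMQ's code: the class CompressThresholdSketch and the helper maybeCompress are hypothetical, the use of java.util.zip and the strict "larger than" comparison are assumptions, and only the 4096 default comes from the constructor above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;

public class CompressThresholdSketch {
    // Same value as the compressMsgBodyOverHowmuch default shown above.
    static final int COMPRESS_OVER = 4096;

    /**
     * Hypothetical helper: returns the body untouched when it is at or below
     * the threshold, otherwise returns a deflate-compressed copy.
     * (Assumption: the real client may treat the boundary differently.)
     */
    public static byte[] maybeCompress(byte[] body) {
        if (body.length <= COMPRESS_OVER) {
            return body;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the stream finishes the deflate stream and flushes it into 'out'.
        try (DeflaterOutputStream deflate = new DeflaterOutputStream(out)) {
            deflate.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] small = "Hello RocketMQ".getBytes(StandardCharsets.UTF_8);
        byte[] large = new byte[8192]; // all zeros, so it compresses well
        System.out.println("small unchanged: " + (maybeCompress(small) == small));
        System.out.println("large shrank: " + (maybeCompress(large).length < large.length));
    }
}
```

The point of a threshold like this is that compressing tiny bodies costs CPU and can even make them bigger, so only bodies above a few kilobytes are worth the effort.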
DefaultMQProducer extends ClientConfig, so the lower-level settings provided by ClientConfig are initialized first:
public class ClientConfig {
    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
    private String namesrvAddr = System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR"));
    private String clientIP = RemotingUtil.getLocalAddress();
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    private int pollNameServerInterval = 30000;        // milliseconds
    private int heartbeatBrokerInterval = 30000;       // milliseconds
    private int persistConsumerOffsetInterval = 5000;  // milliseconds
    private boolean unitMode = false;
    private String unitName;
    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty("com.rocketmq.sendMessageWithVIPChannel", "true"));
    private boolean useTLS;
    private LanguageCode language;
    ...
}

namesrvAddr is a particularly important member: it holds the address of the Name Server. At construction time it is initialized from the system property rocketmq.namesrv.addr, falling back to the NAMESRV_ADDR environment variable. If neither is set it is null and has to be supplied later through the setter.

clientIP: the local IP of the Producer
instanceName: the instance name of the Producer
pollNameServerInterval: the interval at which the NameServer is polled
heartbeatBrokerInterval: the interval at which heartbeats are sent to the Broker
SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY and vipChannelEnabled: decide whether the VIP (high-priority) channel is used
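The lookup order for namesrvAddr can be tried in isolation. The sketch below reuses the exact System.getProperty / System.getenv expression from the field initializer above; the class NamesrvAddrSketch and the helper effective (which stands in for "an explicit setNamesrvAddr call wins") are hypothetical names introduced only for illustration.

```java
public class NamesrvAddrSketch {
    /** Same expression as the ClientConfig field initializer: system property first, then environment variable. */
    public static String resolve() {
        return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR"));
    }

    /**
     * Hypothetical helper: an explicitly configured address (as with setNamesrvAddr)
     * takes precedence over whatever was resolved at construction time.
     */
    public static String effective(String explicit) {
        return explicit != null ? explicit : resolve();
    }

    public static void main(String[] args) {
        // Without the property or the environment variable this prints "null".
        System.out.println("default:  " + resolve());

        System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
        System.out.println("property: " + resolve());

        System.out.println("explicit: " + effective("192.168.0.10:9876"));
    }
}
```

In practice the same effect as the second step is usually obtained by starting the JVM with -Drocketmq.namesrv.addr=host:port.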
Next, look at this call in the demo: producer.start();
It goes into the DefaultMQProducer class:

public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

and from there into the DefaultMQProducerImpl class:
public void start() throws MQClientException {
    this.start(true);
}

public void start(boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Validate the producerGroup
            this.checkConfig();
            // Unless this is the client's internal producer group, change instanceName to the PID
            if (!this.defaultMQProducer.getProducerGroup().equals("CLIENT_INNER_PRODUCER")) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Get or create the client instance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, this.rpcHook);
            // Register this producer; in practice it is put into a map keyed by producerGroup
            boolean registerOK = this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
            } else {
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    this.mQClientFactory.start();
                }
                this.log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
            }
            // No break: in this decompiled form a successful start falls through to the heartbeat below
        default:
            // Send heartbeats
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            return;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
    }
}

This is where mQClientFactory is instantiated; it is in fact the producer's client instance. MQClientManager is a singleton, and getInstance returns that single MQClientManager. getAndCreateMQClientInstance then returns the client instance that matches the given ClientConfig, creating it if it does not exist yet.
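The state handling in start(boolean) boils down to "only a freshly created service may start, and the state is set to START_FAILED before any work is done so that a failure leaves a visible trace". A stripped-down sketch of that guard, with hypothetical names (StartGuardSketch, the Runnable init) and without the registration and heartbeat steps, could look like this. It uses IllegalStateException where the real code throws MQClientException.

```java
public class StartGuardSketch {
    /** Same state names as the switch above. */
    public enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    /**
     * Simplified start(): 'init' stands in for checkConfig(), client creation,
     * registration and so on. Not thread-safe; this is a single-threaded sketch.
     */
    public void start(Runnable init) {
        if (state != ServiceState.CREATE_JUST) {
            throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
        // Pessimistic marker: if init throws, the state stays START_FAILED.
        state = ServiceState.START_FAILED;
        init.run();
        state = ServiceState.RUNNING;
    }

    public ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardSketch producer = new StartGuardSketch();
        producer.start(() -> { });
        System.out.println("after first start: " + producer.state());
        try {
            producer.start(() -> { });
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

One difference worth noting: the real method resets the state to CREATE_JUST when registration fails because the group name is already taken, so the caller can fix the name and try again. The sketch does not model that branch.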
MQClientInstance is essentially a collection of the operations a client needs.
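A practical consequence of the manager handing out client instances is that several producers or consumers in one process can end up sharing a single instance. The sketch below illustrates that with a generic get-or-create cache. ClientCacheSketch, Client and getOrCreate are hypothetical stand-ins, and keying the cache by a plain client id string is an assumption made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClientCacheSketch {
    /** Stand-in for MQClientInstance; it only remembers its id. */
    public static final class Client {
        public final String clientId;

        Client(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    /** Returns the cached client for this id, creating and caching it on first use. */
    public Client getOrCreate(String clientId) {
        Client client = table.get(clientId);
        if (client == null) {
            client = new Client(clientId);
            // putIfAbsent returns the existing value if another thread won the race.
            Client prev = table.putIfAbsent(clientId, client);
            if (prev != null) {
                client = prev;
            }
        }
        return client;
    }

    public static void main(String[] args) {
        ClientCacheSketch manager = new ClientCacheSketch();
        Client a = manager.getOrCreate("192.168.0.5@1234");
        Client b = manager.getOrCreate("192.168.0.5@1234");
        Client c = manager.getOrCreate("192.168.0.5@5678");
        System.out.println("same id shares instance: " + (a == b));
        System.out.println("different id gets its own: " + (a != c));
    }
}
```

The get-then-putIfAbsent shape avoids locking on the common path while still guaranteeing that all callers with the same id see the same object.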
The singleton:

private static MQClientManager instance = new MQClientManager();

public static MQClientManager getInstance() {
    return instance;
}

The MQClientInstance for a given clientId is created and cached:
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable;

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            // Another thread registered an instance first; use that one
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}

Each MQClientInstance keeps a record of the producers, consumers, and admin clients that use it:
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

Topic route information:
ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;       // configuration for ordered consumption
    private List<QueueData> queueDatas;  // queue configuration of the topic on each broker
    private List<BrokerData> brokerDatas; // brokers that host the topic
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; // filter server information
}

public class QueueData implements Comparable<QueueData> {
    private String brokerName;  // broker that hosts the topic
    private int readQueueNums;  // number of read queues
    private int writeQueueNums; // number of write queues
    private int perm;           // read/write permission
    private int topicSynFlag;   // sync flag
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;     // name of the cluster the broker belongs to
    private String brokerName;  // broker name
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // addresses of the broker's master and slave nodes
}

Information about all brokers:
// Address table of each broker's master and slave nodes
ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable;
// Broker version table
ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable;

Instantiation:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    // Configuration
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    // Configuration of the remoting (Netty) client
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // Processor for incoming request codes
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // Builds requests and wraps the remoting client's communication interface
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    if (this.clientConfig.getNamesrvAddr() != null) {
        // Update the list of name server addresses
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    this.clientId = clientId;
    // Admin interface for topics, queues, and messages
    this.mQAdminImpl = new MQAdminImpl(this);
    // In push mode, an asynchronous thread handles the pull requests
    this.pullMessageService = new PullMessageService(this);
    // Periodic task that invokes consumer-side load balancing
    this.rebalanceService = new RebalanceService(this);
    // Internal producer: messages whose consumption failed or timed out are sent back to the
    // broker via sendMessageBack and placed in the retry topic to be consumed again
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // Statistics service
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex, this.clientId, this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

Now go back to the this.mQClientFactory.start() call inside start(boolean startFactory) of DefaultMQProducerImpl, which leads into the start() method of MQClientInstance:
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, look up the address from the name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }
                // Initialize the Netty communication layer, then start a timer that scans responseTable
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks
                this.startScheduledTask();
                // Start the message pull service
                this.pullMessageService.start();
                // Start the rebalance service
                this.rebalanceService.start();
                // Start the built-in producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                this.serviceState = ServiceState.RUNNING;
                break;
            default:
                // Handling of the other states is omitted from this excerpt
                break;
        }
    }
}

When MQClientInstance starts, it brings up the client-side services and also kicks off several important scheduled tasks:
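Fetch the NameServer address
Update topic route information
Remove Brokers that have gone offline
Send heartbeats to all Brokers
Persist the consumers' consumption offsets
Adjust the thread pool

Now take a look at the this.mQClientAPIImpl.start() method.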
It leads into the start() method of the NettyRemotingClient class:
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        this.nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // The decompiler's nested (Bootstrap) casts are dropped here; the fluent chain is equivalent
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, this.nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, this.nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                // Encoder, decoder, idle detection, connection management, and the business handler
                pipeline.addLast(defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    // Scan responseTable for the first time after 3 s, then once every second
    this.timer.scheduleAtFixedRate(new TimerTask() {
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 3000L, 1000L);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

The part to note here is the pipeline.addLast(...) call, which installs NettyEncoder, NettyDecoder, an IdleStateHandler, NettyConnectManageHandler, and NettyClientHandler.
The NettyClientHandler class:

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    NettyClientHandler() {
    }

    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        NettyRemotingClient.this.processMessageReceived(ctx, msg);
    }
}

The first thing to notice is that SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter.
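Because this is inheritance, the subclass has everything the parent has, plus more. SimpleChannelInboundHandler overrides channelRead() from ChannelInboundHandlerAdapter and adds the abstract channelRead0(): channelRead() checks the message type and hands matching messages to channelRead0(). Here channelRead0() then calls NettyRemotingClient.this.processMessageReceived(ctx, msg);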
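The relationship between channelRead() and channelRead0() is a template method: the base class does the type check, the subclass only sees messages of its declared type. The following sketch reproduces that shape without Netty. InboundAdapter, SimpleInbound and Collecting are hypothetical stand-ins, and details of the real class such as releasing the message or forwarding unmatched messages to the next handler are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class InboundTemplateSketch {
    /** Stand-in for ChannelInboundHandlerAdapter: receives untyped messages. */
    public static class InboundAdapter {
        public void channelRead(Object msg) {
            // Default: ignore (Netty would pass the message on to the next handler).
        }
    }

    /** Stand-in for SimpleChannelInboundHandler<T>. */
    public abstract static class SimpleInbound<T> extends InboundAdapter {
        private final Class<T> type;

        protected SimpleInbound(Class<T> type) {
            this.type = type;
        }

        @Override
        public void channelRead(Object msg) {
            // Only messages of the declared type reach channelRead0.
            if (type.isInstance(msg)) {
                channelRead0(type.cast(msg));
            }
        }

        protected abstract void channelRead0(T msg);
    }

    /** Example subclass, playing the role NettyClientHandler plays for RemotingCommand. */
    public static class Collecting extends SimpleInbound<String> {
        public final List<String> seen = new ArrayList<>();

        public Collecting() {
            super(String.class);
        }

        @Override
        protected void channelRead0(String msg) {
            seen.add(msg);
        }
    }

    public static void main(String[] args) {
        Collecting handler = new Collecting();
        handler.channelRead("response-1"); // matches, reaches channelRead0
        handler.channelRead(42);           // wrong type, skipped
        System.out.println(handler.seen);
    }
}
```

This is why NettyClientHandler can declare channelRead0(ChannelHandlerContext, RemotingCommand) and never has to cast: by the time its method runs, the decoder earlier in the pipeline has already produced a RemotingCommand and the base class has verified the type.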
In the NettyRemotingAbstract class:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    if (msg != null) {
        switch (msg.getType()) {
            case REQUEST_COMMAND:
                this.processRequestCommand(ctx, msg);
                break;
            case RESPONSE_COMMAND:
                this.processResponseCommand(ctx, msg);
        }
    }
}

From there, step into processResponseCommand:
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    // opaque is the request id that ties a response to its pending request
    int opaque = cmd.getOpaque();
    ResponseFuture responseFuture = (ResponseFuture)this.responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        this.responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            // Asynchronous send: run the registered callback
            this.executeInvokeCallback(responseFuture);
        } else {
            // Synchronous send: hand over the response and wake up the waiting caller
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

Here this.executeInvokeCallback(responseFuture) is mainly what runs the callback for messages that were sent asynchronously.
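The matching of responses to pending requests can be modeled with nothing more than a concurrent map keyed by the request id. The sketch below is a simplified model of that idea, not the RocketMQ classes: ResponseTableSketch, PendingResponse, register and onResponse are hypothetical names, responses are plain strings, and the callback runs directly on the calling thread instead of an executor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ResponseTableSketch {
    /** Stand-in for ResponseFuture: holds either a callback (async) or a latch to wait on (sync). */
    public static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Consumer<String> callback; // null means a synchronous caller is waiting
        private volatile String response;

        PendingResponse(Consumer<String> callback) {
            this.callback = callback;
        }

        /** Blocks up to timeoutMillis; returns null if no response arrived in time. */
        public String await(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return response;
        }
    }

    private final ConcurrentMap<Integer, PendingResponse> table = new ConcurrentHashMap<>();

    /** Called when a request is sent: remember it under its opaque id. */
    public PendingResponse register(int opaque, Consumer<String> callback) {
        PendingResponse pending = new PendingResponse(callback);
        table.put(opaque, pending);
        return pending;
    }

    /** Called when a response arrives; returns false if no pending request matches. */
    public boolean onResponse(int opaque, String response) {
        PendingResponse pending = table.remove(opaque);
        if (pending == null) {
            return false; // the real code logs "not matched any request" here
        }
        pending.response = response;
        if (pending.callback != null) {
            pending.callback.accept(response); // async path
        } else {
            pending.latch.countDown();         // sync path: wake the waiting caller
        }
        return true;
    }

    public int pending() {
        return table.size();
    }

    public static void main(String[] args) {
        ResponseTableSketch remoting = new ResponseTableSketch();
        PendingResponse sync = remoting.register(1, null);
        remoting.register(2, r -> System.out.println("async callback got: " + r));

        remoting.onResponse(2, "SEND_OK #2");
        remoting.onResponse(1, "SEND_OK #1");
        System.out.println("sync caller got: " + sync.await(100));
        System.out.println("unmatched response accepted: " + remoting.onResponse(99, "late"));
    }
}
```

A table like this also explains the periodic scanResponseTable timer seen in NettyRemotingClient.start(): entries whose response never arrives would otherwise stay in the map forever, so something has to sweep them out once they time out.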