从零开始手写 dubbo rpc 框架-07-timeout客戶端超时检测

mac2025-10-10  10

timeout 超时处理

我们调用外部服务,不可能一直等待。

当外部的调用超过指定的时间后,就直接报错,避免无意义的资源消耗。

核心实现

思路

调用的时候,将开始时间保留。

获取的时候检测是否超时。

同时创建一个线程,用来检测是否有超时的请求。

超时检测线程

import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.rpc.common.rpc.domain.RpcResponse; import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory; import com.github.houbb.rpc.common.support.time.impl.Times; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 超时检测线程 * @author binbin.hou * @since 0.0.7 */ public class TimeoutCheckThread implements Runnable{ /** * 请求信息 * @since 0.0.7 */ private final ConcurrentHashMap<String, Long> requestMap; /** * 请求信息 * @since 0.0.7 */ private final ConcurrentHashMap<String, RpcResponse> responseMap; /** * 新建 * @param requestMap 请求 Map * @param responseMap 结果 map * @since 0.0.7 */ public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap, ConcurrentHashMap<String, RpcResponse> responseMap) { ArgUtil.notNull(requestMap, "requestMap"); this.requestMap = requestMap; this.responseMap = responseMap; } @Override public void run() { for(Map.Entry<String, Long> entry : requestMap.entrySet()) { long expireTime = entry.getValue(); long currentTime = Times.time(); if(currentTime > expireTime) { final String key = entry.getKey(); // 结果设置为超时,从请求 map 中移除 responseMap.putIfAbsent(key, RpcResponseFactory.timeout()); requestMap.remove(key); } } } }

InvokeService 调用服务的默认实现

核心实现

@Override public InvokeService addRequest(String seqId, long timeoutMills) { LOG.info("[Client] start add request for seqId: {}, timeoutMills: {}", seqId, timeoutMills); final long expireTime = Times.time()+timeoutMills; requestMap.putIfAbsent(seqId, expireTime); return this; } @Override public InvokeService addResponse(String seqId, RpcResponse rpcResponse) { // 1. 判断是否有效 Long expireTime = this.requestMap.get(seqId); // 如果为空,可能是这个结果已经超时了,被定时 job 移除之后,响应结果才过来。直接忽略 if(ObjectUtil.isNull(expireTime)) { return this; } //2. 判断是否超时 if(Times.time() > expireTime) { LOG.info("[Client] seqId:{} 信息已超时,直接返回超时结果。", seqId); rpcResponse = RpcResponseFactory.timeout(); } // 这里放入之前,可以添加判断。 // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。 // 通知所有等待方 responseMap.putIfAbsent(seqId, rpcResponse); LOG.info("[Client] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, rpcResponse); LOG.info("[Client] seqId:{} 信息已经放入,通知所有等待方", seqId); // 移除对应的 requestMap requestMap.remove(seqId); LOG.info("[Client] seqId:{} remove from request map", seqId); synchronized (this) { this.notifyAll(); } return this; } @Override public RpcResponse getResponse(String seqId) { try { RpcResponse rpcResponse = this.responseMap.get(seqId); if(ObjectUtil.isNotNull(rpcResponse)) { LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse); return rpcResponse; } // 进入等待 while (rpcResponse == null) { LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId); // 同步等待锁 synchronized (this) { this.wait(); } rpcResponse = this.responseMap.get(seqId); LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse); } return rpcResponse; } catch (InterruptedException e) { throw new RpcRuntimeException(e); } }

核心方法说明

添加请求 addRequest

会将过时的时间直接放入 map 中。

因为放入是一次操作,查询可能是多次。

所以时间在放入的时候计算完成。

添加响应 addResponse

如果 requestMap 中已经不存在这个请求信息,则说明可能超时,直接忽略存入结果。

此时检测是否出现超时,超时直接返回超时信息。

放入信息后,通知其他等待的所有进程。

获取相应 getResponse

如果结果存在,直接返回响应结果

否则进入等待。

等待结束后获取结果。

测试代码

服务端

为了测试超时,服务端实现故意添加沉睡模拟耗时。

其他保持不变。

客户端

启动配置添加属性,设置超时时间为1S

config.timeout(1000); 测试日志 [INFO] [2019-11-01 17:01:41.959] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seqId: 092d96d2d90d4b1f8ff5081b2f545e5c, rpcResponse: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null} [INFO] [2019-11-01 17:01:41.959] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:092d96d2d90d4b1f8ff5081b2f545e5c 信息已经放入,通知所有等待方 [INFO] [2019-11-01 17:01:41.960] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:092d96d2d90d4b1f8ff5081b2f545e5c remove from request map Exception in thread "main" [INFO] [2019-11-01 17:01:41.960] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='092d96d2d90d4b1f8ff5081b2f545e5c', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 17:01:41.960] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 092d96d2d90d4b1f8ff5081b2f545e5c 对应结果已经获取: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null} com.github.houbb.rpc.common.exception.RpcTimeoutException
最新回复(0)