SpringCloudHystrix之请求合并

mac2023-01-25  24

Spring Cloud 系列 eureka之服务治理完整过程搭建eureka之高可用的注册中心eureka之详解ribbon之客户端负载均衡ribbon之配置详解Hystrix之服务容错保护Hystrix之使用详解Hystrix之请求合并Hystrix之仪表盘以及Turbine集群监控Feign之声明式服务调用Consul之服务发现和配置管理Sleuth之分布式服务跟踪Zuul之API网关服务Config之分布式配置中心Config之服务端详解Config之客户端详解Bus之消息总线Stream之消息驱动的微服务Stream之绑定器详解重试机制

系列源码地址


文章目录

1. 基于继承实现2. 基于注解实现 写在前面

该文参考来自 程序猿DD 的Spring Cloud 微服务实战一书,该文是作为阅读了 spring cloud hystrix 一章的读书笔记。书中版本比较老,我选择了最新稳定版的 spring cloud Greenwich.SR2 版本,该版本较书中版本有些变动。非常感谢作者提供了这么好的学习思路,谢谢!文章也参考了 Spring-cloud-netflix 的官方文档。

微服务架构中的依赖通常通过远程调用实现,而远程调用中最常见的问题就是通信消耗与连接数占用。高并发情况下,通信次数增加,但依赖的服务的线程池资源有限,将出现排队等待与响应延迟的情况。为了优化这个问题,Hystrix 提供了 HystrixCollapser 来实现请求的合并,以减少通信消耗和线程数的占用。

注:请求的合并是有有效范围的,在针对 http 请求中,可以描述为在单次 http 接口请求中,对于多次请求服务的合并,还有一种情况为在多次 http 接口请求中,多于请求服务的合并。更详细描述以及实现在后续文章里。

代码参考 eureka-service-consumer、eureka-service-provider 两个微服务。

1. 基于继承实现


被调用方服务准备: @PostMapping("/hello4") public List<String> hello4(@RequestBody List<String> names) { return names.stream().map(name -> "hello" + name).collect(Collectors.toList()); } @GetMapping("/hello4") public String hello4(@RequestBody String name) { return "hello" + name; }

get 方式的请求将获取单个值,Post 方式的请求传入多个参数,获取多个值。

消费方:

service 层准备:

public String getName(String name){ return restTemplate.getForEntity("http://HELLO-SERVICE/provider/hello4", String.class).getBody(); } public List<String> getNames(List<String> names){ System.out.println("合并请求:" + Thread.currentThread().getName() + ",names=" + names); HttpEntity<List<String>> request = new HttpEntity<>(names, new HttpHeaders()); return restTemplate.postForEntity("http://HELLO-SERVICE/provider/hello4", request, List.class).getBody(); }

针对服务方的接口,我们准备了如上的两个 service 方法,供其它方法调用。

批量请求命令实现:

/** * 批量请求命令 * @author duofei * @date 2019/10/28 */ public class HelloBatchCommand extends HystrixCommand<List<String>> { private ConsumerService service; private List<String> names; public HelloBatchCommand(ConsumerService service, List<String> names){ super(Setter.withGroupKey(() -> "helloServiceCommand")); this.service = service; this.names = names; } @Override protected List<String> run() throws Exception { return service.getNames(names); } }

请求合并实现:

/** * 请求合并命令 * @author duofei * @date 2019/10/28 */ public class HelloCollapseCommand extends HystrixCollapser<List<String>, String, String> { private ConsumerService consumerService; private String name; public HelloCollapseCommand(ConsumerService consumerService, String name) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand"))); this.consumerService = consumerService; this.name = name; } @Override public String getRequestArgument() { return name; } @Override protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) { /** * collapsedRequests 保存了延迟时间窗中收集到的所有获取单个 name 的请求, * 通过获取这些参数,来组织批量请求命令 HelloBatchCommand 实例 */ return new HelloBatchCommand(consumerService, collapsedRequests.stream() .map(CollapsedRequest::getArgument).collect(Collectors.toList())); } @Override protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) { int count = 0; for (CollapsedRequest<String, String> collapsedRequest : collapsedRequests) { collapsedRequest.setResponse(batchResponse.get(count++)); } } }

controller 层准备:

对外提供访问单个服务的接口(调用上述service 中的 getName方法):

@GetMapping(value="/ribbon-consumer5") public String helloConsumer5(@RequestParam String name) { Future<String> queue1 = new HelloCollapseCommand(consumerService, name + "12").queue(); Future<String> queue2 = new HelloCollapseCommand(consumerService, name + "123").queue(); Future<String> queue3 = new HelloCollapseCommand(consumerService, name).queue(); Future<String> queue4 = new HelloCollapseCommand(consumerService, name).queue(); try { System.out.println(queue1.get() + queue2.get() + queue3.get() + queue4.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return consumerService.getName(name); }

这里还有个很有趣的现象,对于相同参数,请求会自动合并。例如 上述虽然有四个 HelloCollapseCommand,但入参有重复的 name,最终,会自动去重。

运行代码,service 层的打印语句如下:

// name = 122 合并请求:hystrix-helloServiceCommand-1,names=[122, 122123, 12212]

上述的批量请求合并是发生在同一个接口中的,那么如果我们多次调用该接口呢?(简单修改打印语句,输出当前线程名)

// 输出如下: 合并请求:hystrix-helloServiceCommand-1(线程名),names=[小李, 小李12, 小李123] 线程名:http-nio-8081-exec-1,hello小李12hello小李123hello小李hello小李 合并请求:hystrix-helloServiceCommand-2(线程名),names=[小明2123, 小明2, 小明212] 线程名:http-nio-8081-exec-2,hello小明212hello小明2123hello小明2hello小明2

可以发现,合并请求用了两个线程来执行任务,这好像对我们来说用处不大呀,我们完全可以不使用请求合并命令,自己拼接参数,然后调用 service层的批量服务接口。其实,不然,这只是它的一种用法;

hystrix 针对请求合并引入了 scope 的概念,代表请求合并域,通俗的来叫,就是它的请求合并的有限范围,默认情况下是在 REQUEST,即一次请求;通常,这意味着单个用户请求(即HTTP请求)中的请求将被折叠。没有与其他用户请求的交互。每个用户请求1个队列。scope 还提供了一个值: GLOBAL,这意味着来自 JVM 中任何线程的请求(即所有HTTP请求)将被折叠,整个应用1个队列。

注:这需要初始化 HystrixRequestContext, 这个的介绍有在我上一篇 Hystrix之使用详解中。

修改 HelloCollapseCommand 构造函数如下:

public HelloCollapseCommand(ConsumerService consumerService, String name) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")) .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000)) .andScope(Scope.GLOBAL) ); this.consumerService = consumerService; this.name = name; }

注意:我们这里将 withTimerDelayInMilliseconds 值修改为了 2000 ,因为我在1000 的测试环境下,很难达到合并的条件(时间太短,我来不及刷新两个页面)。

打印结果如下:

//来自 service的打印语句 合并请求:hystrix-helloServiceCommand-1(线程名),names=[小李, 小明2123, 小明2, 小李12, 小明212, 小李123] // 来自 controller 的打印语句 线程名:http-nio-8081-exec-3,hello小明212hello小明2123hello小明2hello小明2 线程名:http-nio-8081-exec-1,hello小李12hello小李123hello小李hello小李

这不就是我们想要的了嘛?不过一定要权衡好利弊,这无形中会造成单次接口调用响应延时。

2. 基于注解实现


注解的实现就简单多了,参考上面的实例,service 新增方法如下:

@HystrixCollapser(batchMethod = "getNames2", scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds",value = "2000")}) public String getName1(String name){ return restTemplate.getForEntity("http://HELLO-SERVICE/provider/hello5?name=" + name, String.class).getBody(); } @HystrixCommand public List<String> getNames2(List<String> names){ System.out.println("合并请求:" + Thread.currentThread().getName() + "(线程名)" + ",names=" + names); HttpEntity<List<String>> request = new HttpEntity<>(names, new HttpHeaders()); return restTemplate.postForEntity("http://HELLO-SERVICE/provider/hello4", request, List.class).getBody(); }

测试代码,就不写了,这个很好处理了。

最新回复(0)