(Spring Cloud本身集合了很多微服务框架,实现分布式系统的快速开发) Spring Cloud是一系列框架的有序集合。使用springboot的开发风格,实现分布式系统的快速开发。 如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用Spring Boot的开发风格做到一键启动和部署
一、 什么是服务注册中心
服务注册中心是服务实现服务化管理的核心组件,类似于目录服务的作用,主要用来存储服务信息,譬如提供者 url 串、路由信息等。服务注册中心是 SOA 架构中最基础的设施之一 1 服务注册中心的作用
服务的注册服务的发现2 常见的注册中心有哪些
Dubbo 的注册中心 ZookeeperSringcloud 的注册中心 Eureka3 服务注册中心解决了什么问题
服务管理服务的依赖关系管理4 什么是 Eureka 注册中心
Eureka 是 Netflix 开发的服务发现组件,本身是一个基于 REST 的服务。 Spring Cloud将它集成在其子项目 spring-cloud-netflix 中,以实现 Spring Cloud 的服务注册于发现,同时还提供了负载均衡、故障转移等能力。 5 Eureka 注册中心三种角色
5.1Eureka Server
通过 Register、Get、Renew 等接口提供服务的注册和发现。 5.2Application Service (Service Provider)
服务提供方把自身的服务实例注册到 Eureka Server 中 5.3Application Client (Service Consumer)
服务调用方通过 Eureka Server 获取服务列表,消费服务
①. 服务发现——Netflix Eureka
一个RESTful服务,用来定位运行在AWS地区(Region)中的中间层服务。 由两个组件组成:Eureka服务器和Eureka客户端。Eureka服务器用作服务注册服务器。 Eureka客户端是一个java客户端,用来简化与服务器的交互、作为轮询负载均衡器,并提供服务的故障切换支持。 Netflix在其生产环境中使用的是另外的客户端,它提供基于流量、资源利用率以及出错状态的加权负载均衡。
②. 客服端负载均衡——Netflix Ribbon
Ribbon,主要提供客户侧的软件负载均衡算法。 Ribbon客户端组件提供一系列完善的配置选项,比如连接超时、重试、重试算法等。 Ribbon内置可插拔、可定制的负载均衡组件。
③. 断路器——Netflix Hystrix
断路器可以防止一个应用程序多次试图执行一个操作,即很可能失败,允许它继续而不等待故障恢复或者浪费 CPU 周期,而它确定该故障是持久的。 断路器模式也使应用程序能够检测故障是否已经解决。 如果问题似乎已经得到纠正,应用程序可以尝试调用操作。
④. 服务网关——Netflix Zuul
类似nginx,反向代理的功能,不过netflix自己增加了一些配合其他组件的特性。
⑤. 分布式配置——Spring Cloud Config
这个还是静态的,得配合Spring Cloud Bus实现动态的配置更新
1.创建SpringBoot项目 创建一个SpringBoot项目 2.引入相关的依赖 添加如下的相关依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> <version>1.4.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka-server</artifactId> <version>1.3.2.RELEASE</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>创建启动器
在启动类中我们需要放开@EnableEurekaServer,表明这是个Eureka的服务端
@SpringBootApplication @EnableEurekaServer //表示开启服务注册中心 public class EurekaApplication { public static void main(String[] args) { SpringApplication.run(EurekaApplication.class, args); } }修改application.properties文件
# 服务名 spring.application.name=eureka # 是否将自己注册到 Eureka中,默认为true eureka.client.register-with-eureka=false # 是否从 Eureka服务中 获取注册信息,默认为true eureka.client.fetch-registry=false # 设置端口 server.port=8761 # 注册地址 eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka启动服务访问web页面
启动服务后访问 http://localhost:8761 可以访问表示启动成功 服务提供者 修改application.properties文件
spring.application.name=provider eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka server.port=2000启动 服务消费者
修改application.properties文件
spring.application.name=consumer eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka server.port=3000注册Bean
@Bean @LoadBalanced //表示具有负载均衡功能的实例,使用这个实例发送的请求,统统都会被拦截,进行负载均衡处理 RestTemplate restTemplate(){ return new RestTemplate(); }注册服务
@RestController public class UserHelloCOntroller { @Autowired private DiscoveryClient discoveryClient; @GetMapping("/hello") public void hello() throws IOException { //根据服务名获取服务的详细信息, 因为propider可能是集群 List<ServiceInstance> list = discoveryClient.getInstances("provider"); ServiceInstance serviceInstance = list.get(0); String url = "http://" + serviceInstance.getHost() + ":" + serviceInstance.getPort() + "/hello"; HttpURLConnection con = null; URL u = new URL(url); con = (HttpURLConnection) u.openConnection(); con.connect(); if(con.getResponseCode() == 200){ BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream())); String s = br.readLine(); System.out.println(s); br.close(); } } } 接口说明LoadBalancerClient负载均衡器,根据服务名称可以获取对应服务的ip和host等信息RestTemplateRest服务模板,可以完成服务的调用负载均衡有好几种实现策略,常见的有:
随机 (Random)轮询 (RoundRobin)一致性哈希 (ConsistentHash)哈希 (Hash)加权(Weighted)SpringRestTemplate是Spring 提供的用于访问 Rest 服务的客端, RestTemplate提供了多种便捷访问远程Http服务的方法, 能够大大提高客户端的编写效率,所以很多客户端比如Android或者第三方服务商都是使用RestTemplate 请求 restful服务
API方法介绍
API说明getForEntity()发送一个HTTP GET请求,返回的ResponseEntity包含了响应体所映射成的对象getForObject()发送一个HTTP GET请求,返回的请求体将映射为一个对象postForEntity()POST 数据到一个URL,返回包含一个对象的ResponseEntity,这个对象是从响应体中映射得到的postForObject()POST 数据到一个URL,返回根据响应体匹配形成的对象headForHeaders()发送HTTP HEAD请求,返回包含特定资源URL的HTTP头optionsForAllow()发送HTTP OPTIONS请求,返回对特定URL的Allow头信息postForLocation()POST 数据到一个URL,返回新创建资源的URLput()PUT 资源到特定的URLdelete()在特定的URL上对资源执行HTTP DELETE操作exchange()在URL上执行特定的HTTP方法,返回包含对象的ResponseEntity,这个对象是从响应体中映射得到的execute()在URL上执行特定的HTTP方法,返回一个从响应体映射得到的对象1.环境搭建
commons是一个公共模块,是一个普通的JavaSE工程,主要将实体类写在这个模块中, provider和consumer是两个spring boot项目, provider将扮演 服务提供者 的角色, consumer扮演服务消费者的角色
然后在provider和consumer模块中添加对commons的依赖,依赖代码如下:
<dependency> <groupId>com.i</groupId> <artifactId>commons</artifactId> <version>0.0.1-SNAPSHOT</version> </dependencyGET请求 可通过如下两种方式:
第一种:getForEntity
getForEntity方法的返回值是一个ResponseEntity<T>, ResponseEntity<T>是Spring对HTTP请求响应的封装,包括了几个重要的元素,如响应码、contentType、contentLength、响应消息体等。
@GetMapping("/test1") public void test1(){ Map<String, Object> map = new HashMap<>(); map.put("id", 1); ResponseEntity<User> entity = restTemplate.getForEntity("http://provider/user/?id={id}", User.class); System.out.println("entity.getStatusCode() :" + entity.getStatusCode()); System.out.println("entity.getStatusCodeValue() :"+entity.getStatusCodeValue()); System.out.println("entity.getBody() :"+entity.getBody()); } getForEntity()第一个参数为 要调用的服务的地址,通过 服务名 而不是 服务地址getForEntity()第二个参数为 返回类型getForEntity()第三个参数为 传递的参数有两种方式传递:
@RequestMapping("/sayhello") public String sayHello() { ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://provider/user/user?name={1}", String.class, "张三"); return responseEntity.getBody(); } @RequestMapping("/sayhello2") public String sayHello2() { Map<String, String> map = new HashMap<>(); map.put("name", "李四"); ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://provider/user/user?name={name}", String.class, map); return responseEntity.getBody(); } 可以用 一个数字做 占位符,最后是一个 可变长度的参数,来一一替换前面的占位符可以使用name={name}形式,最后一个参数是 map,map的key为 前面占位符的 名字,map的value为 参数值第二种:getForObject
getForObject函数是对getForEntity函数的进一步封装,如 只关注返回的消息体的内容,其他不需要,可以使用getForObject
@RequestMapping("/book") public Book book2() { Book book = restTemplate.getForObject("http://hello-service/getbook1", Book.class); return book; }POST请求
三种方法:
第一种:postForEntity
@GetMapping("/addUser") public User test2() { User user = new User(); user.setId(1); ResponseEntity<User> responseEntity = restTemplate.postForEntity("http://provider/user/addUser", user, User.class); return responseEntity.getBody(); } 参数一: 要调用的服务的地址参数二: 上传的参数参数三: 返回的数据类型创建了User对象,只有id属性,传递到服务提供者去
@PostMapping("/addUser") public User addUser(@RequestBody User user) { user.setUsername("阿猫"); user.setAddress("深圳"); return user; }服务提供者 接收到服务消费者传来的 User参数,给其他属性 设置上值再返回,调用结果如下:
第二种:postForObject
如果只关注 返回的消息体,可以直接使用postForObject。用法与getForObject一样
第三种:postForLocation postForLocation也是提交新资源, 提交成功后,返回新资源的URL, postForLocation的参数和前面 两种的参数 基本一致,但 返回值为URI,只需 服务提供者返回 URI即可, 该URI表示新资源的位置
PUT请求 RestTemplate中,PUT请求可以通过put方法调用, put方法的参数和前面介绍的postForEntity方法的参数基本一致,只是put()方法没有返回值而已。
@RequestMapping("/put") public void put() { User user = new User(); user.setUsername("阿猫"); restTemplate.put("http://provider/user/?{1}", user, 99); }user对象为 要提交的参数,后面的 99 用来替换前面的占位符
DELETE请求 delete请求我们可以通过delete()方法调用来实现
@RequestMapping("/delete") public void delete() { restTemplate.delete("http://provider/user/{1}",99); }灾难性雪崩 在微服务环境中,因为一个节点的故障而造成的其他节点的不可用的情况是比较常见的, 这也就是我们常说的灾难性雪崩现象,而Hystrix给我们提供了解决这种情况的方案
正常情况下各个节点相互配置,完成用户请求的处理工作 当某种请求增多,造成"服务T"故障的情况时,会延伸的造成"服务U"不可用,及继续扩展,如下 最终造成下面这种所有服务不可用的情况 这就是灾难性雪崩 原因可归纳为三个:
服务提供者不可用 (硬件故障,程序BUG,缓存击穿,用户大量请求)重试加大流量 (用户重试,代码逻辑重试)服务调用者不可用 (同步等待造成的资源耗尽)最终的结果就是一个服务不可用,导致一系列服务的不可用,而往往这种后果是无法预料的。
如何解决灾难性雪崩效应?
1.降级 超时降级、资源不足时(线程或信号量)降级,降级后可以 配合降级接口 返回 托底数据。 实现一个 fallback 方法, 当请求后端服务出现异常的时候, 可以使用 fallback 方法返回的值
2.缓存
Hystrix 为了降低访问服务的频率,支持将一个请求与返回结果做缓存处理。 如果再次请求的 URL 没有变化,那么 Hystrix 不会请求服务,而是直接从缓存中将结果返回。 这样可以大大降低访问服务的压力
3.请求合并
在微服务架构中,我们将一个 项目拆分 成 很多个 独立的 模块,这些独立的模块通过 远程调用 来互相配合工作, 但是,在高并发情况下,通信次数 的 增加 会导致 总的通信时间 增加, 同时,线程池的资源也是有限的,高并发环境会 导致有 大量的 线程处于 等待状态,进而导致 响应延迟, 为了解决这些问题,我们需要了解 Hystrix 的请求合并。
4.熔断 当失败率 (如因网络故障/超时造成的失败率高)达到 阀值 自动 触发 降级, 熔断器触发的 快速失败会进行 快速恢复。
5.隔离(线程池隔离和信号量隔离)
限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。
超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。 实现一个 fallback 方法, 当请求后端服务出现异常的时候, 可以使用 fallback 方法返回的值
服务消费者中加入断路器
首先我们需要在服务消费者中引入hystrix,如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency>修改application.properties文件
# 服务名 spring.application.name=hystrix # 设置端口 server.port=4000 #设置服务注册中心地址,指向另一个注册中心 eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka修改服务消费者启动入口类
引入hystrix之后,我们需要在入口类上通过@EnableCircuitBreaker开启断路器功能,如下:
@SpringBootApplication @EnableCircuitBreaker //开启断路器 public class HystrixApplication { public static void main(String[] args) { SpringApplication.run(HystrixApplication.class, args); } @Bean @LoadBalanced RestTemplate restTemplater(){ return new RestTemplate(); } }我们也可以使用一个名为@SpringCloudApplication的注解代替这三个注解,@SpringBootApplication注解的定义如下:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public @interface SpringCloudApplication { } //@SpringBootApplication //@EnableCircuitBreaker //开启断路器 @SpringCloudApplication public class HystrixApplication { public static void main(String[] args) { SpringApplication.run(HystrixApplication.class, args); } @Bean @LoadBalanced RestTemplate restTemplater(){ return new RestTemplate(); } }修改Controller
创建一个Service类 业务层代码中在getUsers方法中通过RestTemplate来调用服务, 在方法头部添加@HystrixCommand注解,通过fallbackMethod 属性指定当调用的provider方法异常的时候的fallback方法为fallBack方法,然后在其方法中返回了托底数据
@RestController public class UserHelloController { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "error") public String getUser(){ return restTemplate.getForObject("http://provider/user/", String.class); } public String error(){ return "error"; } }1.RestTemplate执行网络请求的操作我们放在HelloService中来完成。 2.error方法是一个请求失败时回调的方法。 3.在hello方法上通过@HystrixCommand注解来指定请求失败时回调的方法
将Controller的逻辑修改成下面这样
@RestController public class UserHelloController { @Autowired RestTemplate restTemplate; @Autowired HelloService helloService; @GetMapping("/hello") public String hello(){ return helloService.hello(); } }此时就开启了断路器功能。
测试
启动eureka注册中心,然后启动consumer服务,provider服务不用启动,这样我们访问consumer中的服务的时候就会出现异常,当我们浏览器看到托底数据的话,表示降级成功.
实际开发中,不是所有的请求都要提前预备好 服务降级问题,如果我就是要将服务调用失败的信息展示给用户,那么此时就没必要添加断路器了
自定义Hystrix请求命令 除了使用@HystrixCommand注解,也可以自定义类继承自HystrixCommand,如下
public class HelloCommand extends HystrixCommand<String> { RestTemplate restTemplate; public HelloCommand(RestTemplate restTemplate) { super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(""))); this.restTemplate = restTemplate; } @Override protected String run() throws Exception { return restTemplate.getForObject("http://provider/hello",String.class); } @Override protected String getFallback() { return "error-2"; } }在HelloCommand中注入RestTemplate,然后重写两个方法:
getFallBack(): 该方法在服务调用失败时回调run():执行请求时调用构造方法的第一个参数 主要用来保存一些分组信息异步请求
1.配置HystrixCommandAspect的Bean 在项目的入口类中配置HystrixCommandAspect的Bean
@Bean public HystrixCommandAspect hystrixCommandAspect() { return new HystrixCommandAspect(); }2.通过AsyncResult来执行调用
还是使用@HystrixCommand注解,但是方法的实现使用AsyncResult
@HystrixCommand(fallbackMethod = "error") public Future<String> hello(){ return new AsyncResult<String>(){ @Override public String invoke() { return restTemplate.getForObject("http://provider/user/", String.class); } }; }之后就可以通过注解来实现异步调用了。调用方式如下:
@GetMapping("/hello") public String hello() throws ExecutionException, InterruptedException { Future<String> hello = helloService.hello(); //调用 get()方法 也可以设置超时时长 return hello.get(); }对响应式函数编程的支持 Hystrix对响应式函数编程也提供了相应的支持,在获取到BookCommand对象之后, 也可以通过如下两种方式来获取到一个Observable来对数据进行二次处理
UserCommand userCommand = new UserCommand(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("")), restTemplate); Observable<User> observe = userCommand.observe(); Observable<User> bookObservable = userCommand.toObservable();通过注解支持响应式函数编程
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER) //表示使用observe模式来执行 @HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY) //表示使用toObservable来执行 ObservableExecutionMode.EAGER 表示使用observe模式来执行ObservableExecutionMode.LAZY 表示使用toObservable来执行异常处理 在调用服务提供者 时有可能会 抛异常,默认情况下方法抛了异常会 自动进行 服务降级, 交给服务降级中的方法去处理,在自定义Hystrix请求命令的方式下, 可以在getFallback方法中调用getExecutionException方法来获取抛出的异常
@Override protected String run() throws Exception { int i = 1/0; return restTemplate.getForObject("http://provider/hello",String.class); } @Override protected String getFallback() { Throwable t = getExecutionException(); return "error-2"+t.getMessage(); }抛出了异常也会自动进行了服务降级
采用了注解的方式,只需要在服务降级方法中添加一个Throwable类型的参数就能够获取到抛出的异常的类型,如下:
@HystrixCommand(fallbackMethod = "error") public String hello(){ int i = 1/0; return restTemplate.getForObject("http://provider/user/", String.class); } public String error(Throwable throwable){ return "error"+throwable.getMessage(); }如果有一个异常抛出后,不希望进入到服务降级方法中去处理,而是直接将 异常抛给 用户,那么我们可以在@HystrixCommand注解中添加忽略异常,如下:
@HystrixCommand(fallbackMethod = "error",ignoreExceptions = ArithmeticException.class) public String hello(){ int i = 1/0; return restTemplate.getForObject("http://provider/user/", String.class); } public String error(Throwable throwable){ return "error"+throwable.getMessage(); }原理:有个HystrixBadRequestException的异常不会进入到 服务降级方法中去,当定义了ignoreExections为ArithmeticExection.class之后,当抛出 ArithmeticException异常时,Hystrix会将异常信息包装在HystrixBadRequestExection里,然后再抛出,此时就不会触发 服务降级方法了
Hystrix 为了降低访问服务的频率,支持将一个请求与返回结果做缓存处理。 如再次请求的 URL 没有变化,那Hystrix不会请求服务,而是直接从缓存中将结果返回。这样可以大大降低访问服务的压力。
Hystrix 自带缓存。有两个缺点:
是一个本地缓存。在集群情况下缓存是不能同步的不支持第三方缓存容器。Redis,memcache 不支持的所以我们使用spring的cache 通过注解开启缓存
@CacheResult@CacheKey@CacheRemove@CacheResult @CacheResult方法可以用在我们之前的Service方法上, 表示给该方法开启缓存,默认情况下方法的所有参数都将作为缓存的key,如下
@CacheResult @HystrixCommand() public String hello(Integer id,String a){ return restTemplate.getForObject("http://provider/user/", String.class); }如果想指定key @CacheKey 指定缓存的key
@CacheResult @HystrixCommand() public String hello(@CacheKey Integer id, String a){ return restTemplate.getForObject("http://provider/user/", String.class); }使用了服务降级的话,对应的方法也要加上参数
public String error(Integer id,Throwable throwable){ return "error"+throwable.getMessage(); }测试
HystrixRequestContext ctx = HystrixRequestContext.initializeContext(); String hello = helloService.hello(1); hello = helloService.hello(1); ctx.close(); return hello;@CacheRemove 让缓存失效的注解
@CacheRemove(commandKey = "hello") @HystrixCommand public Book test(@CacheKey Integer id) { return null; }通过方法重载开启缓存 使用了自定义Hystrix请求命令的方式来使用Hystrix,那只需重写getCacheKey方法即可实现请求缓存
public class HelloCommand extends HystrixCommand<String> { private RestTemplate restTemplate; private Integer id; public HelloCommand(Integer id,RestTemplate restTemplate) { super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(""))); this.restTemplate = restTemplate; this.id = id; } @Override protected String run() throws Exception { return restTemplate.getForObject("http://provider/hello",String.class); } @Override protected String getCacheKey() { System.out.println("run"); return String.valueOf(id); } }系统在运行时会根据getCacheKey方法的返回值来判断这个请求是否和之前执行过的请求一样,即被缓存,如果被缓存,则直接使用缓存数据而不去请求服务提供者,getCacheKey方法将在run方法之前执行。
controller消费
@GetMapping("/hello") public String hello() throws ExecutionException, InterruptedException { HystrixRequestContext ctx = HystrixRequestContext.initializeContext(); HelloCommand cmd = new HelloCommand(1,restTemplate); HelloCommand cmd2 = new HelloCommand(1,restTemplate); String execute = cmd.execute(); String execute2 = cmd2.execute(); return execute; }注意,在服务请求发起之前,需要先初始化HystrixRequestContext
通过HystrixRequestCache中的clear方法将缓存的数据清除掉,这个时候如果我再发起请求,则又会调用服务提供者的方法
@GetMapping("/hello") public String hello() throws ExecutionException, InterruptedException { HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("commandKey"); HystrixRequestContext ctx = HystrixRequestContext.initializeContext(); HelloCommand cmd = new HelloCommand(1,restTemplate); HelloCommand cmd2 = new HelloCommand(1,restTemplate); String execute = cmd.execute(); HystrixRequestCache.getInstance(commandKey, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(1l)); String execute2 = cmd2.execute(); return execute; }熔断其实是在降级的基础上引入了重试的机制。当某个时间内失败的次数达到了多少次就会触发熔断机制,具体的流程如下
@HystrixCommand(fallbackMethod = "fallback", commandProperties = { //默认 20 个;10s 内请求数大于 20 个时就启动熔断器,当请求符合熔断条件时将触发 getFallback()。 @HystrixProperty(name= HystrixPropertiesManager.CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, value="10"), //请求错误率大于 50%时就熔断,然后 for 循环发起请求,当请求符合熔断条件时将触发 getFallback()。 @HystrixProperty(name=HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, value="50"), //默认 5 秒;熔断多少秒后去尝试请求 @HystrixProperty(name=HystrixPropertiesManager.CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, value="5000"), })什么情况下使用请求合并
在微服务架构中,我们将一个 项目拆分 成 很多个 独立的 模块,这些独立的模块通过 远程调用 来互相配合工作, 但是,在高并发情况下,通信次数 的 增加 会导致 总的通信时间 增加, 同时,线程池的资源也是有限的,高并发环境会 导致有 大量的 线程处于 等待状态,进而导致 响应延迟, 为了解决这些问题,我们需要了解 Hystrix 的请求合并。 请求合并的缺点
设置请求合并之后,本来一个请求可能 5ms 就搞定了,但是现在必须再等 10ms 看看还有没有其他的请求一起的,这样一个请求的耗时就从 5ms 增加到 15ms 了,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景。
服务提供者接口
需在服务提供者中提供 批处理接口 调用,如下
@GetMapping("/{ids}") public List<User> getUserById(@PathVariable String ids) { System.out.println("getUserById" + ids); String[] split = ids.split(","); List<User> users = new ArrayList<>(); for (String s:split) { User u = new User(); u.setId(Integer.parseInt(s)); users.add(u); } return users; }服务消费者
UserService
首先在BookService中添加方法用来调用服务提供者提供的接口,如下
public List<User> getUserByIds(List<Integer> ids){ User[] users = restTemplate.getForObject("http://provider/user/{1}", User[].class, StringUtils.join(ids, ",")); return Arrays.asList(users); }UserBatchCommand
public class UserBatchCommand extends HystrixCollapser<List<User>,User,Integer> { private Integer id; private UserService userService; public UserBatchCommand(Integer id, UserService userService) { super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("")). andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100))); this.id = id; this.userService = userService; } @Override public Integer getRequestArgument() { return id; } //请求合并,具体的合并操作在这进行 @Override protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Integer>> collection) { List<Integer> ids = new ArrayList<>(collection.size()); for (CollapsedRequest<User, Integer> request : collection) { ids.add(request.getArgument()); } UserCommand userCommand = new UserCommand(userService,ids); return userCommand; } //映射请求 响应到不同的请求上 @Override protected void mapResponseToRequests(List<User> users, Collection<CollapsedRequest<User, Integer>> collection) { int count = 0; for (CollapsedRequest<User, Integer> request : collection) { request.setResponse(users.get(count++)); } } }1.构造方法中,设置了请求时间隔为 100ms,即请求时间 间隔在100ms之内的请求会被合并为一个请求 2.createCommand方法用来 合并请求,在这获取 各个请求的id ,将单个的id放到一个集合中,再创建出 UserBatchCommand对象,用该对象去发起一个批量请求 3.mapResponseToRequests方法用来 为每个请求设置请求结果。方法的第一个参数batchResponse表示批处理请求的结果,第二个参数collapsedRequests代表 每一个被合并的请求, 然后通过 遍历batchResponse来为collapsedRequests设置请求结果
测试
@GetMapping("/test1") public void test1() throws ExecutionException, InterruptedException { HystrixRequestContext ctx = HystrixRequestContext.initializeContext(); UserBatchCommand cmd1 = new UserBatchCommand(1, userService); UserBatchCommand cmd2 = new UserBatchCommand(2, userService); UserBatchCommand cmd3 = new UserBatchCommand(3, userService); Future<User> q1 = cmd1.queue(); Future<User> q2 = cmd2.queue(); Future<User> q3 = cmd3.queue(); User user1 = q1.get(); User user2 = q2.get(); User user3 = q3.get(); System.out.println(user1); System.out.println(user2); System.out.println(user3); Thread.sleep(3000); UserBatchCommand cmd4 = new UserBatchCommand(4, userService); Future<User> q4 = cmd4.queue(); User user4 = q4.get(); System.out.println(user4); ctx.close(); }1.首先要初始化HystrixRequestContext 2.创建BookCollapseCommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理
通过注解实现请求合并
UserService中添加方法
@Service public class UserService { @Autowired RestTemplate restTemplate; @HystrixCollapser(batchMethod = "getUserByIds", collapserProperties = { //请求时间间隔在 100ms 之内的请求会被合并为一个请求,默认为 10ms @HystrixProperty(name = "timerDelayInMilliseconds",value = "100"), //设置触发批处理执行之前,在批处理中允许的最大请求数。 @HystrixProperty(name = "maxRequestsInBatch",value = "200") }) public Future<User> test(Integer id){ return null; } @HystrixCommand public List<User> getUserByIds(List<Integer> ids){ User[] users = restTemplate.getForObject("http://provider/user/{1}", User[].class, StringUtils.join(ids, ",")); return Arrays.asList(users); } }在test方法上添加@HystrixCollapser注解实现请求合并,用batchMethod属性指明请求合并后的处理方法,collapserProperties属性指定其他属性 测试
@GetMapping("/test2") public void test2() throws ExecutionException, InterruptedException { HystrixRequestContext ctx = HystrixRequestContext.initializeContext(); Future<User> q1 = userService.test(1); Future<User> q2 = userService.test(2); Future<User> q3 = userService.test(3); User user1 = q1.get(); User user2 = q2.get(); User user3 = q3.get(); System.out.println(user1); System.out.println(user2); System.out.println(user3); Thread.sleep(2000); Future<User> q4 = userService.test(4); User user4 = q4.get(); System.out.println(user4); ctx.close(); }@HystrixCollapser
参数作用/默认值备注batchMethod合并请求的方法scope请求方式/REQUEST请求方式/REQUESTREQUEST范围只对一个request请求内的多次服务请求进行合并GLOBAL是多单应用中的所有线程的请求中的多次服务请求合并timerDelayInMilliseconds请求时间间隔在10ms之内的请求会被合并为一个请求/10ms建议尽量设置的小一点,如果并发量不大的话,其实也没有必要使用HystrixCollapser来处理maxRequestsInBatch设置触发批处理执行之前,在批处理中允许的最大请求数/Integer.MAX_VALUE请求合并的优点已经看到了,多个请求被合并为一个请求进行一次性处理, 可以有效节省网络带宽和线程池资源, 但是,有优点必然也有缺点,设置请求合并之后,本来一个请求可能5ms就搞定了, 但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了, 不过,如果我们要发起的命令本身就是一个高延迟的命令, 那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了, 另外高并发也是请求合并的一个非常重要的场景
限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。 隔离又分为线程池隔离和信号量隔离
概念介绍 通过上图来看,线程池隔离的作用还是蛮明显的。 但线程池隔离的使用也不是在任何场景下都适用的,线程池隔离的优缺点如下:
优点:
使用线程池隔离可以完全隔离依赖的服务(如图中的A,B,C服务),请求线程可以快速返回当线程池出现问题时,线程池隔离是独立的不会影响其他服务和接口当失败的服务再次变得可用时,线程池将清理并可立即修复,而不是需要一个长时间的恢复独立的线程池提高了并发性缺点: 线程池隔离的主要缺点是它们 增加计算机开销(CPU),每个命令的执行 涉及到 排队,调度 和 上下文切换都是在 一个单独的 线程上运行的
线程池隔离
@HystrixCommand(groupKey="ego-product-provider", commandKey = "getUsers", threadPoolKey="ego-product-provider", threadPoolProperties = { @HystrixProperty(name = "coreSize", value = "30"),//线程池大小 @HystrixProperty(name = "maxQueueSize", value = "100"),//最大队列长度 @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),//线程存活时间 @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")//拒绝请求 }, fallbackMethod = "fallback")线程池隔离参数
参数作用默认值备注groupKey服务名(相同服务用一个名称,如用户,商品等)getClass().getSimpleName();在consumer里 为每个propvider,设置group标识,一个group使用一个线程池commandKey接口(服务下面的接口,如购买商品)当前执行方法名consumer的接口名称threadPoolKey线程池的名称默认是分组名groupKey配置全局唯一标识线程池的名称,相同线程池名称的线程池是同一个coreSize线程池大小(最大的并发执行数量)10设置标准:每秒最大支撑的请求数(99%平均响应时间+一个缓冲值)比如:每秒能处理1000个请求,99%的请求响应时间是60ms,公式就是:1000*(0.060+0.012)maxQueueSize最大队列长度(设置BlockingQueue的最大长度)-1如果使用正数,将从同步队列(SynchronousQueue)改为阻塞队列(LinkedBlockingQueue)keepAliveTimeMinutes线程存活时间1分钟(单位)控制一个线程从实用完成到被释放的时间queueSizeRejectionThreshold拒绝请求(设置拒绝请求的临界值)5此属性不适于maxQueueSize= -1时原因是maxQueueSize值运行时不能改变,可通过修改这个变量动态修改允许排队的长度信号量隔离
信号量隔离其实就是我们定义的队列并发时最多支持多大的访问,其他的访问通过托底数据来响应,如下结构图
@HystrixCommand(fallbackMethod = "fallback", commandProperties = { @HystrixProperty(name= HystrixPropertiesManager.EXECUTION_ISOLATION_STRATEGY,value="SEMAPHORE"),// 信号量 隔离 @HystrixProperty (name=HystrixPropertiesManager.EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, value="100")//信号量最大并度 })信号量隔离参数
参数作用默认值备注execution.isolation.strategy隔离策略配置项THREAD只有2种,THREAD和SEMAPHOREexecution.isolation.thread.timeoutInMilliseconds超时时间1000ms1.THREAD模式下,达到超时时间,自动中断2.SEMAPHONE模式下,会等待执行完成后,再去判断是否超时execution.isolation.thread.interruptOnTimeout是否打开超时线程中断TRUETHREAD模式有效execution.isolation.semaphore.maxConcurrentRequests信息量最大并发度10SEMAPHONE模式有效fallback.isolation.semaphore.maxConcurrentRequestsfallback最大并发度10SEMAPHONE模式有效线程池和信息量的区别
线程池隔离信息量隔离线程请求线程和调用provider线程不是同一条线程请求线程和调用provider线程是同一条线程开销排队,调度,上下文开销等无线程切换,开销低异步支持不支持并发支持支持(最大线程池大小)支持(最大信息量上限)传递Header无法传递http Header可以传递http Header支持超时支持不支持什么情况下用线程池隔离? 请求 并发量大,且 耗时长(请求耗时长一般是计算量大,或读数据库):采用线程隔离策略, 可以保证大量的 容器(tomcat)线程可用,不会由于服务原因,一直处于阻塞或等待状态,快速失败返回
什么情况下用信息量隔离? 请求并发量大,且 耗时短(请求耗时短 可能是计算量小,或读缓存):采用信息量隔离策略, 因为这类服务的返回通常会非常快,不会占用容器线程太长时间, 而且也减少了线程切换的一些开销,提高了缓存服务的效率
Feign是一种声明式、模板化的HTTP客户端(仅在 consumer 中使用)。
什么是声明式,有什么作用,解决什么问题? 声明式调用就像调用本地方法一样调用远程方法;无感知远程 http 请求。
1.Spring Cloud 的声明式调用, 可以做到使用 HTTP 请求远程服务时能就像调用本地方法一样的体验, 开发者完全感知不到这是远程方法,更感知不到这是个 HTTP 请求。
2.它像 Dubbo 一样,consumer 直接调用接口方法调用 provider, 而不需要通过常规的Http Client 构造请求再解析返回数据。
3.它解决了让开发者调用远程接口就跟调用本地方法一样, 无需关注与远程的交互细节,更无需关注分布式环境开发。
1.创建普通Spring Boot工程 2.添加依赖 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> <version>2.2.0.M3</version> </dependency> 属性配置 # 服务名 spring.application.name=feign # 设置端口 server.port=5000 #设置服务注册中心地址,指向另一个注册中心 eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka4.添加注解 在工程的入口类上添加@EnableFeignClients注解表示开启Spring Cloud Feign的支持功能
@SpringBootApplication @EnableFeignClients //开启Feign public class FeignApplication { public static void main(String[] args) { SpringApplication.run(FeignApplication.class, args); } }5.声明服务 定义一个HelloService接口,通过@FeignClient注解来指定服务名进而绑定服务,然后再通过SpringMVC中提供的注解来绑定服务提供者提供的接口
@FeignClient("provider") public interface HelloService { @GetMapping("/user/") User getUserById(@RequestParam("id") Integer id); }服务提供者提供的接口
@RestController @RequestMapping("/user") public class UserController { @GetMapping("/") public User getUserById(Integer id) { User user = new User(); user.setId(id); System.out.println("getUserById :" + id); return user; } }6.Controller中调用服务
@RestController public class UserController { @Autowired private UserService userService; @GetMapping("/test1") public void test1(){ User userById = userService.getUserById(1); System.out.println(userById); } }测试
Feign的继承特性 服务提供者另一种写法,出了错误只用看一边,但是代码间耦合性增加了
创建公共接口
@RequestMapping("/user") public interface IUserService { @GetMapping("/") public User getUserById(@RequestParam("id") Integer id); @PostMapping("/addUser") public User addUser(@RequestBody User user); @DeleteMapping("/{id}") public void deleteUserById(@PathVariable("id") Integer id) ; @PutMapping("/") public User updateUser(@RequestBody User user); @GetMapping("/{ids}") public List<User> getUserById(@PathVariable("ids") String ids); }服务提供者中实现接口
方法的实现还是和上文的一样。不同的是这里不需要在方法上面添加@RequestMapping注解, 这些注解在父接口中都有,不过在Controller上还是要添加@RestController注解,另外需要注意的是, 方法中的参数@RequestHeader和@RequestBody注解还是要添加,@RequestParam注解可以不添加。
@RestController public class UserController implements IUserService { @Override public User getUserById(Integer id) { User user = new User(); user.setId(id); System.out.println("getUserById :" + id); return user; } @Override public User addUser(@RequestBody User user) { user.setUsername("阿猫"); user.setAddress("深圳"); return user; } @Override public void deleteUserById(@PathVariable Integer id) { System.out.println("deleteUserById :" + id); } @Override public User updateUser(@RequestBody User user) { System.out.println("updateUser " + user); return user; } @Override public List<User> getUserById(@PathVariable String ids) { System.out.println("getUserById" + ids); String[] split = ids.split(","); List<User> users = new ArrayList<>(); for (String s:split) { User u = new User(); u.setId(Integer.parseInt(s)); users.add(u); } return users; } }服务消费者中继承接口
这个接口中不需要添加任何方法,方法都在父接口中,这里只需要在类上面添加@FeignClient(“provider”)注解来绑定服务即可
@FeignClient("provider") public interface UserService extends IUserService { }API网关是一个更为智能的应用服务器,它有点类似于我们微服务架构系统的门面, 所有的外部访问都要先经过API网关,然后API网关来实现请求路由、负载均衡、权限验证等功能。 Spring Cloud中提供的Spring Cloud Zuul实现了API网关的功能 1.创建Spring Boot工程并添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-zuul</artifactId> </dependency>2.添加注解 入口类上添加@EnableZuulProxy注解表示开启Zuul的API网关服务功能
@SpringBootApplication @EnableZuulProxy public class ZuulApplication { public static void main(String[] args) { SpringApplication.run(ZuulApplication.class, args); } }3.配置路由规则
# 基础信息配置 spring.application.name=zuul # API网关也将作为一个服务注册到eureka-server上 eureka.client.service-uri.defaultZone=http://127.0.0.1:1111/eureka测试
代理的路径 就是服务注册的名字
# 路由规则配置 zuul.routes.i.path=/i/** zuul.routes.i.service-id=provider定制路径 配置了路由规则所有符合/i/**的请求都将被转发到provider服务上,至于provider服务的地址到底是什么则由eureka-server去分析,我们这里只需要写上服务名即可。 以上面的配置为例,如果我请求http://localhost:8761/i/hello1接口则相当于请求http://localhost:8761/hello1(我这里provider的地址为http://localhost:8761),我们在路由规则中配置的i是路由的名字,可以任意定义,但是一组path和serviceId映射关系的路由名要相同。
简化写法
zuul.routes.feign=/aaa/**feign表示服务名
如果不想做映射,可以加上,忽略
zuul.ignored-services=hystrix如果有多个服务要排除,服务名称通过","连接
zuul.ignored-services=hystrix,feign由于服务太多,不可能手工一个个加,故路由排除所有服务,然后针对要 路由的服务进行手工加
# 先忽略所有的请求 zuul.ignored-services=* # 然后单独放开e-book-order的服务 zuul.routes.e-book-order.path=/i/**也可以通过排除指定关键字的路径
# 排除所有含有findAll关键字的请求 zuul.ignored-patterns=/**/findAll/** # 放开e-book-order服务,同时也会排除 findAll的请求 zuul.routes.e-book-order.path=/bobo/**指定路由前缀
也就是给提前的url添加一个前缀
zuul.prefix=/i zuul.routes.e-book-product.path=/product-provider/**请求过滤有点类似于Java中Filter过滤器,先将所有的请求拦截下来,然后根据现场情况做出不同的处理
定义过滤器 继承自ZuulFilter
@Component public class PermissFilter extends ZuulFilter { //返回值为过滤器的类型,过滤器的类型决定了过滤器在哪个生命周期执行,pre表示在路由之前执行过滤器,其他可选值还有post、error、route和static,也可自定义 @Override public String filterType() { return "pre"; } //过滤器的优先级,执行顺序 @Override public int filterOrder() { return 0; } //用来判断过滤器是否执行 @Override public boolean shouldFilter() { return true; } //表示过滤的具体逻辑 @Override public Object run() throws ZuulException { //获取当前上下文 RequestContext ctx = RequestContext.getCurrentContext(); //获取当前用户的请求 HttpServletRequest request = ctx.getRequest(); String username = request.getParameter("username"); String password = request.getParameter("password"); if(!"i".equals(username) || !"123".equals(password)){ ctx.setSendZuulResponse(false); //表示不对该请求进行路由 ctx.setResponseStatusCode(401); //设置响应码 ctx.setResponseBody("非法请求"); //设置响应值 ctx.addZuulResponseHeader("content-type","text/html;charset=utf-8"); //添加头信息,防止乱码 } return null; } }1.filterType方法的返回值为过滤器的类型,过滤器的类型决定了过滤器在哪个生命周期执行, pre表示在路由之前执行过滤器,其他可选值还有post、error、route和static,当然也可以自定义。 2.filterOrder方法表示过滤器的执行顺序,当过滤器很多时,这个方法会有意义。 3.shouldFilter方法用来判断过滤器是否执行,true表示执行,false表示不执行,在实际开发中,我们可以根据当 前请求地址来决定要不要对该地址进行过滤,这里直接返回true。 4.run方法则表示过滤的具体逻辑,假设请求地址中携带了login参数的话,则认为是合法请求, 否则就是非法请求,如果是非法请求的话,首先设置ctx.setSendZuulResponse(false); 表示不对该请求进行路由,然后设置响应码和响应值。 这个run方法的返回值在当前版本(Dalston.SR3)中暂时没有任何意义,可以返回任意值。
测试
API网关作为系统的的统一入口,将微服务中的内部细节都屏蔽掉了,而且能够自动的维护服务实例,实现负载均衡的路由转发,同时,它提供的过滤器为所有的微服务提供统一的权限校验机制,使得服务自身只需要关注业务逻辑即可
随着我们的分布式项目越来越大,我们可能需要将配置文件抽取出来单独管理,Spring Cloud Config对这种需求提供了支持。 Spring Cloud Config为分布式系统中的外部配置提供服务器和客户端支持。 我们可以使用Config Server在所有环境中管理应用程序的外部属性,Config Server也称为分布式配置中心,本质上它就是一个独立的微服务应用,用来连接配置仓库并将获取到的配置信息提供给客户端使用;客户端就是我们的各个微服务应用,我们在客户端上指定配置中心的位置,客户端在启动的时候就会自动去从配置中心获取和加载配置信息。 Spring Cloud Config可以与任何语言运行的应用程序一起使用。服务器存储后端的默认实现使用git,因此它轻松支持配置信息的版本管理,当然我们也可以使用Git客户端工具来管理配置信息。
构建配置中心
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency>入口类上添加@EnableConfigServer注解,表示开启配置中心服务端功能
@SpringBootApplication @EnableConfigServer public class ConfigserverApplication { public static void main(String[] args) { SpringApplication.run(ConfigserverApplication.class, args); } }在application.properties中配置一下git仓库的信息,这里就不自己搭建git服务端了,直接使用GitHub(当然也可以使用码云),这里需要我首先在我的Github上创建一个名为scConfig的项目,创建好之后,再做如下配置:
spring.application.name=configserver server.port=2007 spring.cloud.config.server.git.uri=https://github.com/113XXXX/cloudconfig.git spring.cloud.config.server.git.username=1135XXXX@qq.com spring.cloud.config.server.git.password=200XXXX spring.cloud.config.server.git.search-paths=client1.服务名 2.端口 3.uri表示配置中心所在仓库的位置 4.search-paths表示仓库下的子目录 5.username表示你的GitHub用户名 6.password表示你的GitHub密码
构建配置仓库
需要在github上设置好配置中心,首先在本地建一个空文件夹,在该文件夹中创建一个文件夹叫configRepo,然后在configRepo中创建四个配置文件,如下:
如此之后,配置文件就上传到GitHub上了。此时启动我们的配置中心,通过/{application}/{profile}/{label}就能访问到我们的配置文件了,其中application表示配置文件的名字,对应我们上面的配置文件就是app,profile表示环境,我们有dev、test、prod还有默认,label表示分支,默认我们都是放在master分支上,我们在浏览器上访问结果如下 客户端配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency>创建bootstrap.properties文件,来获取配置信息,注意这些信息一定要放在bootstrap.properties文件中才有效
spring.application.name=client1 server.port=2008 spring.cloud.config.uri=http://127.0.0.1:2007 spring.cloud.config.label=master spring.cloud.config.profile=dev这里的name对应了配置文件中的application部分,profile对应了profile部分,label对应了label部分,uri则表示配置中心的地址。配置完成之后创建一个测试Controller
@RestController public class HelloController { @Value("${i}") String i; @GetMapping("hello") public String hello(){ return i; } }测试
默认情况下我们的JRE中自带了JCE(Java Cryptography Extension),但是默认是一个有限长度的版本,我们这里需要一个不限长度的JCE,这个JCE我们可以直接百度然后在Oracle官网下载,下载之后解压,我们可以看到如下三个文件:
我们需要将这里的两个jar包拷贝到我们的jdk安装目录下,\jre\lib\security,覆盖该目录下原有的文件
对称加解密
对称加解密比较简单,直接配置密钥就可以了,在我们前文创建出来的config-server中配置密钥,但是注意这个密钥需要配置在bootstrap.properties中,另外这里还有非常重要一点:Spring Cloud的Dalston.SR3和Dalston.SR2版本在这个问题上是有BUG的,如果用这两个版本在这里测试会没有效果,应该避开使用这两个版本,我这里使用的是Dalston.SR4版本
encrypt.key=123配置完成之后,启动我们的config-server工程,然后访问如下地址http://localhost:2007/encrypt/status,如果看到如下访问结果,表示环境搭建成功了:
此时我们就可以通过第三方工具如POSTMAN、RestClient等来访问/encrypt和/decrypt接口,比如说我要给123这个字符加密,方式如下(我这里以POSTMAN为例,注意是POST请求): 配置文件的值如果是以{cipher}开头,表示该值是一个加密字符,配置中心config-server在获取到这个值之后会先对值进行解密,解密之后才会返回给客户端使用
服务化配置中心 在config-client中配置config-server地址的时候都是直接将地址写死,这种方式显然不够灵活,这里可以结合eureka注册中心,然后在配置的时候直接使用服务名即可
config-server改造
服务端改造和客户端改造都是分三步走:1.添加依赖;2.修改application.properties.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eurekaconfig-client改造
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> spring.application.name=client1 server.port=2008 # spring.cloud.config.uri=http://127.0.0.1:2007 spring.cloud.config.label=master spring.cloud.config.profile=test eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka spring.cloud.config.discovery.enabled=true spring.cloud.config.discovery.service-id=configserver解释: 1.eureka.client.service-url.defaultZone设置了注册中心的地址,将config-client注册到eureka注册中心去 2.spring.cloud.config.discovery.enabled表示开启 通过服务名来访问config-server 3.spring.cloud.config.discovery.service-id=configserver表示config-server的服务名
失败快速响应
不作任何额外配置的情况下,失败响应有点迟钝,举个简单的例子,关掉config-server,我们直接启动config-client,此时启动会报错,但是报错时间较晚,报错的时候系统已经打印了许多启动日志了,如果我们希望在启动失败时能够快速响应,方式很简单,config-client中添加如下配置即可:
spring.cloud.config.fail-fast=true此时不启动config-server直接启动config-client依然会报错,但是我们看到报错时间较早,系统都没打印几条启动日志。
重试机制 如果由于网络抖动等原因导致config-client在启动时候访问config-server没有访问成功从而报错,遇到这种情况我们希望config-client最好能重试几次,重试机制在这里也是受支持的,引入依赖
<dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency> # 配置重试次数,默认为6 spring.cloud.config.retry.max-attempts=10 # 初始化重试间隔时间,默认 1000ms spring.cloud.config.retry.initial-interval=1000 # 最大间隔时间,默认 2000ms spring.cloud.config.retry.max-interval=3000 # 间隔乘数,默认1.1 spring.cloud.config.retry.multiplier=1.2动态刷新配置 有的时候,动态的更新了Git仓库中的配置文件,那如何让我的config-client能够及时感知到呢?首先在config-client中添加如下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>该依赖中包含了/refresh端点的实现,我们将利用这个端点来刷新配置信息。然后需要在application.properties中配置忽略权限拦截:
management.endpoints.web.exposure.include=*利用Git客户端工具,将app-dev.properties中的内容修改一下,修改成功之后,先用POST请求访问http://localhost:2008/refresh地址
Spring Cloud Bus也是微服务架构系统中的必备组件。 Spring Cloud Bus可以将分布式系统的节点与轻量级消息代理链接,然后可以实现广播状态更改(例如配置更改)或广播其他管理指令。 Spring Cloud Bus就像一个分布式执行器,用于扩展的Spring Boot应用程序,但也可以用作应用程序之间的通信通道。 消息代理,Spring Cloud Bus支持RabbitMQ和Kafka
RabbitMQ安装配置
docker run -d --hostname my-rabbit --name some-rabbit -P rabbitmq:3-management账号密码一致 当我的微服务A/微服务B启动的时候,会从Config-Server中加载配置文件,而Config-Server则会通过git clone命令将配置中心的配置文件先clone下来在本地保存一份,然后再返回给微服务A/微服务B。 这是之前的工作流程,现在结合Spring Cloud Bus来实现配置文件的动态更新。 使用Spring Cloud Bus来实现配置文件的动态更新原理,如上图,当我的配置文件更新后,我向Config-Server中发送一个/bus/refresh请求,Config-Server收到这个请求之后,会将这个请求广播出去,这样所有的 微服务就都收到这个请求了,微服务收到这个请求之后就会 自动去更新自己的 配置文件。 在这个系统中,从RabbitMQ的角度来看,所有的微服务都是一样的,所以这个/bus/refresh请求我也可以在 微服务节点上发出,一样能够实现 配置文件动态更新的效果,但是这样做就 破坏了 微服务的结构,使得 微服务节点之间有了区别,所以 刷新配置的请求还是放在Config-Server上来做比较合适
1.工程创建 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-bus</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>2.属性配置 在application.properties中配置RabbitMQ的连接信息
spring.rabbitmq.port=32771 spring.rabbitmq.host=192.168.230.176 spring.rabbitmq.username=guest spring.rabbitmq.password=guest3.配置消息队列Bean
@SpringBootApplication public class RabbitmqApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); } @Bean Queue queue(){ return new Queue("i.com"); } }4.创建消息生产者
@SpringBootTest class RabbitmqApplicationTests { @Autowired AmqpTemplate amqpTemplate; @Test void contextLoads() { amqpTemplate.convertAndSend("i.com","hello cloud bus"); } }5.创建消息消费者
@Component public class MessageConsumer { @RabbitListener(queues = "i.com") public void receive(String msg) { System.out.println(msg); } }
Spring Cloud Stream是一个构建消息驱动的微服务框架。 它构建在Spring Boot之上用以创建工业级的应用程序,并且通过Spring Integration提供了和消息代理的连接。 Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现(目前仅支持RabbitMQ和Kafka),同时引入了发布订阅、消费组和分区的语义概念 1.创建工程
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>2.创建接收器
@EnableBinding(Sink.class) public class MySink { @StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println(msg); } }使用了@EnableBinding注解实现对消息通道的绑定,在该注解中还传入了一个参数Sink.class,Sink是一个接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。 然后在SinkReceiver类中定义了receive方法,并在该方法上添加了@StreamListener注解,该注解表示该方法为 消息中间件上 数据流的 事件监听器,Sink.INPUT参数表示这是input消息通道上的监听处理器
消息分组
由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。 方式很简单,给项目配置消息组和主题,如下:
spring.cloud.stream.bindings.mychannel-input.destination=i spring.cloud.stream.bindings.mychannel-output.destination=i spring.cloud.stream.bindings.mychannel-input.group=i spring.cloud.stream.bindings.mychannel-output.group=i这里我们设置该工程都属于 i 消费组,输入通道的主题名则为 i 。这里配置完成之后,我们在消息发送方做如下配置:
spring.cloud.stream.bindings.mychannel-output.destination=i也配置消息主题名为 i (如果发送和接收就在同一个应用中,则这里可以不配置)。 此时我们将我们的项目启动两个实例,注意两个实例的端口不一样,此时如果我们再发送消息, 则只会被两个实例中的一个接收到,另外一个应用则接收不到,但是到底是两个实例中的哪一个接收,则是随机的
消息分区 有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):
消费者上添加如下配置:
# 消息输入通道配置 spring.cloud.stream.bindings.mychannel-input.consumer.partitioned=true spring.cloud.stream.instance-count=2 spring.cloud.stream.instance-index=0 第一行 表示开启 消息分区第二行 表示当前消费者 的总的实例个数第三行 表示当前实例的索引,从 0 开始,当开启多实例时,需要在 启动时 在命令行配置索引消息生产者上添加如下配置:
# 消息输出通道配置 spring.cloud.stream.bindings.mychannel-output.producer.partition-key-expression=payload spring.cloud.stream.bindings.mychannel-outpot.producer.partition-count=2 第一行 配置了 分区键的表达式规则第二行 设置了 消息分区数量此时再次启动多个消费者实例,然后重复发送多条消息,这些消息都将被同一个消费者处理掉