CountDownLatch和okhttp3 实现多线程异步http调用

mac2022-06-30  96

一、在工作中需要多次调用第三方接口,或者其他耗时任务,这个时候为了提高返回结果的速度,可以利用countDownLatch来实现多线程调用,提高返回结果的时间。

(1) 由于代码较多,这里先将实现逻辑由伪代码,先行演示一遍。

//先定义需要异步调用的次数,每次调用 CountDownLatch.countDown(); //等最慢的一次完成调用,封装在list对象里面,这个时候 CountDownLatch 减为0继续往下进行 @Autowired private AsyncTask asyncTask; CountDownLatch countDownLatch = new CountDownLatch(4); List<TbOrder> tbOrders = new ArrayList<>(); for (int i = 1; i < 5; i++) { asyncTask.doTaskOne(tbOrders, i,countDownLatch); } countDownLatch.await(10, TimeUnit.SECONDS);

异步伪代码

@EnableAsync @Component public class AsyncTask { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private HttpClient httpClient; @Async public void doTaskOne(List<TbOrder> users, int i, CountDownLatch countDownLatch) throws Exception { List<TbOrder> orders=doPost(); users.addl(orders); ccountDownLatch.countDown(); } }

(2)下面是具体的实现 (下面是需要用到maven依赖)

<dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.10.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency>

(关于okhttp3 连接的配置)

ok.http.connect-timeout=30 ok.http.read-timeout=30 ok.http.write-timeout=30 # 连接池中整体的空闲连接的最大数量 ok.http.max-idle-connections=200 # 连接空闲时间最多为 300 秒 ok.http.keep-alive-duration=300

(关于okhttp3 初始化OkHttpClient 的配置)

@Configuration public class OkHttpConfiguration { @Value("${ok.http.connect-timeout}") private Integer connectTimeOut; @Value("${ok.http.read-timeout}") private Integer readTimeOut; @Value("${ok.http.write-timeout}") private Integer writeTimeOut; @Value("${ok.http.max-idle-connections}") private Integer maxIdleConnections; @Value("${ok.http.keep-alive-duration}") private Long keepAliveDuration; @Bean public X509TrustManager x509TrustManager(){ return new X509TrustManager() { @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String authType) throws CertificateException { } @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }; } @Bean @Primary public SSLSocketFactory sslSocketFactory(){ try { //信任任何连接 SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, new TrustManager[]{x509TrustManager()}, new SecureRandom()); return sslContext.getSocketFactory(); } catch (NoSuchAlgorithmException | KeyManagementException e) { e.printStackTrace(); } return null; } @Bean public ConnectionPool pool(){ return new ConnectionPool(maxIdleConnections, keepAliveDuration, TimeUnit.SECONDS); } @Bean public OkHttpClient okHttpClient(){ return new OkHttpClient().newBuilder() .sslSocketFactory(sslSocketFactory(), x509TrustManager()) //是否开启缓存 .retryOnConnectionFailure(false) .connectionPool(pool()) .connectTimeout(connectTimeOut, TimeUnit.SECONDS) .readTimeout(readTimeOut,TimeUnit.SECONDS) .writeTimeout(writeTimeOut,TimeUnit.SECONDS) .hostnameVerifier((hostname, session) -> true) // 拦截器 // .addInterceptor() .build(); } }

(初始化http的操作客户端)

@Component public class HttpClient { private final Logger logger = LoggerFactory.getLogger(HttpClient.class); private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); private static final MediaType XML = MediaType.parse("application/xml; charset=utf-8"); @Autowired private OkHttpClient okHttpClient; /** * get 请求 * @param url 请求url地址 * @return string * */ public String doGet(String url) { return doGet(url, null, null); } /** * get 请求 * @param url 请求url地址 * @param params 请求参数 map * @return string * */ public String doGet(String url, Map<String, String> params) { return doGet(url, params, null); } /** * get 请求 * @param url 请求url地址 * @param headers 请求头字段 {k1, v1 k2, v2, ...} * @return string * */ public String doGet(String url, String[] headers) { return doGet(url, null, headers); } /** * get 请求 * @param url 请求url地址 * @param params 请求参数 map * @param headers 请求头字段 {k1, v1 k2, v2, ...} * @return string * */ public String doGet(String url, Map<String, String> params, String[] headers) { StringBuilder sb = new StringBuilder(url); if (params != null && params.keySet().size() > 0) { boolean firstFlag = true; for (String key : params.keySet()) { if (firstFlag) { sb.append("?").append(key).append("=").append(params.get(key)); firstFlag = false; } else { sb.append("&").append(key).append("=").append(params.get(key)); } } } Request.Builder builder = new Request.Builder(); if (headers != null && headers.length > 0) { if (headers.length % 2 == 0) { for (int i = 0; i < headers.length; i = i + 2) { builder.addHeader(headers[i], headers[i + 1]); } } else { logger.warn("headers's length[{}] is error.", headers.length); } } Request request = builder.url(sb.toString()).build(); logger.info("do get request and url[{}]", sb.toString()); return execute(request); } /** * post 请求 * @param url 请求url地址 * @param params 请求参数 map * @return string */ public String doPost(String url, Map<String, String> params) { FormBody.Builder builder = new FormBody.Builder(); if (params != null && params.keySet().size() > 0) { for (String key : params.keySet()) { builder.add(key, params.get(key)); } } Request request = new Request.Builder().url(url).post(builder.build()).build(); logger.info("do post request and url[{}]", url); return execute(request); } /** * post 请求, 请求数据为 json 的字符串 * @param url 请求url地址 * @param json 请求数据, json 字符串 * @return string */ public String doPostJson(String url, String json) { logger.info("do post request and url[{}]", url); return exectePost(url, json, JSON); } /** * post 请求, 请求数据为 xml 的字符串 * @param url 请求url地址 * @param xml 请求数据, xml 字符串 * @return string */ public String doPostXml(String url, String xml) { logger.info("do post request and url[{}]", url); return exectePost(url, xml, XML); } private String exectePost(String url, String data, MediaType contentType) { RequestBody requestBody = RequestBody.create(contentType, data); Request request = new Request.Builder().url(url).post(requestBody).build(); return execute(request); } private String execute(Request request) { Response response = null; try { response = okHttpClient.newCall(request).execute(); if (response.isSuccessful()) { return response.body().string(); } } catch (Exception e) { logger.error(e.getMessage()); } finally { if (response != null) { response.close(); } } return ""; } }

(异步调用业务类)

@Autowired private AsyncTask asyncTask; @Override public List<TbOrder> getItems() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(4); List<TbOrder> tbOrders = Collections.synchronizedList(new ArrayList<TbOrder>()); for (int i = 1; i < 5; i++) { asyncTask.doTaskOne(tbOrders, i,countDownLatch); } countDownLatch.await(10, TimeUnit.SECONDS); return tbOrders; }

(异步实现调用第三方的逻辑)

@EnableAsync @Component public class AsyncTask { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private HttpClient httpClient; @Async public void doTaskOne(List<TbOrder> users, int i, CountDownLatch countDownLatch) throws Exception { System.out.println("开始做任务"+countDownLatch.getCount()+formatter.format(LocalDateTime.now())); long start = System.currentTimeMillis(); Map<String, String> paramMap = new HashMap<>(); paramMap.put("pageNum", String.valueOf(i)); paramMap.put("pageSize", String.valueOf(10)); String jsonParam = JSON.toJSONString(paramMap); String s = httpClient.doPostJson("http://localhost:8080/order/all", jsonParam); PageInfo<TbOrder> pageInfo = JSON.parseObject(s,new TypeReference<PageInfo<TbOrder>>() {}); List<TbOrder> data = (List<TbOrder>) pageInfo.getData(); users.addAll(data); System.out.println("第" +i+ "次打印" ); countDownLatch.countDown(); long end = System.currentTimeMillis(); System.out.println("完成任务"+countDownLatch.getCount()+",耗时:" + (end - start) + "毫秒"+formatter.format(LocalDateTime.now())); } }

通过下面的打印结果,能够发现通过CountDownLatch实现异步调用,节约调用时间

开始做任务42019-10-02 00:39:03 开始做任务42019-10-02 00:39:03 开始做任务42019-10-02 00:39:03 开始做任务42019-10-02 00:39:03 2019-10-02 00:39:03.488 INFO 64092 --- [ task-1] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 2019-10-02 00:39:03.488 INFO 64092 --- [ task-2] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 2019-10-02 00:39:03.488 INFO 64092 --- [ task-3] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 2019-10-02 00:39:03.488 INFO 64092 --- [ task-4] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 第1次打印 第3次打印 完成任务2,耗时:306毫秒2019-10-02 00:39:03 完成任务3,耗时:306毫秒2019-10-02 00:39:03 第4次打印 完成任务1,耗时:306毫秒2019-10-02 00:39:03 第2次打印 完成任务0,耗时:349毫秒2019-10-02 00:39:03 开始做任务42019-10-02 00:42:58 开始做任务42019-10-02 00:42:58 2019-10-02 00:42:58.189 INFO 64092 --- [ task-7] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 2019-10-02 00:42:58.190 INFO 64092 --- [ task-5] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 开始做任务42019-10-02 00:42:58 开始做任务42019-10-02 00:42:58 2019-10-02 00:42:58.192 INFO 64092 --- [ task-6] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 2019-10-02 00:42:58.191 INFO 64092 --- [ task-8] com.yin.databaseproject.util.HttpClient : do post request and url[http://localhost:8080/order/all] 第3次打印 第2次打印 第4次打印 完成任务3,耗时:124毫秒2019-10-02 00:42:58 完成任务1,耗时:122毫秒2019-10-02 00:42:58 完成任务2,耗时:122毫秒2019-10-02 00:42:58 第1次打印 完成任务0,耗时:125毫秒2019-10-02 00:42:58
最新回复(0)