websocket结合多线程实现请求一次,轮询推送数据

mac2024-03-18  25

maven导入依赖

<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.3.12.RELEASE</version> </dependency>

java实现类

package com.tradeplatform.trade.order.websocket; import java.io.IOException; import java.util.Dictionary; import java.util.Hashtable; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.socket.server.standard.SpringConfigurator; import com.alibaba.fastjson.JSONObject; import com.tradeplatform.common.utils.LoggerUtils; import com.tradeplatform.trade.order.service.TradeKlineService; /** * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端 */ @ServerEndpoint(value = "/webSocket/KlineWebSocket",configurator = SpringConfigurator.class) public class KlineWebSocket { private static LoggerUtils logger = LoggerUtils.getLogger(KlineWebSocket.class); //创建一个线程 // private KThread thread; //用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet<KlineWebSocket> webSocketSet = new CopyOnWriteArraySet<KlineWebSocket>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; @Autowired private TradeKlineService tradeKlineService; //定义参数 private JSONObject params = new JSONObject(); private boolean start = false; private long sleep = 1000; private Thread th; private String symbol; private long marketId; private String resolution; /** * 连接建立成功调用的方法 * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session) { webSocketSet.add(this); th = new Thread(new thread()); th.start(); } @OnMessage public void onMessage(String message, Session session) { System.out.println("来自客户端的消息:" + message); this.session = session; try { params = (JSONObject) JSONObject.parse(message); if(params.getBoolean("start")!=null) { start = params.getBoolean("start"); } if(params.getLong("sleep")!=null) { sleep = params.getLong("sleep"); } Object o = null; String req = params.getString("req"); if(req.equals("config")) { o = config(); this.session.getAsyncRemote().sendText(o.toString()); } } catch (Exception e) { } } class thread extends Thread{ @Override public void run() { while(true){ try { if(start) { sendMessage(session); } Thread.sleep(sleep); } catch (Exception e) { e.printStackTrace(); } } } } /** * 连接关闭调用的方法 */ @SuppressWarnings("deprecation") @OnClose public void onClose() { if (th!=null&&th.isAlive()) { th.stop(); } webSocketSet.remove(this); } public void sendMessage(Session session) throws IOException { Object o = history(symbol,marketId,resolution); this.session.getAsyncRemote().sendText(o.toString()); } /** * 发生错误时调用 * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { if (th!=null&&th.isAlive()) { th.stop(); } } public Object config() { Object jsonObject = "{\"supports_search\":true,\"supports_group_request\":false,\"supports_marks\":true,\"supports_timescale_marks\":true,\"supports_time\":true,\"exchanges\":[{\"value\":\"\",\"name\":\"All Exchanges\",\"desc\":\"\"},{\"value\":\"NasdaqNM\",\"name\":\"NasdaqNM\",\"desc\":\"NasdaqNM\"},{\"value\":\"NYSE\",\"name\":\"NYSE\",\"desc\":\"NYSE\"},{\"value\":\"NCM\",\"name\":\"NCM\",\"desc\":\"NCM\"},{\"value\":\"NGM\",\"name\":\"NGM\",\"desc\":\"NGM\"}],\"symbols_types\":[{\"name\":\"All types\",\"value\":\"\"},{\"name\":\"Stock\",\"value\":\"stock\"},{\"name\":\"Index\",\"value\":\"index\"}],\"supported_resolutions\":[\"1\",\"5\",\"15\",\"30\",\"60\",\"240\",\"360\",\"720\",\"1440\"]}"; return jsonObject; } public Object history(String symbol,long marketId, String resolution) { Object o = tradeKlineService.getTvLine(marketId,resolution); return o; } }

jsp页面调用

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE HTML> <html> <head> <title>My WebSocket</title> </head> <body> Welcome<br/> <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> <div id="message"> </div> </body> <script type="text/javascript"> var websocket = null; //判断当前浏览器是否支持WebSocket ,主要此处要更换为自己的地址 if('WebSocket' in window){ websocket = new WebSocket("ws://127.0.0.1:8007/trade/webSocket/KlineWebSocket"); } else{ alert('Not support websocket') } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回调方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //关闭连接 function closeWebSocket(){ websocket.close(); } //发送消息 function send(){ var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
最新回复(0)