網(wǎng)站中英文轉(zhuǎn)換js網(wǎng)盤網(wǎng)頁版
前言
小編我將用CSDN記錄軟件開發(fā)求學(xué)之路上親身所得與所學(xué)的心得與知識,有興趣的小伙伴可以關(guān)注一下!
也許一個人獨行,可以走的很快,但是一群人結(jié)伴而行,才能走的更遠(yuǎn)!讓我們在成長的道路上互相學(xué)習(xí),讓我們共同進(jìn)步,歡迎關(guān)注!
針對websocket技術(shù)的金融alltick股票實戰(zhàn)經(jīng)驗,通過調(diào)用第三方wss的的數(shù)據(jù),來獲取實時數(shù)據(jù),并保持性能高及效率高
1、在springboot中引入websocket相應(yīng)的jar包
<!-- Spring Boot WebSocket Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2.創(chuàng)建webSocketConfig 暴露endpoint端點
package com.nq.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}
3:創(chuàng)建websocket客戶端用于連接第三方的wss
package com.nq.common;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nq.pojo.Stock;
import com.nq.service.IStockService;
import com.nq.utils.PropertiesUtil;
import com.nq.utils.StringUtils;
import com.nq.utils.redis.RedisShardedPool;
import com.nq.utils.redis.RedisShardedPoolUtils;
import com.nq.vo.stock.StockListVO;
import com.nq.vo.websocket.CodeVo;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.hpsf.Decimal;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.websocket.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Description: websocket客戶端* 一共2800條code產(chǎn)品數(shù)據(jù),每個webSocketServer處理1000條數(shù)據(jù),分三個webSocketServer處理* 提高效率* @Author: jade* @Date: 2021/8/25 10:25*/
@ClientEndpoint
@Slf4j
@Component //交給spring容器管理
@Data
public class WebSocketJavaExample {private Session session; //會話對象private Boolean flag = true; //用來拯救流的關(guān)閉標(biāo)識符private Map<String,StockVo> stockVoMap;private List<StockVo> stockList; //返回給客戶端的封裝數(shù)據(jù)List集合public final static Integer MAXCAP = 1000; //一次性以1000條@Resource //使用@Resource來裝配,不然會為nullIStockService stockService;private ObjectMapper objectMapper=new ObjectMapper();@OnOpenpublic void onOpen(Session session) {this.session = session;}
/**
接收第三方服務(wù)端的消息
**/@OnMessagepublic void onMessage(String message) {if(message.indexOf("data") != -1) {try {JSONObject jsonObject = JSONUtil.parseObj(message);String dataStr = jsonObject.getStr("data");//第三方響應(yīng)的Json數(shù)據(jù)if (dataStr != null) {
// JSONArray jsonArray = JSONUtil.parseArray(dataStr);
// JSONObject jsonObject = JSONUtil.parseObj(dataStr);
// jsonArray.stream().forEach(item -> {JSONObject json = JSONUtil.parseObj(dataStr);Optional<StockVo> stockVo = stockList.stream()//Optional為java8的Stream API中使用,處理可能為null的元素.filter(p -> json.getStr("code").equals(p.getCode().concat(".US"))).findFirst();
// .filter(p -> json.getStr("code").equals(p.getCode())).findFirst();stockVo.ifPresent(vo -> {// 當(dāng)前價格BigDecimal nowPrice = new BigDecimal(json.getStr("price"));BigDecimal preClosePrice = vo.getPreclose_px();vo.setType(json.getStr("trade_direction"));// alltick websocket 獲取數(shù)據(jù) 替換原來的當(dāng)前價格和漲幅vo.setNowPrice(nowPrice);// 計算漲幅BigDecimal chg = nowPrice.subtract(preClosePrice).divide(preClosePrice, 4, BigDecimal.ROUND_HALF_UP).multiply(new BigDecimal(100));vo.setHcrate(chg);});log.info("Optional<StockVo> send message to client"+stockVo);
// });} else {log.error("data字段不是一個有效的JSON數(shù)組: {}", dataStr);}} catch (Exception e) {log.error("解析消息時發(fā)生異常: {}", e.getMessage());}}}@OnClosepublic void onClose(Session session, CloseReason closeReason) {flag=false;log.info("AllTick API Docs流已關(guān)閉,關(guān)閉原因:{}",closeReason.toString());}@OnErrorpublic void onError(Throwable e) {log.error("AllTick API Docs連接異常{}",e.getMessage());}@Asyncpublic void sendMessage(String key,String message) throws Exception {session.getBasicRemote().sendText(message);
// log.info("client:{}, AllTick API Docs 請求報文: {}", key, message);
// if (this.session != null && this.session.isOpen()) {
// this.session.getBasicRemote().sendText(message);
// } else {
// log.error("會話已關(guān)閉,無法發(fā)送消息: {}", key);
// }}//websocket地址private String url=PropertiesUtil.getProperty("WebSocket.url");//token數(shù)據(jù)private String token= PropertiesUtil.getProperty("WebSocket.token");public static List<WebSocketJavaExampleInfo> webSocketJavaExampleList = new ArrayList<>();@PostConstructpublic void initPool() {new Thread(()->{ //另外起一個線程執(zhí)行websocket,不影響主線程run();}).start();}@PreDestroypublic void destroy() {if (this.session != null && this.session.isOpen()) {try {this.session.close();} catch (IOException e) {log.error("關(guān)閉WebSocket連接時發(fā)生異常: {}", e.getMessage());}}}@Asyncpublic void run(){try {List<Stock> list = stockService.findStockList();int len = list.size();int capacity = (int) Math.ceil((double) len / MAXCAP);//向上取整
// int capacity = (int) Math.ceil(len / MAXCAP);
// if (capacity<1 || len % capacity != 0 ) {
// capacity++;
// }List<CodeVo> codeVos = new ArrayList<>();log.info("開始連接AllTick API Docs,請求url:{}",url.concat(token));WebSocketContainer container = ContainerProvider.getWebSocketContainer();URI uri = new URI(url.concat(token)); // Replace with your websocket endpoint URLfor (int i = 0; i < capacity; i++) {WebSocketJavaExample client = new WebSocketJavaExample(); //多個客戶client執(zhí)行,每個客戶端執(zhí)行1000條數(shù)據(jù)container.connectToServer(client, uri);List<Stock> list1 = list.stream().skip(i * MAXCAP).limit(MAXCAP).collect(Collectors.toList());stockList = new ArrayList<>();list1.forEach(item -> {CodeVo codeVo = new CodeVo();codeVo.setCode(item.getStockCode().concat(".US"));
// codeVo.setCode(item.getStockCode());codeVos.add(codeVo);StockVo stockVo = new StockVo();try {// 數(shù)據(jù)初始化String us = RedisShardedPoolUtils.get(item.getStockGid(), 4);StockListVO stockListVO = objectMapper.readValue(us, StockListVO.class);stockVo.setName(stockListVO.getName());stockVo.setCode(stockListVO.getCode());stockVo.setGid(stockListVO.getGid());stockVo.setStock_type(stockListVO.getStock_type());stockVo.setType(stockListVO.getType());stockVo.setHcrate(stockListVO.getHcrate());stockVo.setOpen_px(new BigDecimal(stockListVO.getOpen_px()));stockVo.setNowPrice(new BigDecimal(stockListVO.getNowPrice()));stockVo.setPreclose_px(new BigDecimal(stockListVO.getPreclose_px()));stockVo.setIsOption(Integer.valueOf(stockListVO.getIsOption()));stockList.add(stockVo);} catch (JsonProcessingException e) {log.info("redis數(shù)據(jù)轉(zhuǎn)換對象stockListVO異常",e.getMessage());}});JSONArray symbolList = new JSONArray(codeVos); // 直接將List轉(zhuǎn)換為JSONArrayclient.setStockList(stockList);// 使用LinkedHashMap來保持順序Map<String, Object> messageMap = new LinkedHashMap<>();messageMap.put("cmd_id", 22004);messageMap.put("seq_id", 123);messageMap.put("trace", "3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806");Map<String, Object> dataMap = new LinkedHashMap<>();dataMap.put("symbol_list", symbolList);messageMap.put("data", dataMap);// 將LinkedHashMap轉(zhuǎn)換為JSONObjectJSONObject message2 = new JSONObject(messageMap);String message = message2.toString();
// String message = "{\"cmd_id\":22004,\"seq_id\":123,\"trace\":\"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\",\"data\":{\"symbol_list\": "+ JSONUtil.toJsonStr(codeVos) +"}}";client.sendMessage("client" + i, message);webSocketJavaExampleList.add(new WebSocketJavaExampleInfo(client, "client" + i,message));codeVos.clear();// 創(chuàng)建一個TimerTask任務(wù)int finalI = i;TimerTask task3 = new TimerTask() {@SneakyThrows@Overridepublic void run() {//定時獲取心跳try {client.sendMessage("client" + finalI,"{\n" +" \"cmd_id\":22000,\n" +" \"seq_id\":123,\n" +" \"trace\":\"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\",\n" +" \"data\":{\n" +" }\n" +"}");} catch (Exception e) {log.error(e.getMessage());}}};new Timer().schedule(task3, 10000,10000);new Thread().sleep(2000);}}catch (Exception e){e.printStackTrace();log.error("AllTick API Docs 連接失敗:{}",e.getMessage());}}
/**
定時任務(wù),應(yīng)用啟動后延遲 2.5 分鐘開始執(zhí)行,之后每隔 2.5 分鐘執(zhí)行一次,去勘測是否流關(guān)閉,然后拯救連接websocket
**/@Scheduled(fixedRate = 1 * 15 * 10000,initialDelay = 150000)public void Daemon() throws Exception {WebSocketContainer container = ContainerProvider.getWebSocketContainer();URI uri = new URI(url.concat(token)); // Replace with your websocket endpoint URLfor(WebSocketJavaExampleInfo webSocketJavaExampleInfo:webSocketJavaExampleList){if(!webSocketJavaExampleInfo.getClient().flag){container.connectToServer(webSocketJavaExampleInfo.getClient(), uri);webSocketJavaExampleInfo.getClient().sendMessage(webSocketJavaExampleInfo.getKey(), webSocketJavaExampleInfo.getMessage());}}}
}@Data
class WebSocketJavaExampleInfo{private WebSocketJavaExample client;private String key;private String message;public WebSocketJavaExampleInfo(WebSocketJavaExample client, String key,String message) {this.client = client;this.key = key;this.message = message;}
}
4、創(chuàng)建websocket服務(wù)端用于連接客戶端,及供前端訪問
package com.nq.common;import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.config.annotation.EnableWebSocket;import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;import static com.nq.common.WebSocketJavaExample.MAXCAP;
import static com.nq.common.WebSocketJavaExample.webSocketJavaExampleList;@ServerEndpoint("/ws/{userId}")
@EnableWebSocket
@Component
@Data
public class WebSocketServer {private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);private static final Integer pageSize = 100;//頁碼private Integer pageNo;//頁數(shù)/*** concurrent包的線程安全Set,用來存放每個客戶端對應(yīng)的MyWebSocket對象。*/private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();/*** 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)*/private Session session;/*** 接收userId*/private String userId = "";/*** 查詢code*/private String code = "";/*** 連接建立成功調(diào)用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {this.session = session;this.userId = userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);webSocketMap.put(userId, this);} else {webSocketMap.put(userId, this);}}/*** 連接關(guān)閉調(diào)用的方法*/@OnClosepublic void onClose() {if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);}}/*** 收到客戶端消息后調(diào)用的方法** @param message 客戶端發(fā)送過來的消息*/@OnMessagepublic void onMessage(String message, Session session) {log.info("用戶消息:" + userId + ",報文:" + message);JSONObject jsonObject = JSONUtil.parseObj(message);
// pageSize = jsonObject.getInt("pageSize");pageNo = jsonObject.getInt("pageNo");code = jsonObject.getStr("code");
// if (ObjectUtil.isNotEmpty(code)) { //如果code不為空,則查詢并推送數(shù)據(jù)
// queryAndSendStockVo(code, session);
// }}/*** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("用戶錯誤:" + this.userId + ",原因:" + error.getMessage());}/*** 實現(xiàn)服務(wù)器主動推送*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}/*** 群發(fā)消息** @throws IOException*/@PostConstructpublic void BroadCastInfo() throws IOException, InterruptedException {log.info("開始輪詢批量推送數(shù)據(jù)");ThreadUtil.execAsync(() -> {run();});}public void run() {WebSocketServer webSocketServer;List<StockVo> list = null;List<StockVo> additionalList = null;List<StockVo> list2 = null;while (true) {try {if (webSocketMap.size() > 0) {for (Map.Entry<String, WebSocketServer> stringWebSocketServerEntry : webSocketMap.entrySet()) {webSocketServer = stringWebSocketServerEntry.getValue();if (ObjectUtil.isEmpty(webSocketServer.pageNo) && ObjectUtil.isNotEmpty(webSocketServer.pageSize) && ObjectUtil.isEmpty(webSocketServer.getCode())) {//如果默認(rèn)沒有參數(shù) 就傳輸3千條
// if(ObjectUtil.isEmpty(webSocketServer.pageNo)) {//如果默認(rèn)沒有參數(shù) 就傳輸3千條list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().limit(1000).collect(Collectors.toList());} else if (ObjectUtil.isNotEmpty(webSocketServer.pageNo)) {int pageSize = webSocketServer.pageSize;int pageNo = webSocketServer.pageNo;Integer size = pageNo * pageSize;
// int capacity = (int) Math.ceil(size / 20);int capacity = (int) Math.ceil(size / MAXCAP);int pageno = (capacity * 1000 / pageSize);pageNo -= pageno;if (capacity == 0) {list = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(pageNo * pageSize).collect(Collectors.toList());}if (capacity == 1) {list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());additionalList = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());list = Stream.concat(list.stream(), additionalList.stream()).collect(Collectors.toList());}if (capacity == 2) {list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());list2 = webSocketJavaExampleList.get(1).getClient().getStockList().stream().collect(Collectors.toList());additionalList = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());list = Stream.concat(Stream.concat(list.stream(), list2.stream()), additionalList.stream()).collect(Collectors.toList());}
// list = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip((pageNo - 1) * pageSize).limit(pageSize).collect(Collectors.toList());} else {String queryCode = webSocketServer.getCode();// 使用并行流處理數(shù)據(jù),減少嵌套循環(huán)list = webSocketJavaExampleList.parallelStream().flatMap(webSocketJavaExampleInfo -> webSocketJavaExampleInfo.getClient().getStockList().stream()).filter(stockVo -> stockVo.getCode().contains(queryCode)).collect(Collectors.toList());}try {stringWebSocketServerEntry.getValue().sendMessage(JSONUtil.toJsonStr(list));} catch (IOException e) {log.error("用戶編碼為:{},推送ws數(shù)據(jù)異常,異常原因:{}", webSocketServer.getUserId(), e.getMessage());}}}} catch (Exception e) {log.error("推送失敗: {}", e.getMessage());} finally {try {new Thread().sleep(2000);} catch (InterruptedException e) {log.error("沒有客戶端:{},webSocketMap:{}", e.getMessage(), webSocketMap.size());}}}}}
以上是基于小編在開發(fā)過程中,針對websocket技術(shù)的實戰(zhàn)經(jīng)驗,通過調(diào)用第三方wss的的數(shù)據(jù),來獲取實時數(shù)據(jù)