# message-push-platform **Repository Path**: sunforrain/message-push-platform ## Basic Information - **Project Name**: message-push-platform - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 35 - **Created**: 2024-12-04 - **Last Updated**: 2025-01-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 项目结构 ``` crazymaker-server -- 根项目 │ ├─cloud-center -- 微服务的基础设施中心 │ │ ├─clou-springboot-admin-server -- 管理中心 │ │ ├─springcloud-gateway -- 网关服务 │ ├─crazymaker-base -- 公共基础依赖模块 │ │ ├─base-common -- 普通的公共依赖,如 utils 类的公共方法 │ │ ├─redis-starter -- 公共的 redis 操作模块 │ │ ├─zookeeper-starter -- 公共的 zookeeper 操作模块 │ │ ├─session-starter -- 分布式 session 模块 │ │ ├─auth-starter -- 基于 JWT + SpringSecurity 的用户凭证与认证模块 │ │ ├─base-runtime -- 各 provider 的运行时公共依赖,装配的一些通用 Spring IOC Bean 实例 │ │ ├─sharding-jdbc-starter -- shardingJDBC相关的操作模块, 配置等 │ ├─crazymaker-app --业务模块: 用户认证与授权 │ │ ├─app-api -- 用户 DTO、Constants 等 │ │ ├─app-client -- 用户服务的 Feign 远程客户端 │ │ ├─app-provider -- 用户认证与权限的核心实现,包含controller 层、service层、dao层的代码实现 │ ├─message-dispatcher -- │ │ ├─dispatcher-api -- │ │ ├─dispatcher-provider -- │ ├─message-tunnel -- │ │ ├─tunnel-client -- │ │ ├─tunnel-provider -- ``` 接下来,对业务模块的分包规范,做一个说明。在业务模块如何分包的问题上,实际上大部分企业都有自己的统一规范,这里从职责清晰、方便维护、能快速定位代码的角度出发,将 crazy-springcloud 微服务开发脚手架的每一个业务模块,分成了 {module}-api、 {module}-client、 {module}-provider 三个子模块,三个子模块的具体介绍如下: (1) {module}-api 子模块定义了一些公共的 Constants 业务常量和 DTO 传输对象,该子模块既被业务模块内部依赖,也会被使用该业务模块的外部所依赖; (2) {module}-client 子模块定义了一些被外部模块所依赖的 Feign 远程调用客户类,该子模块是专供外部的模块,不能被内部子模块依赖; (3) {module}-provider 子模块是整个业务模块的核心,也是一个能够独立启动、运行的服务提供者应用,该模块包含涉及到业务逻辑的 controller层、service层、dao层的完整代码实现。 crazy-springcloud 微服务开发脚手架在以下几个维度进行了弱化: (1)有关部署,没有使用 docker 容器而是使用 shell 脚本,对容器进行了弱化。有多方面的原因:一是本脚手架初心是学习,所使用的部署方式为 shell 脚本而不是 docker,方便大家学习 shell 命令和脚本;二是 Java 和 docker 其实整合得很好,可以稍加配置就一键发布,找点资料就可以掌握; 三是生产环境的部署、甚至是整个自动化构建和部署的工作,实际上属于运维工作,都有专门的运维岗位人员去完成,而部署的核心,任然是 shell脚本。 (2)有关监控,没有对链路监控、JVM性能指标、断路器监控做专门的封装和介绍。有多方面的原因:一是监控的软件太多,如果介绍太全,篇幅又不够,介绍太少又不是大家所用到的; 二是其实都是软件的操作说明,原理性的内容比较少,使用视频的形式比文字形式知识传递的效果会更好。疯狂创客圈后续会推出一些微服务监控的视频,请大家关注社群博客。反过来说,对于一个编程高手来说,如果了解 Spring Cloud 核心原理,那些监控组件基本上都是一碟小菜。 # 项目讲解见19章 19.1主要为基础组件介绍, shardingJDBC, canel, mysql主从同步等 19.2前半部分为springSecurity的介绍, 后半部分则是消息推送中台的架构设计, 技术亮点等 需要启动cdh2虚拟机 # 78: SpringCloud gateway 过滤器, 完成JWT + RSA令牌校验 ## 涉及的类 JwtAuthConfig --jwt令牌 + RSA 加密的配置类 AuthUtils --认证授权相关工具类, 提供针对token的获取, 加解密等工具方法 AuthGlobalFilter -- 统一鉴权的全局过滤器, 响应式编程 MonoTest -- Mono的演示类 FluxTest -- Flux的演示类 SpringCloudGateway可以看博客 https://www.cnblogs.com/crazymakercircle/p/11704077.html 要演示令牌校验所需要的三个服务启动类 GatewayProviderApplication -- gateway服务 MessageDispatchApplication -- 消息分发服务 UAACloudApplication -- 应用的鉴权服务 # 79:编写SpringCloud gateway过滤器,完成Body的拦截、提取、接力转发, 将消息发送到delay接力队列 ## 涉及的类 com.crazymaker.cloud.nacos.demo.gateway.filter.HttpBodyGlobalFilter com.crazymaker.cloud.nacos.demo.gateway.filter.PushMessageGlobalFilter # 80: 阻塞队列 + 批量消息,实现高性能rocketmq批量消息组装与推送 有关中继队列的实现细节 阻塞队列 + 批量消息, 单个producer实例从阻塞队列获取多个消息, 进行批量消息拼装(4M) 以内, 再发送给高可用rocketMQ集群 rocketMQ发送消息的类是 org.apache.rocketmq.common.message.Message ## 为什么要进行消息拼装? 1. 一般一个消息100Byte, 一个TCP报文一般1500Byte, 拼装可以更高效率的利用TCP报文的空间 2. 网络传输有不稳定因素, 多个消息的话封装为一次发送和多次发送相比, 可以降低发送消息的不稳定性 ## 涉及的类 详解见注释 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.putMessage -> com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.doSend 可以看到消息都被封装在一个list中, 然后批量掉发送 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.InnerSender.syncSend -> org.apache.rocketmq.client.producer.DefaultMQProducer.send(java.util.Collection) 可以看到这个rocketmq的这个方法实现是进行了消息拼装的 # 81: 深入rocketmq源码: 实现多线程+多producer 超高并发推送模式 RocketMq的producer与broker之间是通过netty的channel连接的, 且只有一个通道, 那么一个的话肯定存在上限的, 也容易变得繁忙, 需要进行改造为多个通道 多线程 + 多producer, 实现多producer动态, 并发推送模式 生产端可以根据配置, 动态构造多个producer客户端实例, 配合线程池任务, 动态进行producer的发送调度. ## 那么首先要先研究下rocketMQ中producer和broker的channel关联关系以及代码是如何实现关联的 producer和broker是只有一个连接通道的 producer -> channel -> broker 从源码分析: 顺着发送消息的方法找 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.doSend -> com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.InnerSender.syncSend -> org.apache.rocketmq.client.producer.DefaultMQProducer.send(java.util.Collection) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(org.apache.rocketmq.common.message.Message, long) 从这里能看出来是调的同步发送, communicationMode = SYNC -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl 看 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);这一行代码 这里进行分区的计算, 即消息往topic里面哪个队列进行发送 看 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);这一行 进到方法里去 -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl 看名字能看出来这是个发送消息相关的核心方法, 前面会进行一系列消息构造的逻辑, 往里面找 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( 这一行 这里马上就要进行客户端消息发送的RPC调用了, mQClientFactory 这个名字有点混肴, 它实际上是一个 MQClientInstance -> org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(java.lang.String, java.lang.String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback, org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance, int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl) 我们是SYNC同步发送, 因此到这一行: return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); 可以在这一行打断点, addr就是broker的ip和端口, 值举例如 192.168.56.122:10911, brokerName 就是broker注册的名称 -> org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync 看这一行 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); 还给往里看 -> org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync 是不是出现netty了? 看这一行 final Channel channel = this.getAndCreateChannel(addr); 这里可以发现, 根据上面的addr创建channel -> org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel 这个方法就是用addr创建channel, 创建好的channel会缓存到channelTables中, 一个ConcurrentMap 这里就是关键, 证明了是一个producerClient绑定了一个channel, 和broker做通讯 channelTables保存的键值对会有: 192.168.56.122:10911 : NettyRemotingClient (即对应的broker) cdh:9876 : NettyRemotingClient (对应的nameServer) 从 MQClientAPIImpl 的成员变量 private final RemotingClient remotingClient 只有一个来看, 一个客户端只有有一个RemotingClient ## 实现原理搞明白了, 那么应该如何改造, 使得producer和broker之间能使用多个channel进行通讯呢? 关键就是要创建多个producerClient, 同时每个client配置的 addr 要在MQ客户端启动的时候, 是不同的, 回到 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.InnerSender.startMqProducer 看 mqProducer.start(); -> org.apache.rocketmq.client.producer.DefaultMQProducer.start 看 this.defaultMQProducerImpl.start(); -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.start() -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.start(boolean) 这一行 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); 这里创建了一个 MQClientInstance, 名字起的有点混肴 -> org.apache.rocketmq.client.impl.MQClientManager.getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook) 看 MQClientInstance instance = this.factoryTable.get(clientId); 这里的参数clientId很重要, 和上面 channelTables 关联起来了, 正式这个key; 在MQClient启动的时候, 初始化好的 MQClientInstance, 通过这个key缓存在 channelTables, 后续发送消息是从map里面取实例进行消息发送 可以在这行打个断点, 值例如: 192.168.56.122@push-producer-relay-0-0 -> org.apache.rocketmq.client.ClientConfig.buildMQClientId 这个方法用来生成clientId, 可见clientId构成就是 客户端实例的ip@实例名称@unitName 看到这, 关键就是对实例名称 instanceName 下手, 去看 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.startMutiSender 根据配置值 rocketmq.producer.concurrent 设置并发数量, 配合线程池, 创建多个消息生产者客户端 其中 sender.setInstanceName(producerRelayGroup + "-" + workerId + "-" + i); workerId是通过zk获取的, 保证多个实例时每个都是不同的, i则是通过配置值 rocketmq.producer.concurrent 设置 如果不设置, InstanceName默认是DEFAULT, 创建多个producer实例是没有意义的 另外注意我们这里讲的是希望多个producer对应单broker的情况, 如果是多个broker, rocketMQ的实现是会有多个channel的 关于多个生产者时, 发送消息如何负载均衡到多个生产者, 见 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.putMessage int index = producerInUseCount.get(); 维护了一个计数, 所以这里是轮询做的负载均衡 # 82: 核心功能介绍: 基于websocket推送的端到端流程演示与解读 介绍分发服务和后续渠道对接服务的内容, 后续实操 架构图会进行一个重新补充 ## 实操涉及的几个基础服务: GatewayProviderApplication -- gateway服务 MessageDispatchApplication -- 消息分发服务 WebSocketCloudApplication -- WebSocket渠道服务 WebSocketMockClient -- WebSocket模拟的客户端, 连接上了WebSocket渠道, 用来展示接收到的消息 UAACloudApplication -- 应用的鉴权服务, 可以关掉SpringSecurity的注解, 本地不进行鉴权 ## 消息类型常量 MessageTypeEnum 消息类型的设计实际上是违反阿里对消息中间件使用规范的, 规范规定一个应用使用一个topic, 应用内的消息则通过tag做隔离 而我们这里则是一个类型就是一个topic 消息发送的方法见 com.crazymaker.springcloud.message.service.impl.MessagePushServiceImpl.saveAndDispatchMessage topic是按照如下规则组成的 String topicKey = topic + "-" + dto.getTargetType(); 一个应用有多个topic, 这一设计是为了后面的扩容和缩容, 后面会详细讲 # 83: 基于Netty的Websocket推送实操的重要性、高价值介绍 介绍了一个推送中台 + Netty的底层 + 底层操作系统的c语言相关的东西的面经 10wQPS推送中台的实操大纲也重新介绍了下 后面netty还会继续做重新讲解 ## 相关学习材料 卷1, 11章WebSocket原理与实战 博文: WebSocket和nginx动态负载均衡 https://www.cnblogs.com/crazymakercircle/p/16110932.html # 84: 高并发推送实操: Websocket推送的功能流程介绍 做一下功能分析, 以 架构班参考资料:多渠道推送 场景&接口&库表 文档中, 场景二: WebSocket/MQ 推送场景分析( MIS 推送给 终端) 为介绍对象 该场景参考了阿里推送的场景构造 业务梳理和角色见 onenote: 文档/p_消息推送中台/业务场景梳理/Websocket通讯的业务场景 下面的几块都是简单介绍, 后续会详细讲(实际类名和视频不一致) ## 模拟websocket客户端的测试类 com.crazymaker.springcloud.websocket.client.WebSocketMockClient ## 对用户标识(token)的解析和校验会用到 com.crazymaker.springcloud.websocket.netty.handler.ClientTokenCheckHandler 最后创建安全校验通过事件 SecurityCheckCompleteEvent complete, 后续处理器会收到这个事件 ## 对用户会话信息的绑定 com.crazymaker.springcloud.websocket.netty.handler.PusMsgWebSocketFrameHandler else if (evt instanceof SecurityCheckCompleteEvent) 这一行 处理上面创建的安全校验通过事件 SecurityCheckCompleteEvent complete ## 对于不同用户, 消息是如何进行路由的, 以及消息发送的过程 com.crazymaker.springcloud.websocket.service.impl.PushMessage2NettyChannelService webserviceMsgProcess.updateAndSend(json); 这个方法里面 ## 用户的token是如何获取的? 消息中台对于用户是透明的, 他只知道自己的业务系统 所以都是从业务系统的后台拿的, 用户登录获取到token后, 用户的token信息是业务系统后台 -> dispatcher-provider -> 分发给websocket-provider的 # 85:高并发推送实操:技术选型与通讯流程的超高并发架构 做一下, dispatcher-provider 这个模块的架构分析草图和内存, 流量的分析 简单比较了tomcat和netty的优劣势 业务梳理和角色见 onenote: 文档/p_消息推送中台/业务场景梳理/技术选型与通讯流程的超高并发架构(草图) # 86:高并发推送实操:基于netty的WebServer、Websocket Server基础开发 资料使用Java高并发核心编程(卷一), 11章WebSocket原理与实战 以及 博文: WebSocket和nginx动态负载均衡 https://www.cnblogs.com/crazymakercircle/p/16110932.html 涉及代码 netty_redis_zookeeper_source_code 项目 ## NettyWebSocketServerDemo/src/main/resources/index.html 客户端展示的网页, 包含websocket连接的创建代码(js写的) ## com.crazymakercircle.netty.websocket.WebPageHandler 用于返回index.html给客户端的handler ## com.crazymakercircle.netty.websocket.WebSocketEchoServer 创建websocket演示用的server端, 尤其是pipeline的组装是有讲究的, 重点看 WebSocketServerProtocolHandler的内部逻辑 参考博文 https://blog.csdn.net/wangwei19871103/article/details/104651753 io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.handlerAdded 进行 WebSocketServerProtocolHandshakeHandler 的添加, 用来先处理websocket的握手请求 io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.channelRead ctx.pipeline().replace(this, "WS403Responder", WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把WebSocketServerProtocolHandshakeHandler处理器替换掉,变成403 这个方法接收的还是http请求, 重点看 final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker.handshake(io.netty.channel.Channel, io.netty.handler.codec.http.FullHttpRequest) io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker.handshake(io.netty.channel.Channel, io.netty.handler.codec.http.FullHttpRequest, io.netty.handler.codec.http.HttpHeaders, io.netty.channel.ChannelPromise) 该方法完成了pipeline上原来用于http的decoder和encoder替换为websocket的逻辑 再回到 io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.channelRead handshake成功, 会通过handlerContext发送两个事件 // Kept for compatibility ctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered(new WebSocketServerProtocolHandler.HandshakeComplete(req.uri(), req.headers(), handshaker.selectedSubprotocol())); 这里两个事件是一个性质的, 都用来表示握手成功, 只是有升级的关系, 原来的事件包含的信息比较少, 后来的事件保存了一些请求信息 ## com.crazymakercircle.netty.websocket.TextWebSocketFrameHandler 用于获取websocket的文本帧, 添加几个字符后返回的handler userEventTriggered 方法中, 收到 WebSocketServerProtocolHandler.HandshakeComplete websocket握手成功事件, 会去掉 pipeline上的WebPageHandler # 87:高并发推送实操:通过抓包方式,深入的分析和学习websocket协议 基于86节的代码, 以及 Java高并发核心编程(卷一), 11章WebSocket原理与实战 11.3节 websocket报文的更详细资料见 https://www.cnblogs.com/crazymakercircle/p/16151804.html 主要内容为报文结构的分析 抓包改ip的说明见 com.crazymakercircle.NettyTest.WebSocketEchoTester#startServer # 88:Netty源码解读:websocket的事件机制,与握手事件的处理流程 首先看一下netty的事件处理机制 netty的pipeline上有三种数据的传递 handler的传递 异常的传递 事件的传递 ## 异常是如何传递的? 在任意的处理器上打个断点, 比如ClientTokenCheckHandler的channelRead方法第一行 顺着调用栈往上找可以看到 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(java.lang.Object) 我们都知道, netty的handler是由ChanelHandlerContext做封装的 这个方法的catch块中调用了 notifyHandlerException(t); io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException 方法中会判断异常是否已经被捕获, 若没有, 走invokeExceptionCaught(cause); io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(java.lang.Throwable) if判断先校验是否有handler调用了 handlerAdd方法, 一般没有, 所以会进 fireExceptionCaught(cause); io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught 这一方法的内部实现会将异常向下一handler传递 而我们自定义的异常处理用handler就是 ServerExceptionHandler 是放在pipeline的最后的 ## 事件是如何传递的? 结合websocket的握手事件来讲 在netty完成webSocket握手后, 会传递出一个事件 WebSocketServerProtocolHandler.HandshakeComplete 这个事件也被我们自定义的handler消费, 见 com.crazymaker.springcloud.websocket.netty.handler.PusMsgWebSocketFrameHandler.userEventTriggered 完成了登录session的保存和与客户端channel的保存等 事件如何发送的见 断点打在 com.crazymaker.springcloud.websocket.netty.handler.PusMsgWebSocketFrameHandler.userEventTriggered 第一行, 顺着调用栈往上找 io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler 里面组装了WebSocketServerProtocolHandshakeHandler io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.channelRead 这个方法在完成了握手处理, 会调用 // Kept for compatibility ctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered(new WebSocketServerProtocolHandler.HandshakeComplete(req.uri(), req.headers(), handshaker.selectedSubprotocol())); io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered 这个fire方法和异常的传递很像 里面也调用了 io.netty.channel.AbstractChannelHandlerContext.findContextInbound 会将事件向context的下一个handler传递 # 89:高并发推送实操:消息推送场景中websocket业务handler的功能,以及流水线的装配 流水线上handler的组成见, 说明见注释 com.crazymaker.springcloud.websocket.netty.WebSocketServer.PushServerInitializer.initChannel ## 自定义的handler com.crazymaker.springcloud.websocket.netty.handler.ClientTokenCheckHandler 用于用户token验证的handler com.crazymaker.springcloud.websocket.netty.handler.PusMsgWebSocketFrameHandler 发送消息到已建立websocket连接的客户端 只有一个简单的消息回写, 客户端发到服务端的消息没做什么转发处理, 而是直接原样返回给客户端了 发送服务的消息给用户的方法实际上走 PushMessage2NettyChannelService com.crazymaker.springcloud.websocket.netty.handler.ServerExceptionHandler 异常处理器, 用于异常的兜底处理, 这个handler是pipeLine之间共享的 ## netty提供的handler 剩余的, 怎么使用结合课件和注释 # 90:服务端性能优化:在握手之前完成token认证,而不是握手之后 前面讲过要在netty的 WebSocketServerProtocolHandler 处理器之前进行用户token的认证. 实际包含两个工作: 1. 用户client如何获得用户的令牌 2. 用户client如何获得websocket-provider(渠道) 的ip和端口 ## 用户client如何获得用户的令牌 前面介绍过, 我们的用户是业务系统后台的用户, websocket-provider对于用户client是透明的, websocket-provider 也没有保存用户信息 那么, 用户的token信息需要通过 app-provider服务进行获取 ### 这里能不能直接用业务系统后台的token? 获取业务系统后台的token接口是 app-provider服务下的 com.crazymaker.springcloud.user.controller.AuthController.userToken 答案是不能, 直接暴露内部token, 一旦token泄露, 会造成内部权限全部暴露 这里单独提供一个提供websocket连接用token的接口 app-provider服务下的 com.crazymaker.springcloud.user.controller.AuthController.accountToken 这个token没有设置session, 比较简单 提供的测试类 应用获取令牌的测试类, 即内部token的获取 com.crazymaker.springcloud.websocket.http.AppTokenHttpClient 需要启动 app-provider服务(服务器zk要启动) 演示 应用为account获取WS令牌的测试类, 即用户client到websocket-provider的token的获取 com.crazymaker.springcloud.websocket.http.AppAccountTokenHttpClient 需要启动 app-provider服务(服务器zk要启动) 演示 ## 用户client如何获得websocket-provider(渠道) 的ip和端口 这个问题实际还涉及有多个 websocket-provider 服务时的负载均衡, 我们前面讲过利用nginx进行多个服务的负载均衡, 但是静态的. ### nginx的ip和端口从哪里来? nacos中配置了一个key, 叫 websocketgateway, 以获取到, 另一种更高可用的方式是vip的方式, 这里不涉及; 那么这里nginx的ip和端口实际是固定的 ### 如何进行动态的配置(websocket-provider 服务上下线, nginx可以动态感知并动态更改节点变化) 参考 k8s的 ingress的实现, 后面会介绍 # 91:高并发推送实操:Client如何获取websocket服务的gateway地址、以及推送token 结合上一节一起看 用到上一节的两个测试类获取token和服务的网关地址 ## 客户端的消息发送 测试类 com.crazymaker.springcloud.websocket.client.WebSocketMockClient 基于websocket的netty客户端, 模拟websocket客户端用于消息接收的测试 com.crazymaker.springcloud.websocket.client.WebSocketClientHandler 用于客户端消息处理的测试用handler ## 客户端token验证逻辑详解 对于如何优化websocket握手之前即进行token验证, 以及验证通过后session的设置问题, 是一个业务上的设计逻辑 WebSocketServer中组合pipeline时, 将 ClientTokenCheckHandler 设置在 WebSocketServerProtocolHandler 之前, ClientTokenCheckHandler 会进行token验证, 未通过会返回未授权的响应, 并关闭当前通道, 也就不会进行后续握手 若通过, 会发送 SecurityCheckCompleteEvent 安全校验通过用户事件 WebSocketServerProtocolHandler 会完成握手和pipeline上http相关handler去除, websocket相关handler添加的操作 PusMsgWebSocketFrameHandler 看 userEventTriggered方法, 若事件为SecurityCheckCompleteEvent 安全校验通过用户事件, 会进行session的创建, 维护session到map中; sessionId的创建 user.getUsername(),user.getAppId()组合 com.crazymaker.springcloud.websocket.session.ServerSession.buildSessionId 若事件为 HandshakeComplete 握手成功事件, 会向客户端发送登陆成功的响应 若事件为 IdleStateEvent, 空闲连接过久, 进行会话关闭 # 92:高并发推送实操:本地会话的创建、ID设置、session管理 ## 会话id的方案讨论 uuid + 用户账号 优势: 找到会话比较方便 劣势: 用户可能多端登录, 这个方案只能保证一个通道, 不能实现多端的信息同步 此时分布式会话是必须的, 我们必须知道用户连接的是哪个websocket-provider节点, 然后进行消息路由 在我们的消息中台中, 因为中继队列的消息生产后, websocket-provider中消费者设置的是广播模式, 所以不存在分布式会话的问题, 广播模式的设置在 com.crazymaker.springcloud.websocket.service.impl.PushMessage2NettyChannelService.startConsumer 中继队列的消息发送逻辑回顾一下, 在 com.crazymaker.cloud.nacos.demo.gateway.service.RocketmqMessageRelayService.putMessage ## session管理 其实上一节 客户端token验证逻辑详解 部分已经进行了分析 # 93:高并发推送实操:消息的订阅与推送 按照设计从业务系统后台到dispatcher-provider中间要通过网关, 我们自己测试可以直接通过swagger来进行, 不走网关, 否则逻辑太复杂了, 不好调试 ## 需要启动两个服务: websocket-provider websocket渠道服务 com.crazymaker.springcloud.websocket.start.WebSocketCloudApplication dispatcher-provider 消息分发服务 com.crazymaker.springcloud.message.start.MessageDispatchApplication ## 需要启动的模拟用户客户端测试类 com.crazymaker.springcloud.websocket.client.WebSocketMockClient 跑起来模拟用户登录, 用来接收分发的消息 都跑起来之后打开dispatcher-provider 消息分发服务的swagger页面 找到 /api/crazymaker/message/push/v1 com.crazymaker.springcloud.message.controller.MessageController.syncPush 发送消息