diff --git a/Makefile b/Makefile index b52a6e8c8972cd0aae029f03fa972454c6d0d643..90d5724aea389dc731a1b8dcd8cf969f5bb8ed3a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # 当需要升级版本时,执行该命令 -version=1.7.4 +version=1.7.5-SNAPSHOT update_version: sed -i '' 's/public static final String VERSION = ".*";/public static final String VERSION = "v${version}";/' aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java mvn -f smart-socket-parent/pom.xml versions:set -DnewVersion=${version} versions:commit diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 10db4746282f0f7b1796f8c771ac6fbda2be1569..d6ce53ea4c11717312591e744046b84ae679a0e4 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index e32d16eb25e512a6f2ebb687acd562751d7e2d5b..e5e6f5ceaee14edec9eeabf681c6a80519ba43f0 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -88,7 +88,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { //仅同步read会用到此线程资源 EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); + asynchronousSocketChannel.doRead(true, false); } if ((interestOps & SelectionKey.OP_WRITE) > 0) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); @@ -97,7 +97,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } }; - private static final Consumer readConsumer = selectionKey -> ((EnhanceAsynchronousSocketChannel) selectionKey.attachment()).doRead(true); + private static final Consumer readConsumer = selectionKey -> ((EnhanceAsynchronousSocketChannel) selectionKey.attachment()).doRead(true, false); /** * 初始化异步通道组实例。 @@ -147,7 +147,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { //仅同步read会用到此线程资源 EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); + asynchronousSocketChannel.doRead(true, false); } else { throw new IllegalStateException("unexpect callback,key valid:" + selectionKey.isValid() + " ,interestOps:" + selectionKey.interestOps()); } diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java index 83709d688d67dc171f90b4d0d88d3c81178bfbaa..bdfb8815dd106432eb2cd905e54f2f624ac7868f 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java @@ -13,9 +13,11 @@ import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; import java.nio.channels.SocketChannel; import java.nio.channels.spi.AsynchronousChannelProvider; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -28,14 +30,26 @@ import java.util.concurrent.TimeUnit; * 1. 创建和管理异步通道组,支持多线程处理 * 2. 提供服务器端和客户端Socket通道的创建 * 3. 支持低内存模式运行,优化资源使用 - * + *

* 该类是smart-socket框架中异步IO实现的核心组件之一,通过NIO实现了类似JDK7 AIO的编程模型, * 但性能更优,资源占用更少。在低内存模式下,会采用特殊的内存管理策略以减少内存占用。 - * + * * @author 三刀 * @version V1.0 , 2020/5/25 */ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChannelProvider { + public static final ThreadLocal SYNC_READ_FLAG = ThreadLocal.withInitial(() -> false); + public static final CompletionHandler> SYNC_READ_HANDLER = new CompletionHandler>() { + @Override + public void completed(Integer result, CompletableFuture attachment) { + attachment.complete(result); + } + + @Override + public void failed(Throwable exc, CompletableFuture attachment) { + attachment.completeExceptionally(exc); + } + }; /** * 读监听信号 * 用于标识通道处于读监听状态,值为-2 @@ -61,7 +75,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne * 创建一个新的异步通道组 * 使用指定的线程数和线程工厂创建一个新的异步通道组,用于管理异步通道 * - * @param nThreads 线程池中的线程数量 + * @param nThreads 线程池中的线程数量 * @param threadFactory 创建线程的工厂类 * @return 返回新创建的异步通道组实例 * @throws IOException 如果创建过程中发生IO错误 @@ -73,8 +87,8 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 使用现有的线程池创建异步通道组 - * - * @param executor 用于执行异步IO操作的线程池 + * + * @param executor 用于执行异步IO操作的线程池 * @param initialSize 初始大小,用于确定内部数据结构的初始容量 * @return 返回新创建的异步通道组实例 * @throws IOException 如果创建过程中发生IO错误 @@ -87,7 +101,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 创建一个新的异步服务器Socket通道 * 用于服务器端接受客户端连接请求 - * + * * @param group 关联的异步通道组,用于管理该通道的IO操作 * @return 返回新创建的服务器Socket通道 * @throws IOException 如果创建过程中发生IO错误 @@ -100,7 +114,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 创建一个新的异步客户端Socket通道 * 用于客户端发起连接请求和数据传输 - * + * * @param group 关联的异步通道组,用于管理该通道的IO操作 * @return 返回新创建的客户端Socket通道 * @throws IOException 如果创建过程中发生IO错误 @@ -113,7 +127,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne /** * 检查并获取增强型异步通道组实例 * 验证传入的通道组是否为EnhanceAsynchronousChannelGroup类型 - * + * * @param group 待检查的异步通道组 * @return 返回转换后的增强型异步通道组 * @throws RuntimeException 如果传入的通道组类型不正确 diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 20b4ac545034ae524d1a2d47c543488c8058b05d..e2ecd23aaaf35714d41b2d0d9f8d9a73b3d55eff 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -30,13 +30,13 @@ import java.util.concurrent.Future; * 2. 支持异步接受连接操作 * 3. 管理服务器Socket的生命周期 * 4. 提供回调机制处理连接事件 - * + *

* 该类是服务器端网络编程的核心组件,通过非阻塞IO和事件通知机制,实现了高效的连接处理: * - 支持Future和CompletionHandler两种异步编程模式 * - 实现了连接请求的排队和限流处理,避免服务器资源耗尽 * - 提供了优雅的异常处理和资源管理机制 * - 在低内存模式下采用特殊的资源管理策略 - * + * * @author 三刀 * @version V1.0 , 2020/5/25 */ @@ -45,37 +45,32 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc * 底层的服务器Socket通道,用于实际的网络IO操作 */ private final ServerSocketChannel serverSocketChannel; - + /** * 异步通道组,用于管理通道的线程资源和事件分发 */ private final EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup; - + /** * 接受连接的回调处理器,用于处理新连接建立后的回调逻辑 */ private CompletionHandler acceptCompletionHandler; - - /** - * 用于Future方式调用时的回调处理器 - */ - private FutureCompletionHandler acceptFuture; - + /** * 接受连接操作的附加对象,可在回调时传递额外的上下文信息 */ private Object attachment; - + /** * 用于接受连接操作的选择键,管理通道的接受事件注册 */ private SelectionKey selectionKey; - + /** * 标识是否有待处理的接受连接操作 */ private boolean acceptPending; - + /** * 是否启用低内存模式 * 在低内存模式下,会采用特殊的内存管理策略以减少内存占用 @@ -91,9 +86,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 构造函数 * 创建一个新的增强型异步服务器Socket通道实例 - * + * * @param enhanceAsynchronousChannelGroup 关联的异步通道组,用于管理该通道的资源 - * @param lowMemory 是否启用低内存模式 + * @param lowMemory 是否启用低内存模式 * @throws IOException 如果创建底层通道时发生IO错误 */ EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup, boolean lowMemory) throws IOException { @@ -106,8 +101,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 将服务器Socket绑定到指定的本地地址 - * - * @param local 要绑定的本地地址 + * + * @param local 要绑定的本地地址 * @param backlog 连接请求队列的最大长度 * @return 返回当前服务器Socket通道实例 * @throws IOException 如果绑定操作失败 @@ -120,8 +115,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 设置服务器Socket的选项 - * - * @param name 选项名称 + * + * @param name 选项名称 * @param value 选项值 * @return 返回当前服务器Socket通道实例 * @throws IOException 如果设置选项时发生错误 @@ -134,7 +129,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 获取服务器Socket的选项值 - * + * * @param name 选项名称 * @return 返回指定选项的当前值 * @throws IOException 如果获取选项值时发生错误 @@ -146,7 +141,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 获取服务器Socket支持的所有选项 - * + * * @return 返回支持的选项集合 */ @Override @@ -157,9 +152,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 异步接受客户端连接请求 * 该方法实现了异步接受连接的功能,通过回调机制通知连接建立的结果 - * + * * @param attachment 附加对象,可在回调时获取 - * @param handler 连接完成的回调处理器 + * @param handler 连接完成的回调处理器 * @throws AcceptPendingException 如果已有一个待处理的接受连接操作 */ @Override @@ -183,12 +178,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc */ public void doAccept() { try { - //此前通过Future调用,且触发了cancel - if (acceptFuture != null && acceptFuture.isDone()) { - resetAccept(); - EnhanceAsynchronousChannelGroup.removeOps(selectionKey, SelectionKey.OP_ACCEPT); - return; - } SocketChannel socketChannel = null; if (acceptInvoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { socketChannel = serverSocketChannel.accept(); @@ -232,7 +221,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc */ private void resetAccept() { acceptPending = false; - acceptFuture = null; acceptCompletionHandler = null; attachment = null; } @@ -240,20 +228,17 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 以Future方式接受连接 * 提供基于Future的异步接受连接方式,允许调用者通过Future对象获取连接结果 - * + * * @return 返回Future对象,可用于获取连接结果 */ @Override public Future accept() { - FutureCompletionHandler acceptFuture = new FutureCompletionHandler<>(); - accept(null, acceptFuture); - this.acceptFuture = acceptFuture; - return acceptFuture; + throw new UnsupportedOperationException(); } /** * 获取服务器Socket的本地地址 - * + * * @return 返回服务器Socket绑定的本地地址 * @throws IOException 如果获取地址时发生IO错误 */ @@ -264,7 +249,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 检查服务器Socket通道是否打开 - * + * * @return 如果通道处于打开状态返回true,否则返回false */ @Override @@ -275,7 +260,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc /** * 关闭服务器Socket通道 * 关闭底层的服务器Socket通道,释放相关资源 - * + * * @throws IOException 如果关闭时发生IO错误 */ @Override diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index be2ddcecfdf27be6264cff9c3d79768c11d94c81..cac03cba0143b16fa18e9dfbc629a00914be9c96 100644 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -24,6 +24,7 @@ import java.nio.channels.ShutdownChannelGroupException; import java.nio.channels.SocketChannel; import java.nio.channels.WritePendingException; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -151,7 +152,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { exception = e; } if (readCompletionHandler != null) { - doRead(true); + doRead(true, false); } if (readSelectionKey != null) { readSelectionKey.cancel(); @@ -237,9 +238,9 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private void doConnect(SocketAddress remote, A attachment, CompletionHandler completionHandler) { try { //此前通过Future调用,且触发了cancel - if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) { - return; - } +// if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) { +// return; +// } boolean connected = channel.isConnectionPending(); if (connected || channel.connect(remote)) { connected = channel.finishConnect(); @@ -264,9 +265,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public Future connect(SocketAddress remote) { - FutureCompletionHandler connectFuture = new FutureCompletionHandler<>(); - connect(remote, null, connectFuture); - return connectFuture; + throw new UnsupportedOperationException(); } @Override @@ -284,13 +283,20 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { this.readBuffer = readBuffer; this.readAttachment = attachment; this.readCompletionHandler = (CompletionHandler) handler; - doRead(handler instanceof FutureCompletionHandler); + boolean syncRead = EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.get(); + doRead(syncRead, syncRead); } + @Override public final Future read(ByteBuffer readBuffer) { - FutureCompletionHandler readFuture = new FutureCompletionHandler<>(); - read(readBuffer, 0, TimeUnit.MILLISECONDS, null, readFuture); + CompletableFuture readFuture = new CompletableFuture<>(); + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true); + try { + read(readBuffer, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + } finally { + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false); + } return readFuture; } @@ -342,17 +348,17 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { * * @param direct 是否直接读取,true表示立即读取,false表示通过事件触发读取 */ - public final void doRead(boolean direct) { + public final void doRead(boolean direct, boolean switchThread) { try { if (readCompletionHandler == null) { return; } // 处理Future调用被取消的情况 - if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) { - EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); - resetRead(); - return; - } +// if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) { +// EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); +// resetRead(); +// return; +// } // 低内存模式下的特殊处理:当没有缓冲区时,直接返回可读信号 if (lowMemory && direct && readBuffer == null) { CompletionHandler completionHandler = readCompletionHandler; @@ -376,23 +382,26 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { } //注册至异步线程 - if (readSize == 0 && readCompletionHandler instanceof FutureCompletionHandler) { - EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); - group().commonWorker.addRegister(selector -> { - try { - channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - doRead(true); - } - }); - return; - } - //释放内存 - if (lowMemory && readSize == 0 && readBuffer.position() == 0) { - readBuffer = null; - readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); + if (readSize == 0) { + if (switchThread) { + EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ); + group().commonWorker.addRegister(selector -> { + try { + channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + doRead(true, false); + } + }); + return; + } + //释放内存 + if (lowMemory && readBuffer.position() == 0) { + readBuffer = null; + readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); + } } + if (readSize != 0 || !hasRemain) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java deleted file mode 100644 index 4f72b44f1880e084da8f80d623f5e6fd893ccced..0000000000000000000000000000000000000000 --- a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java +++ /dev/null @@ -1,141 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017-2021, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: FutureCompletionHandler.java - * Date: 2021-07-29 - * Author: sandao (zhengjunweimail@163.com) - * - ******************************************************************************/ - -package org.smartboot.socket.enhance; - -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * 一个同时实现了CompletionHandler和Future接口的工具类,用于异步操作的完成处理和Future模式支持。 - *

- * 该类提供了以下功能: - * 1. 作为CompletionHandler处理异步操作的完成和失败回调 - * 2. 作为Future提供异步操作的结果获取、取消和状态查询 - * 3. 支持带超时的异步操作结果获取 - *

- * @param 异步操作的结果类型 - * @param 异步操作的附加参数类型 - */ -public final class FutureCompletionHandler implements CompletionHandler, Future { - /** 异步操作的执行结果 */ - private V result; - /** 标记异步操作是否已完成(成功完成、失败或被取消) */ - private boolean done = false; - /** 标记异步操作是否被取消 */ - private boolean cancel = false; - /** 异步操作执行过程中发生的异常 */ - private Throwable exception; - - /** - * 异步操作成功完成时的回调方法 - * @param result 异步操作的执行结果 - * @param selectionKey 异步操作的附加参数 - */ - @Override - public void completed(V result, A selectionKey) { - this.result = result; - done = true; - synchronized (this) { - this.notify(); - } - } - - /** - * 异步操作失败时的回调方法 - * @param exc 异步操作执行过程中发生的异常 - * @param attachment 异步操作的附加参数 - */ - @Override - public void failed(Throwable exc, A attachment) { - exception = exc; - done = true; - } - - /** - * 尝试取消异步操作的执行 - * @param mayInterruptIfRunning 是否允许中断正在执行的操作(在此实现中该参数不起作用) - * @return 如果操作成功取消返回true,如果操作已完成或已被取消返回false - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (done || cancel) { - return false; - } - cancel = true; - done = true; - synchronized (this) { - notify(); - } - return true; - } - - /** - * 检查异步操作是否被取消 - * @return 如果操作已被取消返回true,否则返回false - */ - @Override - public boolean isCancelled() { - return cancel; - } - - /** - * 检查异步操作是否已完成 - * @return 如果操作已完成(包括成功完成、失败或被取消)返回true,否则返回false - */ - @Override - public boolean isDone() { - return done; - } - - /** - * 获取异步操作的执行结果,如果操作未完成则阻塞等待 - * @return 异步操作的执行结果 - * @throws InterruptedException 等待过程中线程被中断 - * @throws ExecutionException 异步操作执行过程中发生异常 - */ - @Override - public synchronized V get() throws InterruptedException, ExecutionException { - if (done) { - if (exception != null) { - throw new ExecutionException(exception); - } - return result; - } else { - wait(); - } - return get(); - } - - /** - * 在指定的超时时间内获取异步操作的执行结果,如果操作未完成则阻塞等待 - * @param timeout 超时时间 - * @param unit 时间单位 - * @return 异步操作的执行结果 - * @throws InterruptedException 等待过程中线程被中断 - * @throws ExecutionException 异步操作执行过程中发生异常 - * @throws TimeoutException 超过指定时间仍未获得结果 - */ - @Override - public synchronized V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (done) { - return get(); - } else { - wait(unit.toMillis(timeout)); - } - if (done) { - return get(); - } - throw new TimeoutException(); - } - -} diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 3cede2c44bab4aef65438e1d5e4408b4c9999396..818957f9071c21ac009257a064581b347aef3e5f 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -38,7 +38,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.7.4"; + public static final String VERSION = "v1.7.5-SNAPSHOT"; /** * 消息体缓存大小,字节 diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index 0c9c36801f526962431d452c18bb2e6e74d67a26..cd994b48a405f85eb396b5516d10beda9dba5d61 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT ../smart-socket-parent diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java index b126fc6750c8c3de1a17a78cffaa53f9f588599e..b55b96a0d966bf542016b47116e4f85cf37035ca 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java @@ -201,9 +201,9 @@ public class MultiplexClient { boolean noneSslPlugin = true; // 添加配置的插件 for (Plugin responsePlugin : multiplexOptions.getPlugins()) { - processor.addPlugin(responsePlugin); if (responsePlugin instanceof SslPlugin) { noneSslPlugin = false; + break; } } @@ -216,7 +216,10 @@ public class MultiplexClient { if (multiplexOptions.idleTimeout() > 0) { processor.addPlugin(new IdleStatePlugin<>(multiplexOptions.idleTimeout())); } - + // 添加配置的插件 + for (Plugin responsePlugin : multiplexOptions.getPlugins()) { + processor.addPlugin(responsePlugin); + } firstConnected = false; } } diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java index c3ef601f703bfeb4ce6c4bb557daf0a43b0ec074..092273a681b34c7feb4e15502f32fa5730e37d45 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java @@ -13,7 +13,6 @@ import org.smartboot.socket.buffer.BufferPagePool; import org.smartboot.socket.buffer.VirtualBuffer; import org.smartboot.socket.channels.AsynchronousSocketChannelProxy; import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; -import org.smartboot.socket.enhance.FutureCompletionHandler; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; @@ -22,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -227,8 +227,13 @@ public class SslAsynchronousSocketChannel extends AsynchronousSocketChannelProxy @Override public Future read(ByteBuffer dst) { - FutureCompletionHandler readFuture = new FutureCompletionHandler<>(); - read(dst, 0, TimeUnit.MILLISECONDS, null, readFuture); + CompletableFuture readFuture = new CompletableFuture<>(); + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true); + try { + read(dst, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER); + } finally { + EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false); + } return readFuture; } diff --git a/benchmark/pom.xml b/benchmark/pom.xml index eeba86f7fc4001b399f96b4bee900530b79a6dc0..3148921084598f7772dcf6d9f44613dc29f297f8 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -18,7 +18,7 @@ io.github.smartboot.socket aio-pro - 1.7.4 + 1.7.5-SNAPSHOT org.slf4j diff --git a/example/pom.xml b/example/pom.xml index d39a0985d42cdd13bcc0d56a6e2f8406fa807f6d..11d6778fabb01ab7588c9d34c5da9918a964df85 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ io.github.smartboot.socket aio-pro - 1.7.4 + 1.7.5-SNAPSHOT org.apache.commons diff --git a/pom.xml b/pom.xml index 11eb2a7ab9272209e6ca254ba6963df1f1792046..486a2dab566f9862e622d28d8895a7650006a32e 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index b71375a8cf69b7797d70923ad347c8d1dbfb5fad..f93a84fd2a2d97c6b46b7773f96113a3270ed620 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,7 +15,7 @@ 4.0.0 io.github.smartboot.socket smart-socket-parent - 1.7.4 + 1.7.5-SNAPSHOT pom