readConsumer = selectionKey -> ((EnhanceAsynchronousSocketChannel) selectionKey.attachment()).doRead(true, false);
/**
* 初始化异步通道组实例。
@@ -147,7 +147,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
//仅同步read会用到此线程资源
EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
removeOps(selectionKey, SelectionKey.OP_READ);
- asynchronousSocketChannel.doRead(true);
+ asynchronousSocketChannel.doRead(true, false);
} else {
throw new IllegalStateException("unexpect callback,key valid:" + selectionKey.isValid() + " ,interestOps:" + selectionKey.interestOps());
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java
index 83709d688d67dc171f90b4d0d88d3c81178bfbaa..bdfb8815dd106432eb2cd905e54f2f624ac7868f 100644
--- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java
+++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java
@@ -13,9 +13,11 @@ import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -28,14 +30,26 @@ import java.util.concurrent.TimeUnit;
* 1. 创建和管理异步通道组,支持多线程处理
* 2. 提供服务器端和客户端Socket通道的创建
* 3. 支持低内存模式运行,优化资源使用
- *
+ *
* 该类是smart-socket框架中异步IO实现的核心组件之一,通过NIO实现了类似JDK7 AIO的编程模型,
* 但性能更优,资源占用更少。在低内存模式下,会采用特殊的内存管理策略以减少内存占用。
- *
+ *
* @author 三刀
* @version V1.0 , 2020/5/25
*/
public final class EnhanceAsynchronousChannelProvider extends AsynchronousChannelProvider {
+ public static final ThreadLocal SYNC_READ_FLAG = ThreadLocal.withInitial(() -> false);
+ public static final CompletionHandler> SYNC_READ_HANDLER = new CompletionHandler>() {
+ @Override
+ public void completed(Integer result, CompletableFuture attachment) {
+ attachment.complete(result);
+ }
+
+ @Override
+ public void failed(Throwable exc, CompletableFuture attachment) {
+ attachment.completeExceptionally(exc);
+ }
+ };
/**
* 读监听信号
* 用于标识通道处于读监听状态,值为-2
@@ -61,7 +75,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne
* 创建一个新的异步通道组
* 使用指定的线程数和线程工厂创建一个新的异步通道组,用于管理异步通道
*
- * @param nThreads 线程池中的线程数量
+ * @param nThreads 线程池中的线程数量
* @param threadFactory 创建线程的工厂类
* @return 返回新创建的异步通道组实例
* @throws IOException 如果创建过程中发生IO错误
@@ -73,8 +87,8 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne
/**
* 使用现有的线程池创建异步通道组
- *
- * @param executor 用于执行异步IO操作的线程池
+ *
+ * @param executor 用于执行异步IO操作的线程池
* @param initialSize 初始大小,用于确定内部数据结构的初始容量
* @return 返回新创建的异步通道组实例
* @throws IOException 如果创建过程中发生IO错误
@@ -87,7 +101,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne
/**
* 创建一个新的异步服务器Socket通道
* 用于服务器端接受客户端连接请求
- *
+ *
* @param group 关联的异步通道组,用于管理该通道的IO操作
* @return 返回新创建的服务器Socket通道
* @throws IOException 如果创建过程中发生IO错误
@@ -100,7 +114,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne
/**
* 创建一个新的异步客户端Socket通道
* 用于客户端发起连接请求和数据传输
- *
+ *
* @param group 关联的异步通道组,用于管理该通道的IO操作
* @return 返回新创建的客户端Socket通道
* @throws IOException 如果创建过程中发生IO错误
@@ -113,7 +127,7 @@ public final class EnhanceAsynchronousChannelProvider extends AsynchronousChanne
/**
* 检查并获取增强型异步通道组实例
* 验证传入的通道组是否为EnhanceAsynchronousChannelGroup类型
- *
+ *
* @param group 待检查的异步通道组
* @return 返回转换后的增强型异步通道组
* @throws RuntimeException 如果传入的通道组类型不正确
diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
index 20b4ac545034ae524d1a2d47c543488c8058b05d..e2ecd23aaaf35714d41b2d0d9f8d9a73b3d55eff 100644
--- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
+++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
@@ -30,13 +30,13 @@ import java.util.concurrent.Future;
* 2. 支持异步接受连接操作
* 3. 管理服务器Socket的生命周期
* 4. 提供回调机制处理连接事件
- *
+ *
* 该类是服务器端网络编程的核心组件,通过非阻塞IO和事件通知机制,实现了高效的连接处理:
* - 支持Future和CompletionHandler两种异步编程模式
* - 实现了连接请求的排队和限流处理,避免服务器资源耗尽
* - 提供了优雅的异常处理和资源管理机制
* - 在低内存模式下采用特殊的资源管理策略
- *
+ *
* @author 三刀
* @version V1.0 , 2020/5/25
*/
@@ -45,37 +45,32 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
* 底层的服务器Socket通道,用于实际的网络IO操作
*/
private final ServerSocketChannel serverSocketChannel;
-
+
/**
* 异步通道组,用于管理通道的线程资源和事件分发
*/
private final EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup;
-
+
/**
* 接受连接的回调处理器,用于处理新连接建立后的回调逻辑
*/
private CompletionHandler acceptCompletionHandler;
-
- /**
- * 用于Future方式调用时的回调处理器
- */
- private FutureCompletionHandler acceptFuture;
-
+
/**
* 接受连接操作的附加对象,可在回调时传递额外的上下文信息
*/
private Object attachment;
-
+
/**
* 用于接受连接操作的选择键,管理通道的接受事件注册
*/
private SelectionKey selectionKey;
-
+
/**
* 标识是否有待处理的接受连接操作
*/
private boolean acceptPending;
-
+
/**
* 是否启用低内存模式
* 在低内存模式下,会采用特殊的内存管理策略以减少内存占用
@@ -91,9 +86,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 构造函数
* 创建一个新的增强型异步服务器Socket通道实例
- *
+ *
* @param enhanceAsynchronousChannelGroup 关联的异步通道组,用于管理该通道的资源
- * @param lowMemory 是否启用低内存模式
+ * @param lowMemory 是否启用低内存模式
* @throws IOException 如果创建底层通道时发生IO错误
*/
EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup, boolean lowMemory) throws IOException {
@@ -106,8 +101,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 将服务器Socket绑定到指定的本地地址
- *
- * @param local 要绑定的本地地址
+ *
+ * @param local 要绑定的本地地址
* @param backlog 连接请求队列的最大长度
* @return 返回当前服务器Socket通道实例
* @throws IOException 如果绑定操作失败
@@ -120,8 +115,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 设置服务器Socket的选项
- *
- * @param name 选项名称
+ *
+ * @param name 选项名称
* @param value 选项值
* @return 返回当前服务器Socket通道实例
* @throws IOException 如果设置选项时发生错误
@@ -134,7 +129,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 获取服务器Socket的选项值
- *
+ *
* @param name 选项名称
* @return 返回指定选项的当前值
* @throws IOException 如果获取选项值时发生错误
@@ -146,7 +141,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 获取服务器Socket支持的所有选项
- *
+ *
* @return 返回支持的选项集合
*/
@Override
@@ -157,9 +152,9 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 异步接受客户端连接请求
* 该方法实现了异步接受连接的功能,通过回调机制通知连接建立的结果
- *
+ *
* @param attachment 附加对象,可在回调时获取
- * @param handler 连接完成的回调处理器
+ * @param handler 连接完成的回调处理器
* @throws AcceptPendingException 如果已有一个待处理的接受连接操作
*/
@Override
@@ -183,12 +178,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
*/
public void doAccept() {
try {
- //此前通过Future调用,且触发了cancel
- if (acceptFuture != null && acceptFuture.isDone()) {
- resetAccept();
- EnhanceAsynchronousChannelGroup.removeOps(selectionKey, SelectionKey.OP_ACCEPT);
- return;
- }
SocketChannel socketChannel = null;
if (acceptInvoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER) {
socketChannel = serverSocketChannel.accept();
@@ -232,7 +221,6 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
*/
private void resetAccept() {
acceptPending = false;
- acceptFuture = null;
acceptCompletionHandler = null;
attachment = null;
}
@@ -240,20 +228,17 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 以Future方式接受连接
* 提供基于Future的异步接受连接方式,允许调用者通过Future对象获取连接结果
- *
+ *
* @return 返回Future对象,可用于获取连接结果
*/
@Override
public Future accept() {
- FutureCompletionHandler acceptFuture = new FutureCompletionHandler<>();
- accept(null, acceptFuture);
- this.acceptFuture = acceptFuture;
- return acceptFuture;
+ throw new UnsupportedOperationException();
}
/**
* 获取服务器Socket的本地地址
- *
+ *
* @return 返回服务器Socket绑定的本地地址
* @throws IOException 如果获取地址时发生IO错误
*/
@@ -264,7 +249,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 检查服务器Socket通道是否打开
- *
+ *
* @return 如果通道处于打开状态返回true,否则返回false
*/
@Override
@@ -275,7 +260,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
/**
* 关闭服务器Socket通道
* 关闭底层的服务器Socket通道,释放相关资源
- *
+ *
* @throws IOException 如果关闭时发生IO错误
*/
@Override
diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
index be2ddcecfdf27be6264cff9c3d79768c11d94c81..cac03cba0143b16fa18e9dfbc629a00914be9c96 100644
--- a/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
+++ b/aio-core/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
@@ -24,6 +24,7 @@ import java.nio.channels.ShutdownChannelGroupException;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritePendingException;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -151,7 +152,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
exception = e;
}
if (readCompletionHandler != null) {
- doRead(true);
+ doRead(true, false);
}
if (readSelectionKey != null) {
readSelectionKey.cancel();
@@ -237,9 +238,9 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
private void doConnect(SocketAddress remote, A attachment, CompletionHandler completionHandler) {
try {
//此前通过Future调用,且触发了cancel
- if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) {
- return;
- }
+// if (completionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) completionHandler).isDone()) {
+// return;
+// }
boolean connected = channel.isConnectionPending();
if (connected || channel.connect(remote)) {
connected = channel.finishConnect();
@@ -264,9 +265,7 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
@Override
public Future connect(SocketAddress remote) {
- FutureCompletionHandler connectFuture = new FutureCompletionHandler<>();
- connect(remote, null, connectFuture);
- return connectFuture;
+ throw new UnsupportedOperationException();
}
@Override
@@ -284,13 +283,20 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
this.readBuffer = readBuffer;
this.readAttachment = attachment;
this.readCompletionHandler = (CompletionHandler) handler;
- doRead(handler instanceof FutureCompletionHandler);
+ boolean syncRead = EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.get();
+ doRead(syncRead, syncRead);
}
+
@Override
public final Future read(ByteBuffer readBuffer) {
- FutureCompletionHandler readFuture = new FutureCompletionHandler<>();
- read(readBuffer, 0, TimeUnit.MILLISECONDS, null, readFuture);
+ CompletableFuture readFuture = new CompletableFuture<>();
+ EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true);
+ try {
+ read(readBuffer, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER);
+ } finally {
+ EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false);
+ }
return readFuture;
}
@@ -342,17 +348,17 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
*
* @param direct 是否直接读取,true表示立即读取,false表示通过事件触发读取
*/
- public final void doRead(boolean direct) {
+ public final void doRead(boolean direct, boolean switchThread) {
try {
if (readCompletionHandler == null) {
return;
}
// 处理Future调用被取消的情况
- if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) {
- EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ);
- resetRead();
- return;
- }
+// if (readCompletionHandler instanceof FutureCompletionHandler && ((FutureCompletionHandler) readCompletionHandler).isDone()) {
+// EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ);
+// resetRead();
+// return;
+// }
// 低内存模式下的特殊处理:当没有缓冲区时,直接返回可读信号
if (lowMemory && direct && readBuffer == null) {
CompletionHandler completionHandler = readCompletionHandler;
@@ -376,23 +382,26 @@ class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
}
//注册至异步线程
- if (readSize == 0 && readCompletionHandler instanceof FutureCompletionHandler) {
- EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ);
- group().commonWorker.addRegister(selector -> {
- try {
- channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this);
- } catch (ClosedChannelException e) {
- doRead(true);
- }
- });
- return;
- }
- //释放内存
- if (lowMemory && readSize == 0 && readBuffer.position() == 0) {
- readBuffer = null;
- readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment);
+ if (readSize == 0) {
+ if (switchThread) {
+ EnhanceAsynchronousChannelGroup.removeOps(readSelectionKey, SelectionKey.OP_READ);
+ group().commonWorker.addRegister(selector -> {
+ try {
+ channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this);
+ } catch (ClosedChannelException e) {
+ doRead(true, false);
+ }
+ });
+ return;
+ }
+ //释放内存
+ if (lowMemory && readBuffer.position() == 0) {
+ readBuffer = null;
+ readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment);
+ }
}
+
if (readSize != 0 || !hasRemain) {
CompletionHandler completionHandler = readCompletionHandler;
Object attach = readAttachment;
diff --git a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java
deleted file mode 100644
index 4f72b44f1880e084da8f80d623f5e6fd893ccced..0000000000000000000000000000000000000000
--- a/aio-core/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017-2021, org.smartboot. All rights reserved.
- * project name: smart-socket
- * file name: FutureCompletionHandler.java
- * Date: 2021-07-29
- * Author: sandao (zhengjunweimail@163.com)
- *
- ******************************************************************************/
-
-package org.smartboot.socket.enhance;
-
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * 一个同时实现了CompletionHandler和Future接口的工具类,用于异步操作的完成处理和Future模式支持。
- *
- * 该类提供了以下功能:
- * 1. 作为CompletionHandler处理异步操作的完成和失败回调
- * 2. 作为Future提供异步操作的结果获取、取消和状态查询
- * 3. 支持带超时的异步操作结果获取
- *
- * @param 异步操作的结果类型
- * @param 异步操作的附加参数类型
- */
-public final class FutureCompletionHandler implements CompletionHandler, Future {
- /** 异步操作的执行结果 */
- private V result;
- /** 标记异步操作是否已完成(成功完成、失败或被取消) */
- private boolean done = false;
- /** 标记异步操作是否被取消 */
- private boolean cancel = false;
- /** 异步操作执行过程中发生的异常 */
- private Throwable exception;
-
- /**
- * 异步操作成功完成时的回调方法
- * @param result 异步操作的执行结果
- * @param selectionKey 异步操作的附加参数
- */
- @Override
- public void completed(V result, A selectionKey) {
- this.result = result;
- done = true;
- synchronized (this) {
- this.notify();
- }
- }
-
- /**
- * 异步操作失败时的回调方法
- * @param exc 异步操作执行过程中发生的异常
- * @param attachment 异步操作的附加参数
- */
- @Override
- public void failed(Throwable exc, A attachment) {
- exception = exc;
- done = true;
- }
-
- /**
- * 尝试取消异步操作的执行
- * @param mayInterruptIfRunning 是否允许中断正在执行的操作(在此实现中该参数不起作用)
- * @return 如果操作成功取消返回true,如果操作已完成或已被取消返回false
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (done || cancel) {
- return false;
- }
- cancel = true;
- done = true;
- synchronized (this) {
- notify();
- }
- return true;
- }
-
- /**
- * 检查异步操作是否被取消
- * @return 如果操作已被取消返回true,否则返回false
- */
- @Override
- public boolean isCancelled() {
- return cancel;
- }
-
- /**
- * 检查异步操作是否已完成
- * @return 如果操作已完成(包括成功完成、失败或被取消)返回true,否则返回false
- */
- @Override
- public boolean isDone() {
- return done;
- }
-
- /**
- * 获取异步操作的执行结果,如果操作未完成则阻塞等待
- * @return 异步操作的执行结果
- * @throws InterruptedException 等待过程中线程被中断
- * @throws ExecutionException 异步操作执行过程中发生异常
- */
- @Override
- public synchronized V get() throws InterruptedException, ExecutionException {
- if (done) {
- if (exception != null) {
- throw new ExecutionException(exception);
- }
- return result;
- } else {
- wait();
- }
- return get();
- }
-
- /**
- * 在指定的超时时间内获取异步操作的执行结果,如果操作未完成则阻塞等待
- * @param timeout 超时时间
- * @param unit 时间单位
- * @return 异步操作的执行结果
- * @throws InterruptedException 等待过程中线程被中断
- * @throws ExecutionException 异步操作执行过程中发生异常
- * @throws TimeoutException 超过指定时间仍未获得结果
- */
- @Override
- public synchronized V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (done) {
- return get();
- } else {
- wait(unit.toMillis(timeout));
- }
- if (done) {
- return get();
- }
- throw new TimeoutException();
- }
-
-}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
index 3cede2c44bab4aef65438e1d5e4408b4c9999396..818957f9071c21ac009257a064581b347aef3e5f 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
@@ -38,7 +38,7 @@ final class IoServerConfig {
/**
* 当前smart-socket版本号
*/
- public static final String VERSION = "v1.7.4";
+ public static final String VERSION = "v1.7.5-SNAPSHOT";
/**
* 消息体缓存大小,字节
diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml
index 0c9c36801f526962431d452c18bb2e6e74d67a26..cd994b48a405f85eb396b5516d10beda9dba5d61 100644
--- a/aio-pro/pom.xml
+++ b/aio-pro/pom.xml
@@ -19,7 +19,7 @@
io.github.smartboot.socket
smart-socket-parent
- 1.7.4
+ 1.7.5-SNAPSHOT
../smart-socket-parent
diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java
index b126fc6750c8c3de1a17a78cffaa53f9f588599e..b55b96a0d966bf542016b47116e4f85cf37035ca 100644
--- a/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java
+++ b/aio-pro/src/main/java/org/smartboot/socket/extension/multiplex/MultiplexClient.java
@@ -201,9 +201,9 @@ public class MultiplexClient {
boolean noneSslPlugin = true;
// 添加配置的插件
for (Plugin responsePlugin : multiplexOptions.getPlugins()) {
- processor.addPlugin(responsePlugin);
if (responsePlugin instanceof SslPlugin) {
noneSslPlugin = false;
+ break;
}
}
@@ -216,7 +216,10 @@ public class MultiplexClient {
if (multiplexOptions.idleTimeout() > 0) {
processor.addPlugin(new IdleStatePlugin<>(multiplexOptions.idleTimeout()));
}
-
+ // 添加配置的插件
+ for (Plugin responsePlugin : multiplexOptions.getPlugins()) {
+ processor.addPlugin(responsePlugin);
+ }
firstConnected = false;
}
}
diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java
index c3ef601f703bfeb4ce6c4bb557daf0a43b0ec074..092273a681b34c7feb4e15502f32fa5730e37d45 100644
--- a/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java
+++ b/aio-pro/src/main/java/org/smartboot/socket/extension/ssl/SslAsynchronousSocketChannel.java
@@ -13,7 +13,6 @@ import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.buffer.VirtualBuffer;
import org.smartboot.socket.channels.AsynchronousSocketChannelProxy;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
-import org.smartboot.socket.enhance.FutureCompletionHandler;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@@ -22,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -227,8 +227,13 @@ public class SslAsynchronousSocketChannel extends AsynchronousSocketChannelProxy
@Override
public Future read(ByteBuffer dst) {
- FutureCompletionHandler readFuture = new FutureCompletionHandler<>();
- read(dst, 0, TimeUnit.MILLISECONDS, null, readFuture);
+ CompletableFuture readFuture = new CompletableFuture<>();
+ EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(true);
+ try {
+ read(dst, 0, TimeUnit.MILLISECONDS, readFuture, EnhanceAsynchronousChannelProvider.SYNC_READ_HANDLER);
+ } finally {
+ EnhanceAsynchronousChannelProvider.SYNC_READ_FLAG.set(false);
+ }
return readFuture;
}
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index eeba86f7fc4001b399f96b4bee900530b79a6dc0..3148921084598f7772dcf6d9f44613dc29f297f8 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -18,7 +18,7 @@
io.github.smartboot.socket
aio-pro
- 1.7.4
+ 1.7.5-SNAPSHOT
org.slf4j
diff --git a/example/pom.xml b/example/pom.xml
index d39a0985d42cdd13bcc0d56a6e2f8406fa807f6d..11d6778fabb01ab7588c9d34c5da9918a964df85 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -23,7 +23,7 @@
io.github.smartboot.socket
aio-pro
- 1.7.4
+ 1.7.5-SNAPSHOT
org.apache.commons
diff --git a/pom.xml b/pom.xml
index 11eb2a7ab9272209e6ca254ba6963df1f1792046..486a2dab566f9862e622d28d8895a7650006a32e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
io.github.smartboot.socket
smart-socket-parent
- 1.7.4
+ 1.7.5-SNAPSHOT
2.6
diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml
index b71375a8cf69b7797d70923ad347c8d1dbfb5fad..f93a84fd2a2d97c6b46b7773f96113a3270ed620 100644
--- a/smart-socket-parent/pom.xml
+++ b/smart-socket-parent/pom.xml
@@ -15,7 +15,7 @@
4.0.0
io.github.smartboot.socket
smart-socket-parent
- 1.7.4
+ 1.7.5-SNAPSHOT
pom