From 1e5b77be0acdaad09a380d332288705fd6df9e2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sun, 14 May 2023 23:12:28 +0800 Subject: [PATCH 01/11] =?UTF-8?q?session=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/feige/api/base/Service.java | 30 ---- .../com/feige/api/context/Application.java | 13 ++ .../com/feige/api/context/Disposable.java | 10 ++ .../java/com/feige/api/context/Lifecycle.java | 27 ++++ .../feige/api/context/LifecycleAdapter.java | 19 +++ .../feige/api/{base => context}/Listener.java | 2 +- .../java/com/feige/api/sc/AbstractClient.java | 10 ++ .../com/feige/api/sc/AbstractEndpoint.java | 136 ++++++++++++++++++ .../java/com/feige/api/sc/AbstractServer.java | 25 ++++ .../main/java/com/feige/api/sc/Client.java | 10 ++ .../main/java/com/feige/api/sc/Endpoint.java | 67 +++++++++ .../main/java/com/feige/api/sc/IClient.java | 8 -- .../main/java/com/feige/api/sc/IServer.java | 15 -- .../main/java/com/feige/api/sc/Server.java | 21 +++ .../com/feige/api/session/RemoteSession.java | 4 +- .../api/srd/IServiceRegistryAndDiscovery.java | 2 +- X-fim-common/pom.xml | 1 - X-fim-netty/pom.xml | 1 - .../feige/fim/server/tcp/NettyTcpServer.java | 70 ++++++--- .../feige/fim/server/udp/NettyUdpServer.java | 32 +---- .../feige/fim/server/ws/NettyWsServer.java | 26 +--- 21 files changed, 397 insertions(+), 132 deletions(-) delete mode 100644 X-fim-api/src/main/java/com/feige/api/base/Service.java create mode 100644 X-fim-api/src/main/java/com/feige/api/context/Application.java create mode 100644 X-fim-api/src/main/java/com/feige/api/context/Disposable.java create mode 100644 X-fim-api/src/main/java/com/feige/api/context/Lifecycle.java create mode 100644 X-fim-api/src/main/java/com/feige/api/context/LifecycleAdapter.java rename X-fim-api/src/main/java/com/feige/api/{base => context}/Listener.java (77%) create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/Client.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java delete mode 100644 X-fim-api/src/main/java/com/feige/api/sc/IClient.java delete mode 100644 X-fim-api/src/main/java/com/feige/api/sc/IServer.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/Server.java diff --git a/X-fim-api/src/main/java/com/feige/api/base/Service.java b/X-fim-api/src/main/java/com/feige/api/base/Service.java deleted file mode 100644 index 4dcf717..0000000 --- a/X-fim-api/src/main/java/com/feige/api/base/Service.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.feige.api.base; - - -import java.util.Map; - -public interface Service { - - /** - * initialization - * @param args arguments - */ - void init(Map args); - - /** - * start service - */ - void start() throws Exception; - - /** - * stop service - */ - void stop(); - - /** - * Running state - * @return Whether to run - */ - boolean isRunning(); - -} diff --git a/X-fim-api/src/main/java/com/feige/api/context/Application.java b/X-fim-api/src/main/java/com/feige/api/context/Application.java new file mode 100644 index 0000000..01beef7 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/context/Application.java @@ -0,0 +1,13 @@ +package com.feige.api.context; + + +public interface Application extends Lifecycle { + + + /** + * Running state + * @return Whether to run + */ + boolean isRunning(); + +} diff --git a/X-fim-api/src/main/java/com/feige/api/context/Disposable.java b/X-fim-api/src/main/java/com/feige/api/context/Disposable.java new file mode 100644 index 0000000..845aa01 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/context/Disposable.java @@ -0,0 +1,10 @@ +package com.feige.api.context; + +/** + * An interface for destroying resources + */ +public interface Disposable { + + void destroy(); + +} \ No newline at end of file diff --git a/X-fim-api/src/main/java/com/feige/api/context/Lifecycle.java b/X-fim-api/src/main/java/com/feige/api/context/Lifecycle.java new file mode 100644 index 0000000..41ff641 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/context/Lifecycle.java @@ -0,0 +1,27 @@ +package com.feige.api.context; + +public interface Lifecycle extends Disposable { + + /** + * Initialize the component before {@link #start() start} + * + * @return current {@link Lifecycle} + * @throws IllegalStateException + */ + void initialize() throws IllegalStateException; + + /** + * Start the component + * + * @return current {@link Lifecycle} + * @throws IllegalStateException + */ + void start() throws IllegalStateException; + + /** + * Destroy the component + * + * @throws IllegalStateException + */ + void destroy() throws IllegalStateException; +} \ No newline at end of file diff --git a/X-fim-api/src/main/java/com/feige/api/context/LifecycleAdapter.java b/X-fim-api/src/main/java/com/feige/api/context/LifecycleAdapter.java new file mode 100644 index 0000000..8b831bf --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/context/LifecycleAdapter.java @@ -0,0 +1,19 @@ +package com.feige.api.context; + +public abstract class LifecycleAdapter implements Lifecycle { + + @Override + public void initialize() throws IllegalStateException { + + } + + @Override + public void start() throws IllegalStateException { + + } + + @Override + public void destroy() throws IllegalStateException { + + } +} \ No newline at end of file diff --git a/X-fim-api/src/main/java/com/feige/api/base/Listener.java b/X-fim-api/src/main/java/com/feige/api/context/Listener.java similarity index 77% rename from X-fim-api/src/main/java/com/feige/api/base/Listener.java rename to X-fim-api/src/main/java/com/feige/api/context/Listener.java index 35fa932..ab7a601 100644 --- a/X-fim-api/src/main/java/com/feige/api/base/Listener.java +++ b/X-fim-api/src/main/java/com/feige/api/context/Listener.java @@ -1,4 +1,4 @@ -package com.feige.api.base; +package com.feige.api.context; public interface Listener { diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java new file mode 100644 index 0000000..d8d65a4 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java @@ -0,0 +1,10 @@ +package com.feige.api.sc; + +/** + * @author feige
+ * @ClassName: AbstractClient
+ * @Description:
+ * @date: 2023/5/13 14:33
+ */ +public abstract class AbstractClient implements Client { +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java new file mode 100644 index 0000000..014307b --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java @@ -0,0 +1,136 @@ +package com.feige.api.sc; + +import com.feige.api.handler.RemotingException; +import com.feige.api.handler.SessionHandler; +import com.feige.api.session.ISession; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author feige
+ * @ClassName: AbstractEndpoint
+ * @Description:
+ * @date: 2023/5/13 14:29
+ */ +public abstract class AbstractEndpoint implements Endpoint, SessionHandler { + private final SessionHandler handler; + + public enum State { + /** + * created + */ + CREATED, + /** + * initialize + */ + INITIALIZE, + /** + * starting + */ + STARTING, + /** + * started + */ + STARTED, + /** + * closing + */ + CLOSING, + /** + * closed + */ + CLOSED; + } + + protected final AtomicReference state = new AtomicReference<>(State.CREATED); + + + public AbstractEndpoint(SessionHandler handler) { + if (handler == null) { + throw new IllegalArgumentException("handler == null"); + } + this.handler = handler; + } + + + @Override + public void send(Object message) throws RemotingException { + send(message, false); + } + + + @Override + public void close() { + if (!state.compareAndSet(State.STARTED, State.CLOSED)){ + throw new IllegalStateException("The server has not started"); + } + + } + + @Override + public void close(int timeout) { + close(); + } + + @Override + public void startClose() { + if (isClosed()) { + return; + } + if (!state.compareAndSet(State.STARTED, State.CLOSING)) { + throw new IllegalStateException("The server has not started"); + } + } + + + + @Override + public SessionHandler getSessionHandler() { + return handler; + } + + + + @Override + public boolean isClosed() { + return State.CLOSED.equals(state.get()); + } + + public boolean isClosing() { + return State.CLOSING.equals(state.get()) && !isClosed(); + } + + @Override + public void connected(ISession session) throws RemotingException { + if (isClosed()) { + return; + } + handler.connected(session); + } + + @Override + public void disconnected(ISession session) throws RemotingException { + handler.disconnected(session); + } + + @Override + public void sent(ISession session, Object msg) throws RemotingException { + if (isClosed()) { + return; + } + handler.sent(session, msg); + } + + @Override + public void received(ISession session, Object msg) throws RemotingException { + if (isClosed()) { + return; + } + handler.received(session, msg); + } + + @Override + public void caught(ISession session, Throwable ex) throws RemotingException { + handler.caught(session, ex); + } +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java new file mode 100644 index 0000000..b59e1f6 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java @@ -0,0 +1,25 @@ +package com.feige.api.sc; + +import com.feige.api.handler.SessionHandler; +import com.feige.api.session.SessionRepository; + +/** + * @author feige
+ * @ClassName: AbstractServer
+ * @Description:
+ * @date: 2023/5/13 14:33
+ */ +public abstract class AbstractServer extends AbstractEndpoint implements Server{ + + private final SessionRepository sessionRepository; + public AbstractServer(SessionHandler handler) { + super(handler); + this.sessionRepository = null; + } + + protected abstract void doOpen() throws Throwable; + + protected abstract void doClose() throws Throwable; + + protected abstract int getSessionsSize(); +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Client.java b/X-fim-api/src/main/java/com/feige/api/sc/Client.java new file mode 100644 index 0000000..1f98c07 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/Client.java @@ -0,0 +1,10 @@ +package com.feige.api.sc; + +import com.feige.api.handler.RemotingException; + +public interface Client extends Endpoint { + + + + void reconnect() throws RemotingException; +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java b/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java new file mode 100644 index 0000000..bbf8dbe --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java @@ -0,0 +1,67 @@ +package com.feige.api.sc; + +import com.feige.api.handler.RemotingException; +import com.feige.api.handler.SessionHandler; + +import java.net.InetSocketAddress; + +/** + * @author feige
+ * @ClassName: Endpoint
+ * @Description:
+ * @date: 2023/5/13 14:19
+ */ +public interface Endpoint { + + + /** + * get session handler. + * + * @return session handler + */ + SessionHandler getSessionHandler(); + + /** + * get local address. + * + * @return local address. + */ + InetSocketAddress getLocalAddress(); + + /** + * send message. + * + * @param message + * @throws RemotingException + */ + void send(Object message) throws RemotingException; + + /** + * send message. + * + * @param message message + * @param sent already sent to socket? + */ + void send(Object message, boolean sent) throws RemotingException; + + /** + * close the channel. + */ + void close(); + + /** + * Graceful close the channel. + */ + void close(int timeout); + + void startClose(); + + /** + * is closed. + * + * @return closed + */ + boolean isClosed(); + + boolean isRunning(); +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/IClient.java b/X-fim-api/src/main/java/com/feige/api/sc/IClient.java deleted file mode 100644 index 89817cb..0000000 --- a/X-fim-api/src/main/java/com/feige/api/sc/IClient.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.feige.api.sc; - -import com.feige.api.base.Service; - -public interface IClient extends Service { - - void connect(); -} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/IServer.java b/X-fim-api/src/main/java/com/feige/api/sc/IServer.java deleted file mode 100644 index 3a1d1a9..0000000 --- a/X-fim-api/src/main/java/com/feige/api/sc/IServer.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.feige.api.sc; - -import com.feige.api.base.Service; -import com.feige.api.handler.SessionHandler; - - -public interface IServer extends Service { - - /** - *bind - * @param sessionHandler - */ - void bind(SessionHandler sessionHandler); - -} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Server.java b/X-fim-api/src/main/java/com/feige/api/sc/Server.java new file mode 100644 index 0000000..a59cdcd --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/Server.java @@ -0,0 +1,21 @@ +package com.feige.api.sc; + +import java.net.InetSocketAddress; + + +public interface Server extends Endpoint { + + + /** + * is bound. + * + * @return bound + */ + boolean isBound(); + /** + *bind + * @param bindAddress + */ + void bind(InetSocketAddress bindAddress); + +} diff --git a/X-fim-api/src/main/java/com/feige/api/session/RemoteSession.java b/X-fim-api/src/main/java/com/feige/api/session/RemoteSession.java index cc6572c..ac40b50 100644 --- a/X-fim-api/src/main/java/com/feige/api/session/RemoteSession.java +++ b/X-fim-api/src/main/java/com/feige/api/session/RemoteSession.java @@ -1,6 +1,6 @@ package com.feige.api.session; -import com.feige.api.sc.IClient; +import com.feige.api.sc.Client; -public interface RemoteSession extends IClient, ISession { +public interface RemoteSession extends Client, ISession { } diff --git a/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java b/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java index 0536dd9..12bcece 100644 --- a/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java +++ b/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java @@ -1,6 +1,6 @@ package com.feige.api.srd; -import com.feige.api.base.Listener; +import com.feige.api.context.Listener; import com.feige.api.spi.Spi; import java.util.List; diff --git a/X-fim-common/pom.xml b/X-fim-common/pom.xml index 89c0d8f..9c3d078 100644 --- a/X-fim-common/pom.xml +++ b/X-fim-common/pom.xml @@ -15,7 +15,6 @@ com.feige X-fim-api - ${xiaofei.im.version} org.yaml diff --git a/X-fim-netty/pom.xml b/X-fim-netty/pom.xml index 3e6d015..275a2db 100644 --- a/X-fim-netty/pom.xml +++ b/X-fim-netty/pom.xml @@ -19,7 +19,6 @@ com.feige X-fim-common - ${xiaofei.im.version} diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java index f86a3de..c87a7be 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java @@ -1,23 +1,22 @@ package com.feige.fim.server.tcp; -import com.feige.fim.config.Configs; import com.feige.api.constant.Const; +import com.feige.api.handler.RemotingException; import com.feige.api.handler.SessionHandler; -import com.feige.api.sc.IServer; +import com.feige.api.sc.AbstractServer; +import com.feige.fim.config.Configs; import com.feige.fim.factory.NettyEventLoopFactory; -import com.feige.fim.spi.SpiLoader; -import org.slf4j.Logger; import com.feige.fim.lg.Loggers; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import org.slf4j.Logger; import java.net.InetSocketAddress; -import java.util.Map; -public class NettyTcpServer implements IServer { +public class NettyTcpServer extends AbstractServer { public static final Logger LOG = Loggers.SERVER; @@ -28,18 +27,19 @@ public class NettyTcpServer implements IServer { private InetSocketAddress bindAddress; private boolean isRunning = false; - @Override - public void init(Map args) { + public NettyTcpServer(SessionHandler handler) { + super(handler); + } + + + public void initialize() throws IllegalStateException { this.tcpBossGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-boss-"); this.tcpWorkGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-work-"); this.serverBootstrap = new ServerBootstrap(); this.bindAddress = new InetSocketAddress(Configs.getString(Configs.ConfigKey.SERVER_TCP_IP_KEY), Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); } - @Override public void start() { - SessionHandler sessionHandler = SpiLoader.getInstance().getSpiByConfigOrPrimary(SessionHandler.class); - this.bind(sessionHandler); this.channel.newSucceededFuture().addListener(future -> { if (future.isSuccess()) { this.isRunning = true; @@ -51,7 +51,10 @@ public class NettyTcpServer implements IServer { this.channel.closeFuture().addListener(future -> this.stop()); } - @Override + public void destroy() throws IllegalStateException { + + } + public void stop() { if (tcpBossGroup != null && !(tcpBossGroup.isShuttingDown() && tcpBossGroup.isShutdown() && tcpBossGroup.isTerminated())){ tcpBossGroup.shutdownGracefully(); @@ -63,26 +66,55 @@ public class NettyTcpServer implements IServer { } @Override - public boolean isRunning() { + public boolean isBound() { return isRunning; } @Override - public void bind(SessionHandler sessionHandler) { - initServerBootstrap(sessionHandler); + public void bind(InetSocketAddress bindAddress) { + initServerBootstrap(); ChannelFuture channelFuture = this.serverBootstrap - .bind(this.bindAddress) + .bind(bindAddress) .syncUninterruptibly(); - this.channel = channelFuture.channel(); } - public void initServerBootstrap(SessionHandler sessionHandler) { + public void initServerBootstrap() { this.serverBootstrap .group(tcpBossGroup, tcpWorkGroup) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NettyEventLoopFactory.createServerSocketChannelClass()) - .childHandler(new TcpServerInitializer(sessionHandler)); + .childHandler(new TcpServerInitializer(getSessionHandler())); + } + + @Override + public InetSocketAddress getLocalAddress() { + return null; + } + + @Override + public void send(Object message, boolean sent) throws RemotingException { + + } + + @Override + public String getKey() { + return null; + } + + @Override + protected void doOpen() throws Throwable { + + } + + @Override + protected void doClose() throws Throwable { + + } + + @Override + protected int getSessionsSize() { + return 0; } } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/udp/NettyUdpServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/udp/NettyUdpServer.java index afcc16e..816a30d 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/udp/NettyUdpServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/udp/NettyUdpServer.java @@ -1,33 +1,5 @@ package com.feige.fim.server.udp; -import com.feige.api.handler.SessionHandler; -import com.feige.api.sc.IServer; - -import java.util.Map; - -public class NettyUdpServer implements IServer { - @Override - public void bind(SessionHandler sessionHandler) { - - } - - @Override - public void init(Map args) { - - } - - @Override - public void start() { - - } - - @Override - public void stop() { - - } - - @Override - public boolean isRunning() { - return false; - } +public class NettyUdpServer { + } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java index e0b851b..d7057b8 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java @@ -1,27 +1,5 @@ package com.feige.fim.server.ws; -import com.feige.api.base.Service; - -import java.util.Map; - -public class NettyWsServer implements Service { - @Override - public void init(Map args) { - - } - - @Override - public void start() { - - } - - @Override - public void stop() { - - } - - @Override - public boolean isRunning() { - return false; - } +public class NettyWsServer { + } -- Gitee From d8257ae022b875a22894c3532060983b58dab266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Thu, 18 May 2023 23:28:47 +0800 Subject: [PATCH 02/11] =?UTF-8?q?server=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/sc/AbstractClient.java | 2 +- .../com/feige/api/sc/AbstractEndpoint.java | 136 --------------- .../java/com/feige/api/sc/AbstractServer.java | 22 ++- .../main/java/com/feige/api/sc/Client.java | 2 +- .../main/java/com/feige/api/sc/Endpoint.java | 67 -------- .../java/com/feige/api/sc/FutureListener.java | 68 ++++++++ .../feige/api/{context => sc}/Listener.java | 2 +- .../main/java/com/feige/api/sc/Server.java | 24 ++- .../main/java/com/feige/api/sc/Service.java | 28 +++ .../java/com/feige/api/sc/ServiceAdapter.java | 161 ++++++++++++++++++ .../com/feige/api/sc/ServiceException.java | 16 ++ .../api/srd/IServiceRegistryAndDiscovery.java | 2 +- ...erver.java => NettyTcpAbstractServer.java} | 34 +++- 13 files changed, 333 insertions(+), 231 deletions(-) delete mode 100644 X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java delete mode 100644 X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java rename X-fim-api/src/main/java/com/feige/api/{context => sc}/Listener.java (77%) create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/Service.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/ServiceAdapter.java create mode 100644 X-fim-api/src/main/java/com/feige/api/sc/ServiceException.java rename X-fim-netty/src/main/java/com/feige/fim/server/tcp/{NettyTcpServer.java => NettyTcpAbstractServer.java} (85%) diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java index d8d65a4..aeeb26f 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractClient.java @@ -6,5 +6,5 @@ package com.feige.api.sc; * @Description:
* @date: 2023/5/13 14:33
*/ -public abstract class AbstractClient implements Client { +public abstract class AbstractClient extends ServiceAdapter implements Client { } diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java deleted file mode 100644 index 014307b..0000000 --- a/X-fim-api/src/main/java/com/feige/api/sc/AbstractEndpoint.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.feige.api.sc; - -import com.feige.api.handler.RemotingException; -import com.feige.api.handler.SessionHandler; -import com.feige.api.session.ISession; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * @author feige
- * @ClassName: AbstractEndpoint
- * @Description:
- * @date: 2023/5/13 14:29
- */ -public abstract class AbstractEndpoint implements Endpoint, SessionHandler { - private final SessionHandler handler; - - public enum State { - /** - * created - */ - CREATED, - /** - * initialize - */ - INITIALIZE, - /** - * starting - */ - STARTING, - /** - * started - */ - STARTED, - /** - * closing - */ - CLOSING, - /** - * closed - */ - CLOSED; - } - - protected final AtomicReference state = new AtomicReference<>(State.CREATED); - - - public AbstractEndpoint(SessionHandler handler) { - if (handler == null) { - throw new IllegalArgumentException("handler == null"); - } - this.handler = handler; - } - - - @Override - public void send(Object message) throws RemotingException { - send(message, false); - } - - - @Override - public void close() { - if (!state.compareAndSet(State.STARTED, State.CLOSED)){ - throw new IllegalStateException("The server has not started"); - } - - } - - @Override - public void close(int timeout) { - close(); - } - - @Override - public void startClose() { - if (isClosed()) { - return; - } - if (!state.compareAndSet(State.STARTED, State.CLOSING)) { - throw new IllegalStateException("The server has not started"); - } - } - - - - @Override - public SessionHandler getSessionHandler() { - return handler; - } - - - - @Override - public boolean isClosed() { - return State.CLOSED.equals(state.get()); - } - - public boolean isClosing() { - return State.CLOSING.equals(state.get()) && !isClosed(); - } - - @Override - public void connected(ISession session) throws RemotingException { - if (isClosed()) { - return; - } - handler.connected(session); - } - - @Override - public void disconnected(ISession session) throws RemotingException { - handler.disconnected(session); - } - - @Override - public void sent(ISession session, Object msg) throws RemotingException { - if (isClosed()) { - return; - } - handler.sent(session, msg); - } - - @Override - public void received(ISession session, Object msg) throws RemotingException { - if (isClosed()) { - return; - } - handler.received(session, msg); - } - - @Override - public void caught(ISession session, Throwable ex) throws RemotingException { - handler.caught(session, ex); - } -} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java index b59e1f6..fed458d 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java @@ -1,7 +1,6 @@ package com.feige.api.sc; -import com.feige.api.handler.SessionHandler; -import com.feige.api.session.SessionRepository; +import java.util.concurrent.atomic.AtomicReference; /** * @author feige
@@ -9,17 +8,16 @@ import com.feige.api.session.SessionRepository; * @Description:
* @date: 2023/5/13 14:33
*/ -public abstract class AbstractServer extends AbstractEndpoint implements Server{ - - private final SessionRepository sessionRepository; - public AbstractServer(SessionHandler handler) { - super(handler); - this.sessionRepository = null; - } +public abstract class AbstractServer extends ServiceAdapter implements Server { - protected abstract void doOpen() throws Throwable; + public enum ServerState {Created, Initialized, Starting, Started, Shutdown} - protected abstract void doClose() throws Throwable; + protected final AtomicReference serverState = new AtomicReference<>(ServerState.Created); - protected abstract int getSessionsSize(); + + + @Override + public boolean isRunning() { + return ServerState.Started == serverState.get(); + } } diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Client.java b/X-fim-api/src/main/java/com/feige/api/sc/Client.java index 1f98c07..1de5109 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/Client.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/Client.java @@ -2,7 +2,7 @@ package com.feige.api.sc; import com.feige.api.handler.RemotingException; -public interface Client extends Endpoint { +public interface Client extends Service { diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java b/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java deleted file mode 100644 index bbf8dbe..0000000 --- a/X-fim-api/src/main/java/com/feige/api/sc/Endpoint.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.feige.api.sc; - -import com.feige.api.handler.RemotingException; -import com.feige.api.handler.SessionHandler; - -import java.net.InetSocketAddress; - -/** - * @author feige
- * @ClassName: Endpoint
- * @Description:
- * @date: 2023/5/13 14:19
- */ -public interface Endpoint { - - - /** - * get session handler. - * - * @return session handler - */ - SessionHandler getSessionHandler(); - - /** - * get local address. - * - * @return local address. - */ - InetSocketAddress getLocalAddress(); - - /** - * send message. - * - * @param message - * @throws RemotingException - */ - void send(Object message) throws RemotingException; - - /** - * send message. - * - * @param message message - * @param sent already sent to socket? - */ - void send(Object message, boolean sent) throws RemotingException; - - /** - * close the channel. - */ - void close(); - - /** - * Graceful close the channel. - */ - void close(int timeout); - - void startClose(); - - /** - * is closed. - * - * @return closed - */ - boolean isClosed(); - - boolean isRunning(); -} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java b/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java new file mode 100644 index 0000000..6d911a7 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java @@ -0,0 +1,68 @@ +package com.feige.api.sc; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author feige
+ * @ClassName: FutureListener
+ * @Description:
+ * @date: 2023/5/18 22:48
+ */ +public class FutureListener extends CompletableFuture implements Listener { + private final Listener listener; + private final AtomicBoolean started; + + public FutureListener(AtomicBoolean started) { + this.listener = null; + this.started = started; + } + public FutureListener(Listener listener, AtomicBoolean started) { + this.listener = listener; + this.started = started; + } + + @Override + public void onSuccess(Object... args) { + if (isDone()){ + return; + } + complete(this.started.get()); + if (this.listener != null){ + this.listener.onSuccess(args); + } + } + + /** + * 防止服务长时间卡在某个地方,增加超时监控 + * + * @param service 服务 + */ + public void monitor(ServiceAdapter service) { + if (isDone()) return;// 防止Listener被重复执行 + runAsync(() -> { + try { + this.get(service.timeoutMillis(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + this.onFailure(new ServiceException(String.format("service %s monitor timeout", service.getClass().getSimpleName()))); + } + }); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + @Override + public void onFailure(Throwable cause) { + if (isDone()){ + completeExceptionally(cause); + } + if (this.listener != null){ + this.listener.onFailure(cause); + } + throw cause instanceof ServiceException ? (ServiceException) cause : new ServiceException(cause); + } +} diff --git a/X-fim-api/src/main/java/com/feige/api/context/Listener.java b/X-fim-api/src/main/java/com/feige/api/sc/Listener.java similarity index 77% rename from X-fim-api/src/main/java/com/feige/api/context/Listener.java rename to X-fim-api/src/main/java/com/feige/api/sc/Listener.java index ab7a601..1fb2dfd 100644 --- a/X-fim-api/src/main/java/com/feige/api/context/Listener.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/Listener.java @@ -1,4 +1,4 @@ -package com.feige.api.context; +package com.feige.api.sc; public interface Listener { diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Server.java b/X-fim-api/src/main/java/com/feige/api/sc/Server.java index a59cdcd..6c62d53 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/Server.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/Server.java @@ -1,21 +1,31 @@ package com.feige.api.sc; +import com.feige.api.handler.SessionHandler; + import java.net.InetSocketAddress; -public interface Server extends Endpoint { +public interface Server extends Service { + + /** + *bind + * @param bindAddress + */ + void bind(InetSocketAddress bindAddress); /** - * is bound. + * get session handler. * - * @return bound + * @return session handler */ - boolean isBound(); + SessionHandler getSessionHandler(); + /** - *bind - * @param bindAddress + * get local address. + * + * @return local address. */ - void bind(InetSocketAddress bindAddress); + InetSocketAddress getLocalAddress(); } diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Service.java b/X-fim-api/src/main/java/com/feige/api/sc/Service.java new file mode 100644 index 0000000..36d723f --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/Service.java @@ -0,0 +1,28 @@ +package com.feige.api.sc; + +import java.util.concurrent.CompletableFuture; + +/** + * @author feige
+ * @ClassName: Service
+ * @Description: from mpush
+ * @date: 2023/5/13 14:19
+ */ +public interface Service { + + void initialize(); + + void start(Listener listener); + + void stop(Listener listener); + + CompletableFuture start(); + + CompletableFuture stop(); + + boolean syncStart(); + + boolean syncStop(); + + boolean isRunning(); +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/ServiceAdapter.java b/X-fim-api/src/main/java/com/feige/api/sc/ServiceAdapter.java new file mode 100644 index 0000000..e300170 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/ServiceAdapter.java @@ -0,0 +1,161 @@ +package com.feige.api.sc; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author feige
+ * @ClassName: ServiceAdapter
+ * @Description: from mpush
+ * @date: 2023/5/13 14:29
+ */ +public abstract class ServiceAdapter implements Service { + + protected final AtomicBoolean started = new AtomicBoolean(); + @Override + public void initialize() { + + } + + @Override + public void start(Listener listener) { + this.tryStart(listener, this::doStart); + } + + @Override + public void stop(Listener listener) { + this.tryStop(listener, this::doStop); + } + + @Override + public CompletableFuture start() { + FutureListener futureListener = new FutureListener(started); + start(futureListener); + return futureListener; + } + + + + + @Override + public CompletableFuture stop() { + FutureListener futureListener = new FutureListener(started); + stop(futureListener); + return futureListener; + } + + + + @Override + public boolean syncStart() { + return start().join(); + } + + @Override + public boolean syncStop() { + return stop().join(); + } + + @Override + public boolean isRunning() { + return started.get(); + } + + + protected void doStart(Listener listener) { + listener.onSuccess(); + } + + + protected void doStop(Listener listener){ + listener.onSuccess(); + } + + protected void tryStart(Listener listener, FunctionEx functionEx){ + FutureListener futureListener = wrap(listener); + if (started.compareAndSet(false, true)){ + try { + initialize(); + functionEx.apply(futureListener); + // 主要用于异步,否则应该放置在function.apply(listener)之前 + futureListener.monitor(this); + }catch (Throwable throwable){ + listener.onFailure(throwable); + throw new ServiceException(throwable); + } + }else { + if (throwIfStarted()){ + futureListener.onFailure(new ServiceException("service already started.")); + }else { + listener.onSuccess(); + } + } + } + + protected void tryStop(Listener listener, FunctionEx functionEx){ + FutureListener futureListener = wrap(listener); + if (started.compareAndSet(true, false)) { + + try { + functionEx.apply(futureListener); + futureListener.monitor(this); + }catch (Throwable throwable){ + futureListener.onFailure(throwable); + throw new ServiceException(throwable); + } + }else { + if (throwIfStopped()){ + listener.onFailure(new ServiceException("service already stopped.")); + }else { + listener.onSuccess(); + } + } + } + /** + * 控制当服务已经启动后,重复调用start方法,是否抛出服务已经启动异常 + * 默认是true + * + * @return true:抛出异常 + */ + protected boolean throwIfStarted() { + return true; + } + + /** + * 控制当服务已经停止后,重复调用stop方法,是否抛出服务已经停止异常 + * 默认是true + * + * @return true:抛出异常 + */ + protected boolean throwIfStopped() { + return true; + } + + + protected interface FunctionEx { + void apply(Listener l) throws Throwable; + } + + /** + * 服务启动停止,超时时间, 默认是10s + * + * @return 超时时间 + */ + protected int timeoutMillis() { + return 1000 * 10; + } + + + /** + * 防止Listener被重复执行 + * + * @param listener listener + * @return FutureListener + */ + public FutureListener wrap(Listener listener) { + if (listener instanceof FutureListener){ + return (FutureListener) listener; + } + return new FutureListener(listener, started); + } +} diff --git a/X-fim-api/src/main/java/com/feige/api/sc/ServiceException.java b/X-fim-api/src/main/java/com/feige/api/sc/ServiceException.java new file mode 100644 index 0000000..6bf9351 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/sc/ServiceException.java @@ -0,0 +1,16 @@ +package com.feige.api.sc; + +public class ServiceException extends RuntimeException { + + public ServiceException(String message) { + super(message); + } + + public ServiceException(Throwable cause) { + super(cause); + } + + public ServiceException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java b/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java index 12bcece..f65fba6 100644 --- a/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java +++ b/X-fim-api/src/main/java/com/feige/api/srd/IServiceRegistryAndDiscovery.java @@ -1,6 +1,6 @@ package com.feige.api.srd; -import com.feige.api.context.Listener; +import com.feige.api.sc.Listener; import com.feige.api.spi.Spi; import java.util.List; diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java similarity index 85% rename from X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java rename to X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java index c87a7be..3cb0140 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java @@ -1,9 +1,10 @@ package com.feige.fim.server.tcp; import com.feige.api.constant.Const; +import com.feige.api.sc.AbstractServer; +import com.feige.api.sc.Listener; import com.feige.api.handler.RemotingException; import com.feige.api.handler.SessionHandler; -import com.feige.api.sc.AbstractServer; import com.feige.fim.config.Configs; import com.feige.fim.factory.NettyEventLoopFactory; import com.feige.fim.lg.Loggers; @@ -15,8 +16,9 @@ import io.netty.channel.EventLoopGroup; import org.slf4j.Logger; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; -public class NettyTcpServer extends AbstractServer { +public class NettyTcpAbstractServer extends AbstractServer { public static final Logger LOG = Loggers.SERVER; @@ -27,7 +29,7 @@ public class NettyTcpServer extends AbstractServer { private InetSocketAddress bindAddress; private boolean isRunning = false; - public NettyTcpServer(SessionHandler handler) { + public NettyTcpAbstractServer(SessionHandler handler) { super(handler); } @@ -39,7 +41,17 @@ public class NettyTcpServer extends AbstractServer { this.bindAddress = new InetSocketAddress(Configs.getString(Configs.ConfigKey.SERVER_TCP_IP_KEY), Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); } - public void start() { + @Override + public void start(Listener listener) { + + } + + @Override + public void stop(Listener listener) { + + } + + public CompletableFuture start() { this.channel.newSucceededFuture().addListener(future -> { if (future.isSuccess()) { this.isRunning = true; @@ -49,13 +61,14 @@ public class NettyTcpServer extends AbstractServer { } }); this.channel.closeFuture().addListener(future -> this.stop()); + return null; } public void destroy() throws IllegalStateException { } - public void stop() { + public CompletableFuture stop() { if (tcpBossGroup != null && !(tcpBossGroup.isShuttingDown() && tcpBossGroup.isShutdown() && tcpBossGroup.isTerminated())){ tcpBossGroup.shutdownGracefully(); } @@ -63,6 +76,12 @@ public class NettyTcpServer extends AbstractServer { tcpWorkGroup.shutdownGracefully(); } this.isRunning = false; + return null; + } + + @Override + public SessionHandler getSessionHandler() { + return null; } @Override @@ -93,6 +112,11 @@ public class NettyTcpServer extends AbstractServer { return null; } + @Override + public boolean isRunning() { + return false; + } + @Override public void send(Object message, boolean sent) throws RemotingException { -- Gitee From b7b8f5fd4160a5f70d5d5a5607c2e841f2d3add3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Fri, 19 May 2023 12:31:49 +0800 Subject: [PATCH 03/11] =?UTF-8?q?server=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/sc/AbstractServer.java | 22 ++++ .../java/com/feige/fim/config/Configs.java | 5 + .../test/java/com/feige/fim/ConfigTest.java | 2 +- ...ctServer.java => AbstractNettyServer.java} | 120 ++++-------------- .../feige/fim/server/tcp/NettyTcpServer.java | 30 +++++ .../feige/fim/server/ws/NettyWsServer.java | 15 ++- 6 files changed, 94 insertions(+), 100 deletions(-) rename X-fim-netty/src/main/java/com/feige/fim/server/{tcp/NettyTcpAbstractServer.java => AbstractNettyServer.java} (33%) create mode 100644 X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java index fed458d..ae6acf5 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java @@ -1,5 +1,8 @@ package com.feige.api.sc; +import com.feige.api.handler.SessionHandler; + +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; /** @@ -14,10 +17,29 @@ public abstract class AbstractServer extends ServiceAdapter implements Server { protected final AtomicReference serverState = new AtomicReference<>(ServerState.Created); + protected SessionHandler sessionHandler; + + protected InetSocketAddress bindAddress; + + public AbstractServer(SessionHandler sessionHandler) { + this.sessionHandler = sessionHandler; + } + @Override public boolean isRunning() { return ServerState.Started == serverState.get(); } + + + @Override + public SessionHandler getSessionHandler() { + return sessionHandler; + } + + @Override + public InetSocketAddress getLocalAddress() { + return bindAddress; + } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java index 42a6996..4406c24 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java @@ -4,6 +4,7 @@ package com.feige.fim.config; import com.feige.api.config.Config; import com.feige.api.config.ConfigFactory; import com.feige.fim.spi.SpiLoader; +import com.feige.fim.utils.StringUtil; import java.io.File; import java.util.List; @@ -118,6 +119,10 @@ public final class Configs { * @return string config */ public static String getString(String key){ + String value = System.getProperty(key); + if (StringUtil.isNotBlank(value)){ + return value; + } return getConfig().getString(key); } diff --git a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java index 2704535..cc9fbec 100644 --- a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java +++ b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java @@ -16,7 +16,7 @@ public class ConfigTest { @Test public void yamlConfigTest() throws Exception { - System.setProperty(Configs.CONFIG_FILE_KEY, "E:\\project\\im\\X-fim-parent\\X-fim-common\\src\\test\\resources\\conf\\fim.yaml"); + System.setProperty(Configs.CONFIG_FILE_KEY, "E:\\project\\my\\X-fim-parent\\X-fim-common\\src\\test\\resources\\conf\\fim.yaml"); for (String spi : spiArr) { SpiLoader.getInstance().load(spi); } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java similarity index 33% rename from X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java rename to X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java index 3cb0140..a33f1bf 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpAbstractServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java @@ -1,144 +1,70 @@ -package com.feige.fim.server.tcp; +package com.feige.fim.server; import com.feige.api.constant.Const; +import com.feige.api.handler.SessionHandler; import com.feige.api.sc.AbstractServer; import com.feige.api.sc.Listener; -import com.feige.api.handler.RemotingException; -import com.feige.api.handler.SessionHandler; import com.feige.fim.config.Configs; import com.feige.fim.factory.NettyEventLoopFactory; import com.feige.fim.lg.Loggers; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import org.slf4j.Logger; import java.net.InetSocketAddress; -import java.util.concurrent.CompletableFuture; -public class NettyTcpAbstractServer extends AbstractServer { +public abstract class AbstractNettyServer extends AbstractServer { public static final Logger LOG = Loggers.SERVER; + protected EventLoopGroup bossGroup; + protected EventLoopGroup workGroup; + protected ServerBootstrap serverBootstrap; + protected Channel channel; - private EventLoopGroup tcpBossGroup; - private EventLoopGroup tcpWorkGroup; - private ServerBootstrap serverBootstrap; - private Channel channel; - private InetSocketAddress bindAddress; - private boolean isRunning = false; - - public NettyTcpAbstractServer(SessionHandler handler) { - super(handler); - } - - - public void initialize() throws IllegalStateException { - this.tcpBossGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-boss-"); - this.tcpWorkGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-work-"); - this.serverBootstrap = new ServerBootstrap(); - this.bindAddress = new InetSocketAddress(Configs.getString(Configs.ConfigKey.SERVER_TCP_IP_KEY), Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); + public AbstractNettyServer(SessionHandler sessionHandler) { + super(sessionHandler); } @Override - public void start(Listener listener) { - + public void initialize() { + this.bossGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-boss-"); + this.workGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-work-"); + this.serverBootstrap = new ServerBootstrap(); } @Override - public void stop(Listener listener) { - - } - - public CompletableFuture start() { + protected void doStart(Listener listener) { this.channel.newSucceededFuture().addListener(future -> { if (future.isSuccess()) { - this.isRunning = true; LOG.info("netty tcp server in {} port start finish....", Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); }else { LOG.error("netty tcp server in {} port start fail....", Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); } }); this.channel.closeFuture().addListener(future -> this.stop()); - return null; } - public void destroy() throws IllegalStateException { - - } - - public CompletableFuture stop() { - if (tcpBossGroup != null && !(tcpBossGroup.isShuttingDown() && tcpBossGroup.isShutdown() && tcpBossGroup.isTerminated())){ - tcpBossGroup.shutdownGracefully(); + @Override + protected void doStop(Listener listener) { + if (bossGroup != null && !(bossGroup.isShuttingDown() && bossGroup.isShutdown() && bossGroup.isTerminated())){ + bossGroup.shutdownGracefully(); } - if (tcpWorkGroup != null && !(tcpWorkGroup.isShuttingDown() && tcpWorkGroup.isShutdown() && tcpWorkGroup.isTerminated())){ - tcpWorkGroup.shutdownGracefully(); + if (workGroup != null && !(workGroup.isShuttingDown() && workGroup.isShutdown() && workGroup.isTerminated())){ + workGroup.shutdownGracefully(); } - this.isRunning = false; - return null; - } - - @Override - public SessionHandler getSessionHandler() { - return null; - } - - @Override - public boolean isBound() { - return isRunning; } @Override public void bind(InetSocketAddress bindAddress) { + this.bindAddress = bindAddress; initServerBootstrap(); ChannelFuture channelFuture = this.serverBootstrap .bind(bindAddress) .syncUninterruptibly(); this.channel = channelFuture.channel(); } - - public void initServerBootstrap() { - this.serverBootstrap - .group(tcpBossGroup, tcpWorkGroup) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .channel(NettyEventLoopFactory.createServerSocketChannelClass()) - .childHandler(new TcpServerInitializer(getSessionHandler())); - } - - @Override - public InetSocketAddress getLocalAddress() { - return null; - } - - @Override - public boolean isRunning() { - return false; - } - - @Override - public void send(Object message, boolean sent) throws RemotingException { - - } - - @Override - public String getKey() { - return null; - } - - @Override - protected void doOpen() throws Throwable { - - } - - @Override - protected void doClose() throws Throwable { - - } - - @Override - protected int getSessionsSize() { - return 0; - } + + protected abstract void initServerBootstrap(); } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java new file mode 100644 index 0000000..06f348c --- /dev/null +++ b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java @@ -0,0 +1,30 @@ +package com.feige.fim.server.tcp; + +import com.feige.api.handler.SessionHandler; +import com.feige.fim.factory.NettyEventLoopFactory; +import com.feige.fim.lg.Loggers; +import com.feige.fim.server.AbstractNettyServer; +import io.netty.channel.ChannelOption; +import org.slf4j.Logger; + +public class NettyTcpServer extends AbstractNettyServer { + + public static final Logger LOG = Loggers.SERVER; + + + public NettyTcpServer(SessionHandler handler) { + super(handler); + } + + + @Override + public void initServerBootstrap() { + this.serverBootstrap + .group(bossGroup, workGroup) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .channel(NettyEventLoopFactory.createServerSocketChannelClass()) + .childHandler(new TcpServerInitializer(getSessionHandler())); + } + +} diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java index d7057b8..c9929e7 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java @@ -1,5 +1,16 @@ package com.feige.fim.server.ws; -public class NettyWsServer { - +import com.feige.api.handler.SessionHandler; +import com.feige.fim.server.AbstractNettyServer; + +public class NettyWsServer extends AbstractNettyServer { + + public NettyWsServer(SessionHandler sessionHandler) { + super(sessionHandler); + } + + @Override + protected void initServerBootstrap() { + + } } -- Gitee From 196e94df631dda707a441dd9c65908921b53833a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Fri, 19 May 2023 15:24:27 +0800 Subject: [PATCH 04/11] =?UTF-8?q?server=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/sc/AbstractServer.java | 13 +- .../main/java/com/feige/api/sc/Server.java | 14 +-- .../feige/fim/server/AbstractNettyServer.java | 115 ++++++++++++++---- .../feige/fim/server/tcp/NettyTcpServer.java | 7 +- .../feige/fim/server/ws/NettyWsServer.java | 21 +++- .../fim/server/ws/WsServerInitializer.java | 16 ++- 6 files changed, 143 insertions(+), 43 deletions(-) diff --git a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java index ae6acf5..603cd30 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/AbstractServer.java @@ -13,16 +13,19 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class AbstractServer extends ServiceAdapter implements Server { + public enum ServerState {Created, Initialized, Starting, Started, Shutdown} protected final AtomicReference serverState = new AtomicReference<>(ServerState.Created); protected SessionHandler sessionHandler; - protected InetSocketAddress bindAddress; + protected InetSocketAddress address; + - public AbstractServer(SessionHandler sessionHandler) { + public AbstractServer(SessionHandler sessionHandler, InetSocketAddress address) { this.sessionHandler = sessionHandler; + this.address = address; } @@ -39,7 +42,9 @@ public abstract class AbstractServer extends ServiceAdapter implements Server { } @Override - public InetSocketAddress getLocalAddress() { - return bindAddress; + public InetSocketAddress getAddress() { + return address; } + + } diff --git a/X-fim-api/src/main/java/com/feige/api/sc/Server.java b/X-fim-api/src/main/java/com/feige/api/sc/Server.java index 6c62d53..ea47a69 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/Server.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/Server.java @@ -7,13 +7,8 @@ import java.net.InetSocketAddress; public interface Server extends Service { - /** - *bind - * @param bindAddress - */ - void bind(InetSocketAddress bindAddress); - + /** * get session handler. * @@ -22,10 +17,11 @@ public interface Server extends Service { SessionHandler getSessionHandler(); /** - * get local address. + * get address. * - * @return local address. + * @return address. */ - InetSocketAddress getLocalAddress(); + InetSocketAddress getAddress(); + } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java index a33f1bf..90d65b1 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/AbstractNettyServer.java @@ -4,17 +4,20 @@ import com.feige.api.constant.Const; import com.feige.api.handler.SessionHandler; import com.feige.api.sc.AbstractServer; import com.feige.api.sc.Listener; -import com.feige.fim.config.Configs; +import com.feige.api.sc.ServiceException; import com.feige.fim.factory.NettyEventLoopFactory; import com.feige.fim.lg.Loggers; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; import org.slf4j.Logger; import java.net.InetSocketAddress; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + public abstract class AbstractNettyServer extends AbstractServer { public static final Logger LOG = Loggers.SERVER; @@ -23,48 +26,112 @@ public abstract class AbstractNettyServer extends AbstractServer { protected ServerBootstrap serverBootstrap; protected Channel channel; - public AbstractNettyServer(SessionHandler sessionHandler) { - super(sessionHandler); + public AbstractNettyServer(SessionHandler sessionHandler, InetSocketAddress address) { + super(sessionHandler, address); } @Override public void initialize() { - this.bossGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-boss-"); - this.workGroup = NettyEventLoopFactory.createEventLoopGroup(Const.DEFAULT_IO_THREADS, "fim-tcp-server-work-"); + this.bossGroup = createBossGroup(); + this.workGroup = createWorkerGroup(); this.serverBootstrap = new ServerBootstrap(); } @Override protected void doStart(Listener listener) { - this.channel.newSucceededFuture().addListener(future -> { - if (future.isSuccess()) { - LOG.info("netty tcp server in {} port start finish....", Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); - }else { - LOG.error("netty tcp server in {} port start fail....", Configs.getInt(Configs.ConfigKey.SERVER_TCP_PORT_KEY)); + try { + if (!serverState.compareAndSet(ServerState.Initialized, ServerState.Starting)) { + throw new ServiceException("Server already started or have not init"); + } + initServerBootstrap(); + ChannelFuture channelFuture = this.serverBootstrap + .bind(address) + .addListener(future -> { + if (future.isSuccess()) { + serverState.set(ServerState.Started); + LOG.info("netty tcp server in {} port start finish....", getAddress().getPort()); + if (listener != null){ + listener.onSuccess(address); + } + }else { + LOG.error("server start failure on:{}", getAddress().getPort(), future.cause()); + if (listener != null) { + listener.onFailure(future.cause()); + } + } + }); + this.channel = channelFuture.channel(); + }catch (Throwable throwable){ + LOG.error("server start exception", throwable); + if (listener != null) { + listener.onFailure(throwable); } - }); - this.channel.closeFuture().addListener(future -> this.stop()); + throw new ServiceException("server start exception, port=" + getAddress().getPort(), throwable); + } } @Override protected void doStop(Listener listener) { - if (bossGroup != null && !(bossGroup.isShuttingDown() && bossGroup.isShutdown() && bossGroup.isTerminated())){ - bossGroup.shutdownGracefully(); + if (!serverState.compareAndSet(ServerState.Started, ServerState.Shutdown)) { + if (listener != null) { + listener.onFailure(new ServiceException("server was already shutdown.")); + } + LOG.error("{} was already shutdown.", this.getClass().getSimpleName()); + return; } - if (workGroup != null && !(workGroup.isShuttingDown() && workGroup.isShutdown() && workGroup.isTerminated())){ - workGroup.shutdownGracefully(); + LOG.info("try shutdown {}...", this.getClass().getSimpleName()); + try { + if (channel != null) { + // unbind. + channel.close(); + } + } catch (Throwable e) { + LOG.warn(e.getMessage(), e); + } + try { + if (this.serverBootstrap != null) { + long timeout = timeoutMillis(); + long quietPeriod = Math.min(2000L, timeout); + Future bossGroupShutdownFuture = bossGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS); + Future workerGroupShutdownFuture = workGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS); + bossGroupShutdownFuture.syncUninterruptibly(); + workerGroupShutdownFuture.syncUninterruptibly(); + } + } catch (Throwable e) { + LOG.warn(e.getMessage(), e); + } + LOG.info("{} shutdown success.", this.getClass().getSimpleName()); + if (listener != null) { + listener.onSuccess(address); } } - @Override - public void bind(InetSocketAddress bindAddress) { - this.bindAddress = bindAddress; - initServerBootstrap(); - ChannelFuture channelFuture = this.serverBootstrap - .bind(bindAddress) - .syncUninterruptibly(); - this.channel = channelFuture.channel(); + protected EventLoopGroup createBossGroup() { + return NettyEventLoopFactory.createEventLoopGroup(getBossGroupThreadNum(), getBossGroupThreadName()); + } + + protected EventLoopGroup createWorkerGroup() { + return NettyEventLoopFactory.createEventLoopGroup(getWorkerGroupThreadNum(), getWorkerGroupThreadName()); + } + + protected String getBossGroupThreadName(){ + return "fim-server-boss-"; + } + + protected String getWorkerGroupThreadName(){ + return "fim-server-work-"; + } + + protected int getBossGroupThreadNum(){ + return Const.DEFAULT_IO_THREADS; + } + + protected int getWorkerGroupThreadNum(){ + return Const.DEFAULT_IO_THREADS; } protected abstract void initServerBootstrap(); + + + } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java index 06f348c..606015e 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/tcp/NettyTcpServer.java @@ -4,16 +4,19 @@ import com.feige.api.handler.SessionHandler; import com.feige.fim.factory.NettyEventLoopFactory; import com.feige.fim.lg.Loggers; import com.feige.fim.server.AbstractNettyServer; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import org.slf4j.Logger; +import java.net.InetSocketAddress; + public class NettyTcpServer extends AbstractNettyServer { public static final Logger LOG = Loggers.SERVER; - public NettyTcpServer(SessionHandler handler) { - super(handler); + public NettyTcpServer(SessionHandler handler, InetSocketAddress address) { + super(handler, address); } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java index c9929e7..65f7138 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/ws/NettyWsServer.java @@ -1,16 +1,31 @@ package com.feige.fim.server.ws; import com.feige.api.handler.SessionHandler; +import com.feige.fim.factory.NettyEventLoopFactory; import com.feige.fim.server.AbstractNettyServer; +import com.feige.fim.server.tcp.TcpServerInitializer; +import io.netty.channel.ChannelOption; + +import java.net.InetSocketAddress; public class NettyWsServer extends AbstractNettyServer { - public NettyWsServer(SessionHandler sessionHandler) { - super(sessionHandler); + public NettyWsServer(SessionHandler sessionHandler, InetSocketAddress address) { + super(sessionHandler, address); } @Override protected void initServerBootstrap() { - + this.serverBootstrap + .group(bossGroup, workGroup) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .channel(NettyEventLoopFactory.createServerSocketChannelClass()) + .childHandler(new WsServerInitializer(getSessionHandler(), wsPath())); + } + + + protected String wsPath(){ + return "/ws"; } } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/ws/WsServerInitializer.java b/X-fim-netty/src/main/java/com/feige/fim/server/ws/WsServerInitializer.java index 6f804fc..afa02c7 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/server/ws/WsServerInitializer.java +++ b/X-fim-netty/src/main/java/com/feige/fim/server/ws/WsServerInitializer.java @@ -1,4 +1,18 @@ package com.feige.fim.server.ws; -public class WsServerInitializer { +import com.feige.api.handler.SessionHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; + + +public class WsServerInitializer extends ChannelInitializer { + + public WsServerInitializer(SessionHandler sessionHandler, String wsPath) { + + } + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + + } } -- Gitee From 573018891ddba492bbefeda361ff52ee1bcadeb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Fri, 19 May 2023 16:36:42 +0800 Subject: [PATCH 05/11] =?UTF-8?q?server=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/feige/api/sc/FutureListener.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java b/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java index 6d911a7..0018461 100644 --- a/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java +++ b/X-fim-api/src/main/java/com/feige/api/sc/FutureListener.java @@ -5,9 +5,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * @author feige
+ * @author mpush
* @ClassName: FutureListener
- * @Description:
+ * @Description: from mpush
* @date: 2023/5/18 22:48
*/ public class FutureListener extends CompletableFuture implements Listener { @@ -40,7 +40,9 @@ public class FutureListener extends CompletableFuture implements Listen * @param service 服务 */ public void monitor(ServiceAdapter service) { - if (isDone()) return;// 防止Listener被重复执行 + if (isDone()) { + return;// 防止Listener被重复执行 + } runAsync(() -> { try { this.get(service.timeoutMillis(), TimeUnit.MILLISECONDS); -- Gitee From 99ca2d32c64c12eb83ece99f9bcf6b134f191bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Fri, 19 May 2023 18:08:34 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E5=BA=94=E7=94=A8=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/context/Application.java | 5 ----- .../java/com/feige/api/context/Environment.java | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) create mode 100644 X-fim-api/src/main/java/com/feige/api/context/Environment.java diff --git a/X-fim-api/src/main/java/com/feige/api/context/Application.java b/X-fim-api/src/main/java/com/feige/api/context/Application.java index 01beef7..c113e72 100644 --- a/X-fim-api/src/main/java/com/feige/api/context/Application.java +++ b/X-fim-api/src/main/java/com/feige/api/context/Application.java @@ -4,10 +4,5 @@ package com.feige.api.context; public interface Application extends Lifecycle { - /** - * Running state - * @return Whether to run - */ - boolean isRunning(); } diff --git a/X-fim-api/src/main/java/com/feige/api/context/Environment.java b/X-fim-api/src/main/java/com/feige/api/context/Environment.java new file mode 100644 index 0000000..833fbf0 --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/context/Environment.java @@ -0,0 +1,14 @@ +package com.feige.api.context; + +import com.feige.api.config.Config; + +public class Environment extends LifecycleAdapter implements Application { + + private Config config; + + @Override + public void initialize(){ + + } + +} -- Gitee From f2313e6f8a761128fd44ec3ce55f80fc44048752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sat, 20 May 2023 18:12:50 +0800 Subject: [PATCH 07/11] =?UTF-8?q?spi=E5=92=8Cconfig=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...dOnlyTheFirstOne.java => LoadOnlyOne.java} | 4 +- .../main/java/com/feige/api/codec/Codec.java | 4 +- .../java/com/feige/api/codec/ICheckSum.java | 4 +- .../java/com/feige/api/config/Config.java | 60 ++++++-- .../com/feige/api/config/ConfigFactory.java | 4 +- .../src/main/java/com/feige/api/spi/Spi.java | 2 +- .../java/com/feige/api/spi/SpiLoader.java | 59 ++++++++ .../feige/api}/spi/SpiNotFoundException.java | 2 +- .../com/feige/fim/codec/CheckSumUtils.java | 6 +- .../com/feige/fim/codec/DefaultCheckSum.java | 2 +- .../com/feige/fim/codec/TransportCodec.java | 2 +- .../java/com/feige/fim/config/Configs.java | 63 +++++---- .../fim/config/impl/CompositeConfig.java | 132 ++++++++++++++++++ .../com/feige/fim/config/impl/EnvConfig.java | 71 ++++++++++ .../feige/fim/config/impl/SystemConfig.java | 71 ++++++++++ .../{YamlConfigImpl.java => YamlConfig.java} | 60 +++----- .../fim/config/impl/YamlConfigFactory.java | 4 +- .../spi/{SpiLoader.java => JdkSpiLoader.java} | 115 ++++++++------- .../com/feige/fim/spi/SpiLoaderUtils.java | 76 ++++++++++ .../test/java/com/feige/fim/ConfigTest.java | 4 +- .../feige/fim/adapter/NettyCodecAdapter.java | 6 +- 21 files changed, 596 insertions(+), 155 deletions(-) rename X-fim-api/src/main/java/com/feige/api/annotation/{LoadOnlyTheFirstOne.java => LoadOnlyOne.java} (83%) create mode 100644 X-fim-api/src/main/java/com/feige/api/spi/SpiLoader.java rename {X-fim-common/src/main/java/com/feige/fim => X-fim-api/src/main/java/com/feige/api}/spi/SpiNotFoundException.java (96%) create mode 100644 X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java create mode 100644 X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java create mode 100644 X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java rename X-fim-common/src/main/java/com/feige/fim/config/impl/{YamlConfigImpl.java => YamlConfig.java} (57%) rename X-fim-common/src/main/java/com/feige/fim/spi/{SpiLoader.java => JdkSpiLoader.java} (42%) create mode 100644 X-fim-common/src/main/java/com/feige/fim/spi/SpiLoaderUtils.java diff --git a/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyTheFirstOne.java b/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java similarity index 83% rename from X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyTheFirstOne.java rename to X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java index a4406f2..a226bd6 100644 --- a/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyTheFirstOne.java +++ b/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java @@ -9,12 +9,12 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Whether to load only the first one + * Whether to load only one */ @Retention(value = RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Inherited @Documented -public @interface LoadOnlyTheFirstOne { +public @interface LoadOnlyOne { } diff --git a/X-fim-api/src/main/java/com/feige/api/codec/Codec.java b/X-fim-api/src/main/java/com/feige/api/codec/Codec.java index d11759a..83aaa36 100644 --- a/X-fim-api/src/main/java/com/feige/api/codec/Codec.java +++ b/X-fim-api/src/main/java/com/feige/api/codec/Codec.java @@ -1,10 +1,10 @@ package com.feige.api.codec; -import com.feige.api.annotation.LoadOnlyTheFirstOne; +import com.feige.api.annotation.LoadOnlyOne; import com.feige.api.session.ISession; import com.feige.api.spi.Spi; -@LoadOnlyTheFirstOne +@LoadOnlyOne public interface Codec extends Spi { /** diff --git a/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java b/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java index 1def527..d0acca7 100644 --- a/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java +++ b/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java @@ -1,9 +1,9 @@ package com.feige.api.codec; -import com.feige.api.annotation.LoadOnlyTheFirstOne; +import com.feige.api.annotation.LoadOnlyOne; import com.feige.api.spi.Spi; -@LoadOnlyTheFirstOne +@LoadOnlyOne public interface ICheckSum extends Spi { /** * diff --git a/X-fim-api/src/main/java/com/feige/api/config/Config.java b/X-fim-api/src/main/java/com/feige/api/config/Config.java index d7ba8c7..bd5518f 100644 --- a/X-fim-api/src/main/java/com/feige/api/config/Config.java +++ b/X-fim-api/src/main/java/com/feige/api/config/Config.java @@ -1,7 +1,7 @@ package com.feige.api.config; import java.io.File; -import java.io.InputStream; +import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -12,13 +12,17 @@ public interface Config { * @param file config file * @throws Exception */ - void parseFile(File file) throws Exception; + default void parseFile(File file) throws Exception { + parseConfig(Files.newInputStream(file.toPath())); + } /** - * parse config file - * @param is config file stream + * parse config object + * @param config config object * @throws Exception */ - void parseFile(InputStream is) throws Exception; + void parseConfig(Object config) throws Exception; + + /** * get int config @@ -33,7 +37,9 @@ public interface Config { * @param key key * @return int config */ - Integer getInt(String key); + default Integer getInt(String key){ + return getInt(key, null); + } /** @@ -49,7 +55,9 @@ public interface Config { * @param key key * @return long config */ - Long getLong(String key); + default Long getLong(String key){ + return getLong(key, null); + } /** * get double config @@ -64,7 +72,9 @@ public interface Config { * @param key key * @return double config */ - Double getDouble(String key); + default Double getDouble(String key) { + return getDouble(key, null); + } /** * get string config @@ -79,7 +89,9 @@ public interface Config { * @param key key * @return string config */ - String getString(String key); + default String getString(String key) { + return getString(key, null); + } /** * get boolean config @@ -94,7 +106,9 @@ public interface Config { * @param key key * @return boolean config */ - Boolean getBoolean(String key); + default Boolean getBoolean(String key) { + return getBoolean(key, null); + } /** * get map config @@ -116,4 +130,30 @@ public interface Config { * @return array config */ String[] getArr(String key); + + + /** + * get object + * @param key key + * @return object + */ + Object getObject(String key); + + /** + * get object + * @param key key + * @param type type + * @return object + * @param type + */ + default T getObject(String key, Class type) { + Object object = getObject(key); + return type.cast(object); + } + + /** + * 序号 + * @return 序号 + */ + int order(); } diff --git a/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java b/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java index 1f9e09a..069578f 100644 --- a/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java +++ b/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java @@ -1,9 +1,9 @@ package com.feige.api.config; -import com.feige.api.annotation.LoadOnlyTheFirstOne; +import com.feige.api.annotation.LoadOnlyOne; import com.feige.api.spi.Spi; -@LoadOnlyTheFirstOne +@LoadOnlyOne public interface ConfigFactory extends Spi { /** diff --git a/X-fim-api/src/main/java/com/feige/api/spi/Spi.java b/X-fim-api/src/main/java/com/feige/api/spi/Spi.java index 17d0717..331292b 100644 --- a/X-fim-api/src/main/java/com/feige/api/spi/Spi.java +++ b/X-fim-api/src/main/java/com/feige/api/spi/Spi.java @@ -11,7 +11,7 @@ public interface Spi { * Whether it is primary * @return is primary */ - default boolean isPrimary() { + default boolean primary() { return false; } diff --git a/X-fim-api/src/main/java/com/feige/api/spi/SpiLoader.java b/X-fim-api/src/main/java/com/feige/api/spi/SpiLoader.java new file mode 100644 index 0000000..f7f555b --- /dev/null +++ b/X-fim-api/src/main/java/com/feige/api/spi/SpiLoader.java @@ -0,0 +1,59 @@ +package com.feige.api.spi; + +import java.util.List; + +/** + * @author feige
+ * @ClassName: SpiLoader
+ * @Description:
+ * @date: 2023/5/20 14:22
+ */ +public interface SpiLoader { + + /** + * register objects + * @param clazz class + * @param objects object list + */ + void register(Class clazz, List objects); + + + /** + * get object by key + * @param key key + * @param clazz class + * @return Object + * @param class type + * @throws SpiNotFoundException + */ + T get(String key, Class clazz) throws SpiNotFoundException; + + + /** + * get object by config + * @param clazz class + * @param configNullReturnPrimary Whether to take the first object whose primary is true when configured null + * @return object + * @param class type + * @throws SpiNotFoundException + */ + T getByConfig(Class clazz, boolean configNullReturnPrimary) throws SpiNotFoundException; + + + /** + * get all object + * @param clazz class + * @return object list + * @param class type + * @throws SpiNotFoundException + */ + List getAll(Class clazz) throws SpiNotFoundException; + + + /** + * load class + * @param className class name + */ + void load(String className); + +} diff --git a/X-fim-common/src/main/java/com/feige/fim/spi/SpiNotFoundException.java b/X-fim-api/src/main/java/com/feige/api/spi/SpiNotFoundException.java similarity index 96% rename from X-fim-common/src/main/java/com/feige/fim/spi/SpiNotFoundException.java rename to X-fim-api/src/main/java/com/feige/api/spi/SpiNotFoundException.java index c1d9ebf..f4ca1ac 100644 --- a/X-fim-common/src/main/java/com/feige/fim/spi/SpiNotFoundException.java +++ b/X-fim-api/src/main/java/com/feige/api/spi/SpiNotFoundException.java @@ -1,4 +1,4 @@ -package com.feige.fim.spi; +package com.feige.api.spi; public class SpiNotFoundException extends RuntimeException { diff --git a/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java b/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java index 32a046f..6815016 100644 --- a/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java +++ b/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java @@ -2,7 +2,7 @@ package com.feige.fim.codec; import com.feige.api.codec.CheckSumException; import com.feige.api.codec.ICheckSum; -import com.feige.fim.spi.SpiLoader; +import com.feige.fim.spi.JdkSpiLoader; public class CheckSumUtils { @@ -15,7 +15,7 @@ public class CheckSumUtils { * @throws CheckSumException */ public static void check(byte[] data, short expectedCheckSum) throws CheckSumException { - ICheckSum iCheckSum = SpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); + ICheckSum iCheckSum = JdkSpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); iCheckSum.check(data, expectedCheckSum); } @@ -26,7 +26,7 @@ public class CheckSumUtils { * @return check sum */ public static short calculate(byte[] body) { - ICheckSum iCheckSum = SpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); + ICheckSum iCheckSum = JdkSpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); return iCheckSum.getCheckSum(body); } } diff --git a/X-fim-common/src/main/java/com/feige/fim/codec/DefaultCheckSum.java b/X-fim-common/src/main/java/com/feige/fim/codec/DefaultCheckSum.java index 222ebc8..657b1a8 100644 --- a/X-fim-common/src/main/java/com/feige/fim/codec/DefaultCheckSum.java +++ b/X-fim-common/src/main/java/com/feige/fim/codec/DefaultCheckSum.java @@ -19,7 +19,7 @@ public class DefaultCheckSum implements ICheckSum { } @Override - public boolean isPrimary() { + public boolean primary() { return true; } diff --git a/X-fim-common/src/main/java/com/feige/fim/codec/TransportCodec.java b/X-fim-common/src/main/java/com/feige/fim/codec/TransportCodec.java index ac52977..d213c5b 100644 --- a/X-fim-common/src/main/java/com/feige/fim/codec/TransportCodec.java +++ b/X-fim-common/src/main/java/com/feige/fim/codec/TransportCodec.java @@ -109,7 +109,7 @@ public class TransportCodec implements Codec { } @Override - public boolean isPrimary() { + public boolean primary() { return true; } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java index 4406c24..a9e52a2 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java @@ -3,8 +3,10 @@ package com.feige.fim.config; import com.feige.api.config.Config; import com.feige.api.config.ConfigFactory; -import com.feige.fim.spi.SpiLoader; -import com.feige.fim.utils.StringUtil; +import com.feige.fim.config.impl.CompositeConfig; +import com.feige.fim.config.impl.EnvConfig; +import com.feige.fim.config.impl.SystemConfig; +import com.feige.fim.spi.SpiLoaderUtils; import java.io.File; import java.util.List; @@ -33,18 +35,35 @@ public final class Configs { String SERVER_UDP_PORT_KEY = "fim.server.udp-port"; } - private static Config CONFIG; + private final static CompositeConfig COMPOSITE_CONFIG = new CompositeConfig(); + private final static Config SYSTEM_CONFIG = new SystemConfig(); + private final static Config ENV_CONFIG = new EnvConfig(); + private static Config APP_CONFIG = null; public static void loadConfig() throws Exception { - ConfigFactory configFactory = SpiLoader.getInstance().getSpiByConfigOrPrimary(ConfigFactory.class); - CONFIG = configFactory.create(); + COMPOSITE_CONFIG.addConfig(SYSTEM_CONFIG); + COMPOSITE_CONFIG.addConfig(ENV_CONFIG); + ConfigFactory configFactory = SpiLoaderUtils.getByConfig(ConfigFactory.class, true); + APP_CONFIG = configFactory.create(); + COMPOSITE_CONFIG.addConfig(APP_CONFIG); } - public static Config getConfig(){ - return CONFIG; + public static Config getCompositeConfig(){ + return COMPOSITE_CONFIG; } + public static Config getSystemConfig(){ + return SYSTEM_CONFIG; + } + + public static Config getAppConfig(){ + return APP_CONFIG; + } + + public static Config getEnvConfig(){ + return ENV_CONFIG; + } /** * get int config * @param key key @@ -52,7 +71,7 @@ public final class Configs { * @return int config */ public static Integer getInt(String key, Integer defaultValue){ - return getConfig().getInt(key, defaultValue); + return getCompositeConfig().getInt(key, defaultValue); } /** @@ -61,7 +80,7 @@ public final class Configs { * @return int config */ public static Integer getInt(String key){ - return getConfig().getInt(key); + return getCompositeConfig().getInt(key); } @@ -72,7 +91,7 @@ public final class Configs { * @return long config */ public static Long getLong(String key, Long defaultValue){ - return getConfig().getLong(key, defaultValue); + return getCompositeConfig().getLong(key, defaultValue); } /** @@ -81,7 +100,7 @@ public final class Configs { * @return long config */ public static Long getLong(String key){ - return getConfig().getLong(key); + return getCompositeConfig().getLong(key); } /** @@ -91,7 +110,7 @@ public final class Configs { * @return double config */ public static Double getDouble(String key, Double defaultValue){ - return getConfig().getDouble(key); + return getCompositeConfig().getDouble(key); } /** @@ -100,7 +119,7 @@ public final class Configs { * @return double config */ public static Double getDouble(String key){ - return getConfig().getDouble(key); + return getCompositeConfig().getDouble(key); } /** @@ -110,7 +129,7 @@ public final class Configs { * @return string config */ public static String getString(String key, String defaultValue){ - return getConfig().getString(key, defaultValue); + return getCompositeConfig().getString(key, defaultValue); } /** @@ -119,11 +138,7 @@ public final class Configs { * @return string config */ public static String getString(String key){ - String value = System.getProperty(key); - if (StringUtil.isNotBlank(value)){ - return value; - } - return getConfig().getString(key); + return getCompositeConfig().getString(key); } /** @@ -133,7 +148,7 @@ public final class Configs { * @return boolean config */ public static Boolean getBoolean(String key, Boolean defaultValue){ - return getConfig().getBoolean(key, defaultValue); + return getCompositeConfig().getBoolean(key, defaultValue); } /** @@ -142,7 +157,7 @@ public final class Configs { * @return boolean config */ public static Boolean getBoolean(String key){ - return getConfig().getBoolean(key); + return getCompositeConfig().getBoolean(key); } /** @@ -151,7 +166,7 @@ public final class Configs { * @return map config */ public static Map getMap(String key){ - return getConfig().getMap(key); + return getCompositeConfig().getMap(key); } /** @@ -160,7 +175,7 @@ public final class Configs { * @return list config */ public static List getList(String key){ - return getConfig().getList(key); + return getCompositeConfig().getList(key); } /** @@ -169,7 +184,7 @@ public final class Configs { * @return array config */ public static String[] getArr(String key){ - return getConfig().getArr(key); + return getCompositeConfig().getArr(key); } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java new file mode 100644 index 0000000..dcfd169 --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java @@ -0,0 +1,132 @@ +package com.feige.fim.config.impl; + +import com.feige.api.config.Config; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +/** + * @author feige
+ * @ClassName: CompositeConfig
+ * @Description:
+ * @date: 2023/5/20 15:23
+ */ +public class CompositeConfig implements Config { + + private final List configList = new ArrayList<>(); + @Override + public void parseConfig(Object config) throws Exception { + + } + + @Override + public Integer getInt(String key, Integer defaultValue) { + for (Config config : configList) { + Integer value = config.getInt(key, defaultValue); + if (value != null){ + return value; + } + } + return defaultValue; + } + + @Override + public Long getLong(String key, Long defaultValue) { + for (Config config : configList) { + Long value = config.getLong(key, defaultValue); + if (value != null){ + return value; + } + } + return defaultValue; + } + + @Override + public Double getDouble(String key, Double defaultValue) { + for (Config config : configList) { + Double value = config.getDouble(key, defaultValue); + if (value != null){ + return value; + } + } + return defaultValue; + } + + @Override + public String getString(String key, String defaultValue) { + for (Config config : configList) { + String value = config.getString(key, defaultValue); + if (value != null){ + return value; + } + } + return defaultValue; + } + + @Override + public Boolean getBoolean(String key, Boolean defaultValue) { + for (Config config : configList) { + Boolean value = config.getBoolean(key, defaultValue); + if (value != null){ + return value; + } + } + return defaultValue; + } + + @Override + public Map getMap(String key) { + for (Config config : configList) { + Map value = config.getMap(key); + if (value != null){ + return value; + } + } + return null; + } + + @Override + public List getList(String key) { + for (Config config : configList) { + List value = config.getList(key); + if (value != null){ + return value; + } + } + return null; + } + + @Override + public String[] getArr(String key) { + for (Config config : configList) { + String[] value = config.getArr(key); + if (value != null){ + return value; + } + } + return null; + } + + @Override + public Object getObject(String key) { + for (Config config : configList) { + Object value = config.getObject(key); + if (value != null){ + return value; + } + } + return null; + } + + @Override + public int order() { + return 0; + } + + public void addConfig(Config config){ + configList.add(config); + configList.sort(Comparator.comparing(Config::order)); + } +} diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java new file mode 100644 index 0000000..e01d39b --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java @@ -0,0 +1,71 @@ +package com.feige.fim.config.impl; + +import com.feige.api.config.Config; + +import java.util.List; +import java.util.Map; + +/** + * @author feige
+ * @ClassName: EnvConfig
+ * @Description:
+ * @date: 2023/5/20 15:24
+ */ +public class EnvConfig implements Config { + + private final Map envMap = System.getenv(); + @Override + public void parseConfig(Object config) throws Exception { + + } + + @Override + public Integer getInt(String key, Integer defaultValue) { + return defaultValue; + } + + @Override + public Long getLong(String key, Long defaultValue) { + return defaultValue; + } + + @Override + public Double getDouble(String key, Double defaultValue) { + return defaultValue; + } + + @Override + public String getString(String key, String defaultValue) { + return System.getenv(key); + } + + @Override + public Boolean getBoolean(String key, Boolean defaultValue) { + return defaultValue; + } + + @Override + public Map getMap(String key) { + return null; + } + + @Override + public List getList(String key) { + return null; + } + + @Override + public String[] getArr(String key) { + return new String[0]; + } + + @Override + public Object getObject(String key) { + return null; + } + + @Override + public int order() { + return 0; + } +} diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java new file mode 100644 index 0000000..06bdae7 --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java @@ -0,0 +1,71 @@ +package com.feige.fim.config.impl; + +import com.feige.api.config.Config; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * @author feige
+ * @ClassName: SystemConfig
+ * @Description:
+ * @date: 2023/5/20 10:08
+ */ +public class SystemConfig implements Config { + private final Properties prop = System.getProperties(); + @Override + public void parseConfig(Object obj) throws Exception { + + } + + @Override + public Integer getInt(String key, Integer defaultValue) { + return (Integer) prop.getOrDefault(key, defaultValue); + } + + @Override + public Long getLong(String key, Long defaultValue) { + return (Long) prop.getOrDefault(key, defaultValue); + } + + @Override + public Double getDouble(String key, Double defaultValue) { + return (Double) prop.getOrDefault(key, defaultValue); + } + + @Override + public String getString(String key, String defaultValue) { + return System.getProperty(key, defaultValue); + } + + @Override + public Boolean getBoolean(String key, Boolean defaultValue) { + return (Boolean) prop.getOrDefault(key, defaultValue); + } + + @Override + public Map getMap(String key) { + return null; + } + + @Override + public List getList(String key) { + return null; + } + + @Override + public String[] getArr(String key) { + return null; + } + + @Override + public Object getObject(String key) { + return this.prop.get(key); + } + + @Override + public int order() { + return Integer.MIN_VALUE; + } +} diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigImpl.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfig.java similarity index 57% rename from X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigImpl.java rename to X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfig.java index 8e53a04..5cd452c 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigImpl.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfig.java @@ -1,79 +1,51 @@ package com.feige.fim.config.impl; -import com.feige.fim.utils.YamlUtils; import com.feige.api.config.Config; +import com.feige.fim.utils.YamlUtils; import org.apache.commons.collections4.MapUtils; -import java.io.File; -import java.io.FileInputStream; import java.io.InputStream; import java.util.List; import java.util.Map; -public class YamlConfigImpl implements Config { +public class YamlConfig implements Config { private Map config; - @Override - public void parseFile(File file) throws Exception { - this.config = YamlUtils.parser(new FileInputStream(file)); - } - - @Override - public void parseFile(InputStream is) throws Exception { - this.config = YamlUtils.parser(is); + public void parseConfig(Object obj) throws Exception { + if (obj instanceof InputStream){ + InputStream is = (InputStream) obj; + this.config = YamlUtils.parser(is); + } } @Override public Integer getInt(String key, Integer defaultValue) { return MapUtils.getInteger(this.config, key, defaultValue); } - - @Override - public Integer getInt(String key) { - return MapUtils.getInteger(this.config, key); - } + @Override public Long getLong(String key, Long defaultValue) { return MapUtils.getLong(this.config, key, defaultValue); } - - @Override - public Long getLong(String key) { - return MapUtils.getLong(this.config, key); - } + @Override public Double getDouble(String key, Double defaultValue) { return MapUtils.getDouble(this.config, key, defaultValue); } - @Override - public Double getDouble(String key) { - return MapUtils.getDouble(this.config, key); - } - @Override public String getString(String key, String defaultValue) { return MapUtils.getString(this.config, key, defaultValue); } - @Override - public String getString(String key) { - return MapUtils.getString(this.config, key); - } - @Override public Boolean getBoolean(String key, Boolean defaultValue) { return MapUtils.getBoolean(this.config, key, defaultValue); } - @Override - public Boolean getBoolean(String key) { - return MapUtils.getBoolean(this.config, key); - } - @Override public Map getMap(String key) { return (Map) MapUtils.getMap(this.config, key); @@ -81,11 +53,21 @@ public class YamlConfigImpl implements Config { @Override public List getList(String key) { - return (List) MapUtils.getObject(this.config, key); + return this.getObject(key, List.class); } @Override public String[] getArr(String key) { - return (String[]) MapUtils.getObject(this.config, key); + return this.getObject(key, String[].class); + } + + @Override + public Object getObject(String key) { + return MapUtils.getObject(this.config, key); + } + + @Override + public int order() { + return 2; } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigFactory.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigFactory.java index 71e6e9b..4f9a3ed 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigFactory.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/YamlConfigFactory.java @@ -10,7 +10,7 @@ import java.io.File; public class YamlConfigFactory implements ConfigFactory { @Override public Config create() throws Exception{ - YamlConfigImpl yamlConfig = new YamlConfigImpl(); + YamlConfig yamlConfig = new YamlConfig(); yamlConfig.parseFile(getFile()); return yamlConfig; } @@ -44,7 +44,7 @@ public class YamlConfigFactory implements ConfigFactory { } @Override - public boolean isPrimary() { + public boolean primary() { return true; } } diff --git a/X-fim-common/src/main/java/com/feige/fim/spi/SpiLoader.java b/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java similarity index 42% rename from X-fim-common/src/main/java/com/feige/fim/spi/SpiLoader.java rename to X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java index 7f1b3cb..c65babb 100644 --- a/X-fim-common/src/main/java/com/feige/fim/spi/SpiLoader.java +++ b/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java @@ -1,11 +1,13 @@ package com.feige.fim.spi; -import com.feige.fim.utils.StringUtil; -import com.feige.api.annotation.LoadOnlyTheFirstOne; -import com.feige.fim.config.Configs; +import com.feige.api.annotation.LoadOnlyOne; import com.feige.api.spi.Spi; -import org.slf4j.Logger; +import com.feige.api.spi.SpiLoader; +import com.feige.api.spi.SpiNotFoundException; +import com.feige.fim.config.Configs; import com.feige.fim.lg.Loggers; +import com.feige.fim.utils.StringUtil; +import org.slf4j.Logger; import java.util.ArrayList; import java.util.Comparator; @@ -14,29 +16,26 @@ import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; -public class SpiLoader { +public class JdkSpiLoader implements SpiLoader { private static final Logger LOG = Loggers.LOADER; - private final Map> spiMap = new ConcurrentHashMap<>(); + private final Map, List> spiMap = new ConcurrentHashMap<>(); private final Class spiClass = Spi.class; - private SpiLoader(){ - - } - - public static SpiLoader getInstance(){ - return InnerSpiLoader.spiLoader; - } + - public void register(String className, List spiList){ - if (spiList.size() > 1){ - spiList.sort(Comparator.comparing(Spi::order)); + @Override + public void register(Class clazz, List objects) { + if (objects.size() > 1){ + objects.sort(Comparator.comparing(Spi::order)); } - this.spiMap.put(className, spiList); + this.spiMap.put(clazz, objects); } - public T get(String key, Class clazz) { - List spiList = spiMap.get(clazz.getCanonicalName()); + @Override + public T get(String key, Class clazz) throws SpiNotFoundException { + List spiList = spiMap.get(clazz); if (spiList != null && !spiList.isEmpty()){ for (Spi spi : spiList) { if (Objects.equals(spi.getKey(), key)){ @@ -48,20 +47,13 @@ public class SpiLoader { throw new SpiNotFoundException(clazz); } - /** - * The value is obtained through the configuration first. - * If the value is not configured or cannot be obtained, the value of primary is true - * @param clazz class - * @param class type - * @return instance - */ - public T getSpiByConfigOrPrimary(Class clazz) { - String name = clazz.getCanonicalName(); + @Override + public T getByConfig(Class clazz, boolean configNullReturnPrimary) throws SpiNotFoundException { String key = null; try { - key = Configs.getString(name); + key = Configs.getString(clazz.getName()); }catch (NullPointerException ignore){} - List spiList = spiMap.get(name); + List spiList = spiMap.get(clazz); if (spiList != null && !spiList.isEmpty()){ if (StringUtil.isNotBlank(key)) { for (Spi spi : spiList) { @@ -70,9 +62,11 @@ public class SpiLoader { } } } - for (Spi spi : spiList) { - if (spi.isPrimary()){ - return clazz.cast(spi); + if (configNullReturnPrimary){ + for (Spi spi : spiList) { + if (spi.primary()){ + return clazz.cast(spi); + } } } throw new SpiNotFoundException(clazz, key); @@ -80,44 +74,45 @@ public class SpiLoader { throw new SpiNotFoundException(clazz); } - public T getDefault(Class clazz) { - String name = clazz.getCanonicalName(); - List spiList = spiMap.get(name); - if (spiList != null && !spiList.isEmpty()){ - for (Spi spi : spiList) { - if (spi.isPrimary()){ - return clazz.cast(spi); - } - } - } - throw new SpiNotFoundException(clazz); + @Override + public List getAll(Class clazz) throws SpiNotFoundException { + List spiList = spiMap.get(clazz); + return spiList.stream() + .map(clazz::cast) + .collect(Collectors.toList()); } - public synchronized void load(String className) { + + @Override + public void load(String className) { try { Class loadClass = Class.forName(className); if (!spiClass.isAssignableFrom(loadClass)){ - LOG.warn("Must implement {}.", spiClass.getCanonicalName()); + LOG.warn("Must implement {}.", spiClass.getName()); return; } - ServiceLoader loader = ServiceLoader.load(loadClass); - ArrayList spiList = new ArrayList<>(); - LoadOnlyTheFirstOne loadOnlyTheFirstOne = loadClass.getAnnotation(LoadOnlyTheFirstOne.class); - for (Object next : loader) { - Spi spi = (Spi) next; - spiList.add(spi); - if(loadOnlyTheFirstOne != null){ - break; + List list = spiMap.get(loadClass); + if (list == null){ + synchronized (className.intern()){ + list = spiMap.get(loadClass); + if (list == null){ + ServiceLoader loader = ServiceLoader.load(loadClass); + List spiList = new ArrayList<>(); + LoadOnlyOne loadOnlyOne = loadClass.getAnnotation(LoadOnlyOne.class); + for (Object next : loader) { + Spi spi = (Spi) next; + spiList.add(spi); + if(loadOnlyOne != null){ + break; + } + } + register(loadClass, spiList); + } } } - register(className, spiList); }catch (Exception e){ LOG.error("spi loader error:", e); } } - - - private static class InnerSpiLoader { - private static final SpiLoader spiLoader = new SpiLoader(); - } + } diff --git a/X-fim-common/src/main/java/com/feige/fim/spi/SpiLoaderUtils.java b/X-fim-common/src/main/java/com/feige/fim/spi/SpiLoaderUtils.java new file mode 100644 index 0000000..61a0d2c --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/spi/SpiLoaderUtils.java @@ -0,0 +1,76 @@ +package com.feige.fim.spi; + +import com.feige.api.spi.Spi; +import com.feige.api.spi.SpiLoader; +import com.feige.api.spi.SpiNotFoundException; + +import java.util.List; + +/** + * @author feige
+ * @ClassName: SpiLoaderUtils
+ * @Description:
+ * @date: 2023/5/20 15:13
+ */ +public class SpiLoaderUtils { + + + private static volatile SpiLoader spiLoader; + + public static SpiLoader getSpiLoader(){ + if (spiLoader == null){ + synchronized (SpiLoaderUtils.class){ + if (spiLoader == null){ + spiLoader = new JdkSpiLoader(); + } + } + } + return spiLoader; + } + + public static void setSpiLoader(SpiLoader loader){ + spiLoader = loader; + } + + /** + * get object by key + * @param key key + * @param clazz class + * @return Object + * @param class type + * @throws SpiNotFoundException + */ + public static T get(String key, Class clazz) throws SpiNotFoundException { + return getSpiLoader().get(key, clazz); + } + + + /** + * get object by config + * @param clazz class + * @param configNullReturnPrimary Whether to take the first object whose primary is true when configured null + * @return object + * @param class type + * @throws SpiNotFoundException + */ + public static T getByConfig(Class clazz, boolean configNullReturnPrimary) throws SpiNotFoundException { + return getSpiLoader().getByConfig(clazz, configNullReturnPrimary); + } + + + /** + * get all object + * @param clazz class + * @return object list + * @param class type + * @throws SpiNotFoundException + */ + public static List getAll(Class clazz) throws SpiNotFoundException { + return getSpiLoader().getAll(clazz); + } + + public static void load(String className){ + getSpiLoader().load(className); + } + +} diff --git a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java index cc9fbec..13ae700 100644 --- a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java +++ b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java @@ -1,7 +1,7 @@ package com.feige.fim; import com.feige.fim.config.Configs; -import com.feige.fim.spi.SpiLoader; +import com.feige.fim.spi.SpiLoaderUtils; import org.apache.commons.collections4.CollectionUtils; import org.junit.Assert; import org.junit.Test; @@ -18,7 +18,7 @@ public class ConfigTest { public void yamlConfigTest() throws Exception { System.setProperty(Configs.CONFIG_FILE_KEY, "E:\\project\\my\\X-fim-parent\\X-fim-common\\src\\test\\resources\\conf\\fim.yaml"); for (String spi : spiArr) { - SpiLoader.getInstance().load(spi); + SpiLoaderUtils.load(spi); } Configs.loadConfig(); Assert.assertEquals(Configs.getString("fim.test.key"), "value"); diff --git a/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyCodecAdapter.java b/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyCodecAdapter.java index b1c9bd4..bad002b 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyCodecAdapter.java +++ b/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyCodecAdapter.java @@ -1,14 +1,14 @@ package com.feige.fim.adapter; import com.feige.api.codec.Codec; -import com.feige.fim.spi.SpiLoader; -import org.slf4j.Logger; import com.feige.fim.lg.Loggers; +import com.feige.fim.spi.SpiLoaderUtils; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; import java.util.List; @@ -40,7 +40,7 @@ public class NettyCodecAdapter { if (codec == null){ synchronized (NettyCodecAdapter.class){ if (codec == null) { - codec = SpiLoader.getInstance().getSpiByConfigOrPrimary(Codec.class); + codec = SpiLoaderUtils.getByConfig(Codec.class, true); } } } -- Gitee From fd94c8875d3c6a467f09be79e686476823a353df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sat, 20 May 2023 18:38:31 +0800 Subject: [PATCH 08/11] =?UTF-8?q?spi=E5=92=8Cconfig=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{LoadOnlyOne.java => CacheOne.java} | 2 +- .../main/java/com/feige/api/codec/Codec.java | 4 +- .../java/com/feige/api/codec/ICheckSum.java | 4 +- .../com/feige/api/config/ConfigFactory.java | 4 +- .../java/com/feige/api/constant/Const.java | 3 ++ .../com/feige/fim/codec/CheckSumUtils.java | 6 +-- .../java/com/feige/fim/config/Configs.java | 2 + .../java/com/feige/fim/spi/JdkSpiLoader.java | 43 ++++++++++++++++--- 8 files changed, 53 insertions(+), 15 deletions(-) rename X-fim-api/src/main/java/com/feige/api/annotation/{LoadOnlyOne.java => CacheOne.java} (92%) diff --git a/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java b/X-fim-api/src/main/java/com/feige/api/annotation/CacheOne.java similarity index 92% rename from X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java rename to X-fim-api/src/main/java/com/feige/api/annotation/CacheOne.java index a226bd6..822c505 100644 --- a/X-fim-api/src/main/java/com/feige/api/annotation/LoadOnlyOne.java +++ b/X-fim-api/src/main/java/com/feige/api/annotation/CacheOne.java @@ -15,6 +15,6 @@ import java.lang.annotation.Target; @Target({ElementType.TYPE}) @Inherited @Documented -public @interface LoadOnlyOne { +public @interface CacheOne { } diff --git a/X-fim-api/src/main/java/com/feige/api/codec/Codec.java b/X-fim-api/src/main/java/com/feige/api/codec/Codec.java index 83aaa36..2ebb016 100644 --- a/X-fim-api/src/main/java/com/feige/api/codec/Codec.java +++ b/X-fim-api/src/main/java/com/feige/api/codec/Codec.java @@ -1,10 +1,10 @@ package com.feige.api.codec; -import com.feige.api.annotation.LoadOnlyOne; +import com.feige.api.annotation.CacheOne; import com.feige.api.session.ISession; import com.feige.api.spi.Spi; -@LoadOnlyOne +@CacheOne public interface Codec extends Spi { /** diff --git a/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java b/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java index d0acca7..136e9d6 100644 --- a/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java +++ b/X-fim-api/src/main/java/com/feige/api/codec/ICheckSum.java @@ -1,9 +1,9 @@ package com.feige.api.codec; -import com.feige.api.annotation.LoadOnlyOne; +import com.feige.api.annotation.CacheOne; import com.feige.api.spi.Spi; -@LoadOnlyOne +@CacheOne public interface ICheckSum extends Spi { /** * diff --git a/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java b/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java index 069578f..49af70e 100644 --- a/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java +++ b/X-fim-api/src/main/java/com/feige/api/config/ConfigFactory.java @@ -1,9 +1,9 @@ package com.feige.api.config; -import com.feige.api.annotation.LoadOnlyOne; +import com.feige.api.annotation.CacheOne; import com.feige.api.spi.Spi; -@LoadOnlyOne +@CacheOne public interface ConfigFactory extends Spi { /** diff --git a/X-fim-api/src/main/java/com/feige/api/constant/Const.java b/X-fim-api/src/main/java/com/feige/api/constant/Const.java index dd9aeae..1b7ddcb 100644 --- a/X-fim-api/src/main/java/com/feige/api/constant/Const.java +++ b/X-fim-api/src/main/java/com/feige/api/constant/Const.java @@ -40,4 +40,7 @@ public interface Const { int PC = 1; int MOBILE = 2; int WEB = 3; + + + String COMMA = ","; } diff --git a/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java b/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java index 6815016..3851a89 100644 --- a/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java +++ b/X-fim-common/src/main/java/com/feige/fim/codec/CheckSumUtils.java @@ -2,7 +2,7 @@ package com.feige.fim.codec; import com.feige.api.codec.CheckSumException; import com.feige.api.codec.ICheckSum; -import com.feige.fim.spi.JdkSpiLoader; +import com.feige.fim.spi.SpiLoaderUtils; public class CheckSumUtils { @@ -15,7 +15,7 @@ public class CheckSumUtils { * @throws CheckSumException */ public static void check(byte[] data, short expectedCheckSum) throws CheckSumException { - ICheckSum iCheckSum = JdkSpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); + ICheckSum iCheckSum = SpiLoaderUtils.getByConfig(ICheckSum.class, true); iCheckSum.check(data, expectedCheckSum); } @@ -26,7 +26,7 @@ public class CheckSumUtils { * @return check sum */ public static short calculate(byte[] body) { - ICheckSum iCheckSum = JdkSpiLoader.getInstance().getSpiByConfigOrPrimary(ICheckSum.class); + ICheckSum iCheckSum = SpiLoaderUtils.getByConfig(ICheckSum.class, true); return iCheckSum.getCheckSum(body); } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java index a9e52a2..9690fb7 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/Configs.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/Configs.java @@ -33,6 +33,8 @@ public final class Configs { String SERVER_ENABLE_UDP_KEY = "fim.server.enable-udp"; String SERVER_UDP_IP_KEY = "fim.server.udp-ip"; String SERVER_UDP_PORT_KEY = "fim.server.udp-port"; + + String SPI_LOADER_KEY = "fim.spi.loader"; } private final static CompositeConfig COMPOSITE_CONFIG = new CompositeConfig(); diff --git a/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java b/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java index c65babb..da86d45 100644 --- a/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java +++ b/X-fim-common/src/main/java/com/feige/fim/spi/JdkSpiLoader.java @@ -1,20 +1,26 @@ package com.feige.fim.spi; -import com.feige.api.annotation.LoadOnlyOne; +import com.feige.api.annotation.CacheOne; +import com.feige.api.constant.Const; import com.feige.api.spi.Spi; import com.feige.api.spi.SpiLoader; import com.feige.api.spi.SpiNotFoundException; import com.feige.fim.config.Configs; import com.feige.fim.lg.Loggers; import com.feige.fim.utils.StringUtil; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -98,15 +104,33 @@ public class JdkSpiLoader implements SpiLoader { if (list == null){ ServiceLoader loader = ServiceLoader.load(loadClass); List spiList = new ArrayList<>(); - LoadOnlyOne loadOnlyOne = loadClass.getAnnotation(LoadOnlyOne.class); + Set keys = keys(className); for (Object next : loader) { Spi spi = (Spi) next; spiList.add(spi); - if(loadOnlyOne != null){ - break; + } + if (keys != null && keys.size() > 0){ + spiList = spiList.stream() + .filter(spi -> keys.contains(spi.getKey())) + .collect(Collectors.toList()); + }else { + CacheOne cacheOne = loadClass.getAnnotation(CacheOne.class); + if (cacheOne != null){ + List collect = spiList.stream() + .filter(Spi::primary) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(collect) && spiList.size() > 0){ + spiList = spiList.subList(0, 1); + }else { + spiList = collect.subList(0, 1); + } } } - register(loadClass, spiList); + if (CollectionUtils.isNotEmpty(spiList)){ + register(loadClass, spiList); + }else { + LOG.warn("class = {}, No implementation classes have been registered", className); + } } } } @@ -115,4 +139,13 @@ public class JdkSpiLoader implements SpiLoader { } } + + private Set keys(String className){ + Map map = Configs.getMap(Configs.ConfigKey.SPI_LOADER_KEY); + String key = MapUtils.getString(map, className); + if (StringUtil.isNotBlank(key)){ + return new HashSet<>(Arrays.asList(key.split(Const.COMMA))); + } + return null; + } } -- Gitee From 88cf47d36165901a2df7c6d20b8d617f4472bbbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sat, 20 May 2023 18:40:00 +0800 Subject: [PATCH 09/11] =?UTF-8?q?spi=E5=92=8Cconfig=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/feige/fim/config/impl/EnvConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java index e01d39b..29a755f 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java @@ -56,7 +56,7 @@ public class EnvConfig implements Config { @Override public String[] getArr(String key) { - return new String[0]; + return null; } @Override -- Gitee From fcd56a549e43ef5329b709b347f2599649bdf7e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sun, 21 May 2023 14:03:52 +0800 Subject: [PATCH 10/11] =?UTF-8?q?config=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/config/Config.java | 86 ++++++++++++++--- .../fim/config/impl/CompositeConfig.java | 92 +------------------ .../com/feige/fim/config/impl/EnvConfig.java | 46 +--------- .../feige/fim/config/impl/SystemConfig.java | 48 +--------- .../java/com/feige/fim/utils/StringUtil.java | 12 ++- .../test/java/com/feige/fim/ConfigTest.java | 5 +- X-fim-common/src/test/resources/conf/fim.yaml | 3 +- 7 files changed, 95 insertions(+), 197 deletions(-) diff --git a/X-fim-api/src/main/java/com/feige/api/config/Config.java b/X-fim-api/src/main/java/com/feige/api/config/Config.java index bd5518f..f491cdb 100644 --- a/X-fim-api/src/main/java/com/feige/api/config/Config.java +++ b/X-fim-api/src/main/java/com/feige/api/config/Config.java @@ -2,6 +2,8 @@ package com.feige.api.config; import java.io.File; import java.nio.file.Files; +import java.text.NumberFormat; +import java.text.ParseException; import java.util.List; import java.util.Map; @@ -30,7 +32,9 @@ public interface Config { * @param defaultValue default value * @return int config */ - Integer getInt(String key, Integer defaultValue); + default Integer getInt(String key, Integer defaultValue) { + return convert(Integer.class, key, defaultValue); + } /** * get int config @@ -48,7 +52,10 @@ public interface Config { * @param defaultValue default value * @return long config */ - Long getLong(String key, Long defaultValue); + default Long getLong(String key, Long defaultValue) { + return convert(Long.class, key, defaultValue); + } + /** * get long config @@ -65,7 +72,9 @@ public interface Config { * @param defaultValue default value * @return double config */ - Double getDouble(String key, Double defaultValue); + default Double getDouble(String key, Double defaultValue) { + return convert(Double.class, key, defaultValue); + } /** * get double config @@ -82,7 +91,9 @@ public interface Config { * @param defaultValue default value * @return string config */ - String getString(String key, String defaultValue); + default String getString(String key, String defaultValue) { + return convert(String.class, key, defaultValue); + } /** * get string config @@ -99,7 +110,9 @@ public interface Config { * @param defaultValue default value * @return boolean config */ - Boolean getBoolean(String key, Boolean defaultValue); + default Boolean getBoolean(String key, Boolean defaultValue) { + return convert(Boolean.class, key, defaultValue); + } /** * get boolean config @@ -115,21 +128,27 @@ public interface Config { * @param key key * @return map config */ - Map getMap(String key); + default Map getMap(String key) { + return convert(Map.class, key, null); + } /** * get list config * @param key key * @return list config */ - List getList(String key); + default List getList(String key) { + return convert(List.class, key, null); + } /** * get array config * @param key key * @return array config */ - String[] getArr(String key); + default String[] getArr(String key) { + return convert(String[].class, key, null); + } /** @@ -147,13 +166,56 @@ public interface Config { * @param type */ default T getObject(String key, Class type) { - Object object = getObject(key); - return type.cast(object); + return convert(type, key, null); } /** - * 序号 - * @return 序号 + * order + * @return order */ int order(); + + default T convert(Class cls, String key, T defaultValue){ + Object answer = this.getObject(key); + if (answer == null){ + return defaultValue; + } + if (cls.isInstance(answer)){ + return cls.cast(answer); + } + if (Boolean.class.equals(cls) || Boolean.TYPE.equals(cls)){ + if (answer instanceof String) { + answer = Boolean.valueOf((String)answer); + } + if (answer instanceof Number) { + Number n = (Number)answer; + answer = n.intValue() != 0 ? Boolean.TRUE : Boolean.FALSE; + } + }else if (cls.isAssignableFrom(Number.class) || cls.isPrimitive()){ + if (answer instanceof String) { + try { + String text = (String)answer; + answer = NumberFormat.getInstance().parse(text); + } catch (ParseException ignored) {} + } + if (answer instanceof Number) { + if (Integer.class.equals(cls) || Integer.TYPE.equals(cls)) { + answer = ((Number) answer).intValue(); + } else if (Long.class.equals(cls) || Long.TYPE.equals(cls)) { + answer = ((Number) answer).longValue(); + } else if (Byte.class.equals(cls) || Byte.TYPE.equals(cls)) { + answer = ((Number) answer).byteValue(); + } else if (Short.class.equals(cls) || Short.TYPE.equals(cls)) { + answer = ((Number) answer).shortValue(); + } else if (Float.class.equals(cls) || Float.TYPE.equals(cls)) { + answer = ((Number) answer).floatValue(); + } else if (Double.class.equals(cls) || Double.TYPE.equals(cls)) { + answer = ((Number) answer).doubleValue(); + } + } + }else if (cls.isEnum()) { + answer = Enum.valueOf(cls.asSubclass(Enum.class), (String) answer); + } + return cls.cast(answer); + } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java index dcfd169..ece13c4 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/CompositeConfig.java @@ -1,11 +1,11 @@ package com.feige.fim.config.impl; import com.feige.api.config.Config; +import com.feige.fim.utils.StringUtil; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Map; /** * @author feige
@@ -21,99 +21,11 @@ public class CompositeConfig implements Config { } - @Override - public Integer getInt(String key, Integer defaultValue) { - for (Config config : configList) { - Integer value = config.getInt(key, defaultValue); - if (value != null){ - return value; - } - } - return defaultValue; - } - - @Override - public Long getLong(String key, Long defaultValue) { - for (Config config : configList) { - Long value = config.getLong(key, defaultValue); - if (value != null){ - return value; - } - } - return defaultValue; - } - - @Override - public Double getDouble(String key, Double defaultValue) { - for (Config config : configList) { - Double value = config.getDouble(key, defaultValue); - if (value != null){ - return value; - } - } - return defaultValue; - } - - @Override - public String getString(String key, String defaultValue) { - for (Config config : configList) { - String value = config.getString(key, defaultValue); - if (value != null){ - return value; - } - } - return defaultValue; - } - - @Override - public Boolean getBoolean(String key, Boolean defaultValue) { - for (Config config : configList) { - Boolean value = config.getBoolean(key, defaultValue); - if (value != null){ - return value; - } - } - return defaultValue; - } - - @Override - public Map getMap(String key) { - for (Config config : configList) { - Map value = config.getMap(key); - if (value != null){ - return value; - } - } - return null; - } - - @Override - public List getList(String key) { - for (Config config : configList) { - List value = config.getList(key); - if (value != null){ - return value; - } - } - return null; - } - - @Override - public String[] getArr(String key) { - for (Config config : configList) { - String[] value = config.getArr(key); - if (value != null){ - return value; - } - } - return null; - } - @Override public Object getObject(String key) { for (Config config : configList) { Object value = config.getObject(key); - if (value != null){ + if (!StringUtil.isBlank(value)){ return value; } } diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java index 29a755f..64ceb33 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/EnvConfig.java @@ -2,9 +2,6 @@ package com.feige.fim.config.impl; import com.feige.api.config.Config; -import java.util.List; -import java.util.Map; - /** * @author feige
* @ClassName: EnvConfig
@@ -13,55 +10,14 @@ import java.util.Map; */ public class EnvConfig implements Config { - private final Map envMap = System.getenv(); @Override public void parseConfig(Object config) throws Exception { } - @Override - public Integer getInt(String key, Integer defaultValue) { - return defaultValue; - } - - @Override - public Long getLong(String key, Long defaultValue) { - return defaultValue; - } - - @Override - public Double getDouble(String key, Double defaultValue) { - return defaultValue; - } - - @Override - public String getString(String key, String defaultValue) { - return System.getenv(key); - } - - @Override - public Boolean getBoolean(String key, Boolean defaultValue) { - return defaultValue; - } - - @Override - public Map getMap(String key) { - return null; - } - - @Override - public List getList(String key) { - return null; - } - - @Override - public String[] getArr(String key) { - return null; - } - @Override public Object getObject(String key) { - return null; + return System.getenv(key); } @Override diff --git a/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java b/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java index 06bdae7..3c16bf4 100644 --- a/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java +++ b/X-fim-common/src/main/java/com/feige/fim/config/impl/SystemConfig.java @@ -2,10 +2,6 @@ package com.feige.fim.config.impl; import com.feige.api.config.Config; -import java.util.List; -import java.util.Map; -import java.util.Properties; - /** * @author feige
* @ClassName: SystemConfig
@@ -13,55 +9,13 @@ import java.util.Properties; * @date: 2023/5/20 10:08
*/ public class SystemConfig implements Config { - private final Properties prop = System.getProperties(); @Override public void parseConfig(Object obj) throws Exception { } - - @Override - public Integer getInt(String key, Integer defaultValue) { - return (Integer) prop.getOrDefault(key, defaultValue); - } - - @Override - public Long getLong(String key, Long defaultValue) { - return (Long) prop.getOrDefault(key, defaultValue); - } - - @Override - public Double getDouble(String key, Double defaultValue) { - return (Double) prop.getOrDefault(key, defaultValue); - } - - @Override - public String getString(String key, String defaultValue) { - return System.getProperty(key, defaultValue); - } - - @Override - public Boolean getBoolean(String key, Boolean defaultValue) { - return (Boolean) prop.getOrDefault(key, defaultValue); - } - - @Override - public Map getMap(String key) { - return null; - } - - @Override - public List getList(String key) { - return null; - } - - @Override - public String[] getArr(String key) { - return null; - } - @Override public Object getObject(String key) { - return this.prop.get(key); + return System.getProperty(key); } @Override diff --git a/X-fim-common/src/main/java/com/feige/fim/utils/StringUtil.java b/X-fim-common/src/main/java/com/feige/fim/utils/StringUtil.java index 00ad067..12eab30 100644 --- a/X-fim-common/src/main/java/com/feige/fim/utils/StringUtil.java +++ b/X-fim-common/src/main/java/com/feige/fim/utils/StringUtil.java @@ -12,7 +12,17 @@ public class StringUtil { public static final String EMPTY_STR = ""; public static boolean isEmpty(Object str) { - return str == null || "".equals(str); + return str == null || EMPTY_STR.equals(str); + } + public static boolean isNull(Object obj){ + return obj == null; + } + public static boolean isBlank(Object obj){ + if (obj instanceof CharSequence){ + return isBlank((CharSequence) obj); + }else { + return isNull(obj); + } } public static boolean isBlank(CharSequence s) { diff --git a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java index 13ae700..a6a7108 100644 --- a/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java +++ b/X-fim-common/src/test/java/com/feige/fim/ConfigTest.java @@ -10,18 +10,21 @@ import java.util.Arrays; public class ConfigTest { + public static final String CONFIG_PATH = "E:\\project\\im\\X-fim-parent\\X-fim-common\\src\\test\\resources\\conf\\fim.yaml"; String[] spiArr = { "com.feige.api.config.ConfigFactory" }; @Test public void yamlConfigTest() throws Exception { - System.setProperty(Configs.CONFIG_FILE_KEY, "E:\\project\\my\\X-fim-parent\\X-fim-common\\src\\test\\resources\\conf\\fim.yaml"); + System.setProperty(Configs.CONFIG_FILE_KEY, CONFIG_PATH); for (String spi : spiArr) { SpiLoaderUtils.load(spi); } Configs.loadConfig(); Assert.assertEquals(Configs.getString("fim.test.key"), "value"); Assert.assertTrue(CollectionUtils.isEqualCollection(Configs.getList("fim.test.arr"), Arrays.asList("a","b","c"))); + Assert.assertEquals(Configs.getString(Configs.CONFIG_FILE_KEY), CONFIG_PATH); + Assert.assertEquals(Configs.getAppConfig().getString(Configs.CONFIG_FILE_KEY), "test"); } } diff --git a/X-fim-common/src/test/resources/conf/fim.yaml b/X-fim-common/src/test/resources/conf/fim.yaml index 13afd03..97dde6c 100644 --- a/X-fim-common/src/test/resources/conf/fim.yaml +++ b/X-fim-common/src/test/resources/conf/fim.yaml @@ -4,4 +4,5 @@ fim: arr: - a - b - - c \ No newline at end of file + - c + path: test \ No newline at end of file -- Gitee From d2fa21a2e2b07bcf45682e8de5910f634aa06552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E9=A3=9E?= <1835698775@qq.com> Date: Sun, 21 May 2023 22:08:44 +0800 Subject: [PATCH 11/11] =?UTF-8?q?session=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/feige/api/session/ISession.java | 3 +- .../feige/fim/session/AbstractSession.java | 60 ++++++++++++++++ .../fim/session/AbstractSessionHandler.java | 43 +++++++++++ .../session/AbstractSessionRepository.java | 15 ++-- .../fim/adapter/NettyChannelAdapter.java | 71 +++++++------------ .../feige/fim/server/NettyServerHandler.java | 49 +++++++++++++ 6 files changed, 185 insertions(+), 56 deletions(-) create mode 100644 X-fim-common/src/main/java/com/feige/fim/session/AbstractSession.java create mode 100644 X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionHandler.java create mode 100644 X-fim-netty/src/main/java/com/feige/fim/server/NettyServerHandler.java diff --git a/X-fim-api/src/main/java/com/feige/api/session/ISession.java b/X-fim-api/src/main/java/com/feige/api/session/ISession.java index 12b82f0..fd349c7 100644 --- a/X-fim-api/src/main/java/com/feige/api/session/ISession.java +++ b/X-fim-api/src/main/java/com/feige/api/session/ISession.java @@ -1,5 +1,6 @@ package com.feige.api.session; +import com.feige.api.handler.RemotingException; import com.feige.api.spi.Spi; import java.net.InetSocketAddress; @@ -35,7 +36,7 @@ public interface ISession extends Spi { * * @param msg */ - void write(Object msg); + void write(Object msg) throws RemotingException; /** diff --git a/X-fim-common/src/main/java/com/feige/fim/session/AbstractSession.java b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSession.java new file mode 100644 index 0000000..e5ee85c --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSession.java @@ -0,0 +1,60 @@ +package com.feige.fim.session; + +import com.feige.api.session.ISession; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author feige
+ * @ClassName: AbstractSession
+ * @Description:
+ * @date: 2023/5/21 16:45
+ */ +public abstract class AbstractSession implements ISession { + protected final AtomicBoolean active = new AtomicBoolean(false); + + private final Map attributes = new ConcurrentHashMap<>(); + + + public void markActive(boolean isActive) { + active.set(isActive); + } + + @Override + public boolean hasAttr(String key) { + return attributes.containsKey(key); + } + + @Override + public Object getAttr(String key) { + return attributes.get(key); + } + + @Override + public void setAttr(String key, Object value) { + attributes.put(key, value); + } + + @Override + public void removeAttr(String key) { + attributes.remove(key); + } + + @Override + public void close() { + this.markActive(false); + this.attributes.clear(); + } + + @Override + public boolean isClosed() { + return !active.get(); + } + + @Override + public boolean isConnected() { + return active.get(); + } +} diff --git a/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionHandler.java b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionHandler.java new file mode 100644 index 0000000..7fd2e97 --- /dev/null +++ b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionHandler.java @@ -0,0 +1,43 @@ +package com.feige.fim.session; + +import com.feige.api.handler.RemotingException; +import com.feige.api.handler.SessionHandler; +import com.feige.api.session.ISession; +import com.feige.api.session.SessionRepository; + +/** + * @author feige
+ * @ClassName: AbstractSessionHandler
+ * @Description:
+ * @date: 2023/5/21 17:06
+ */ +public abstract class AbstractSessionHandler implements SessionHandler { + private SessionRepository sessionRepository; + + @Override + public void connected(ISession session) throws RemotingException { + sessionRepository.add(session); + } + + @Override + public void disconnected(ISession session) throws RemotingException { + sessionRepository.remove(session); + } + + @Override + public void sent(ISession session, Object message) throws RemotingException { + + } + + + @Override + public void received(ISession session, Object message) throws RemotingException { + + } + + @Override + public void caught(ISession session, Throwable exception) throws RemotingException { + + } + +} diff --git a/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionRepository.java b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionRepository.java index d81e788..d549cdf 100644 --- a/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionRepository.java +++ b/X-fim-common/src/main/java/com/feige/fim/session/AbstractSessionRepository.java @@ -1,16 +1,17 @@ package com.feige.fim.session; -import com.feige.fim.utils.StringUtil; import com.feige.api.session.ISession; import com.feige.api.session.SessionRepository; +import com.feige.fim.utils.StringUtil; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public abstract class AbstractSessionRepository implements SessionRepository { - private final Map> sessionsMap = new ConcurrentHashMap<>(); + private final Map> sessionsMap = new ConcurrentHashMap<>(); @Override public String getUid(ISession session) { @@ -41,14 +42,8 @@ public abstract class AbstractSessionRepository implements SessionRepository { @Override public boolean containsSession(String uid, Integer clientType) { if (StringUtil.isNotBlank(uid)){ - Map sessionMap = sessionsMap.get(uid); - if (sessionMap != null && !sessionMap.isEmpty()){ - if (!StringUtil.isEmpty(clientType)){ - return sessionMap.containsKey(clientType); - }else { - return true; - } - } + List sessions = sessionsMap.get(uid); + return true; } return false; } diff --git a/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyChannelAdapter.java b/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyChannelAdapter.java index f89cc4e..0a815f9 100644 --- a/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyChannelAdapter.java +++ b/X-fim-netty/src/main/java/com/feige/fim/adapter/NettyChannelAdapter.java @@ -1,25 +1,29 @@ package com.feige.fim.adapter; +import com.feige.api.handler.RemotingException; import com.feige.api.session.ISession; -import io.netty.channel.ChannelHandlerContext; +import com.feige.fim.session.AbstractSession; +import io.netty.channel.Channel; import java.net.InetSocketAddress; -public class NettyChannelAdapter implements ISession { - private final ChannelHandlerContext ctx; - - public NettyChannelAdapter(ChannelHandlerContext ctx) { - this.ctx = ctx; +public class NettyChannelAdapter extends AbstractSession { + + private final Channel channel; + + public NettyChannelAdapter(Channel channel) { + this.channel = channel; + this.markActive(this.channel.isActive()); } - public static ISession fromCtx(ChannelHandlerContext ctx){ - return new NettyChannelAdapter(ctx); + public static ISession fromChannel(Channel channel){ + return new NettyChannelAdapter(channel); } @Override public String getId() { - return ctx.channel().id().asLongText(); + return channel.id().asLongText(); } @Override @@ -29,56 +33,33 @@ public class NettyChannelAdapter implements ISession { @Override public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) ctx.channel().localAddress(); + return (InetSocketAddress) channel.localAddress(); } @Override public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) ctx.channel().remoteAddress(); + return (InetSocketAddress) channel.remoteAddress(); } @Override - public void write(Object msg) { - + public void write(Object msg) throws RemotingException { + if (isClosed()) { + throw new RemotingException(this, "Failed to write message " + + (msg == null ? "" : msg.getClass().getName()) + + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress()); + } + channel.writeAndFlush(msg); } @Override public void close() { - - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public boolean isConnected() { - return false; - } - - @Override - public boolean hasAttr(String key) { - return false; - } - - @Override - public Object getAttr(String key) { - return null; - } - - @Override - public void setAttr(String key, Object value) { - - } - - @Override - public void removeAttr(String key) { - + super.close(); + channel.close(); } @Override public String getKey() { - return null; + return "session"; } + } diff --git a/X-fim-netty/src/main/java/com/feige/fim/server/NettyServerHandler.java b/X-fim-netty/src/main/java/com/feige/fim/server/NettyServerHandler.java new file mode 100644 index 0000000..3ce2d9e --- /dev/null +++ b/X-fim-netty/src/main/java/com/feige/fim/server/NettyServerHandler.java @@ -0,0 +1,49 @@ +package com.feige.fim.server; + +import com.feige.api.handler.SessionHandler; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +/** + * @author feige
+ * @ClassName: NettyServerHandler
+ * @Description:
+ * @date: 2023/5/21 14:07
+ */ +@ChannelHandler.Sharable +public class NettyServerHandler extends ChannelDuplexHandler { + + private SessionHandler sessionHandler; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + super.write(ctx, msg, promise); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} -- Gitee