netty的服务端核心属性
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); //netty的启动核心入口 private final ServerBootstrap serverBootstrap; //netty的入口连接池 private final EventLoopGroup eventLoopGroupSelector; //netty的handler的连接池 private final EventLoopGroup eventLoopGroupBoss; private final NettyServerConfig nettyServerConfig; //操作处理的线程池 private final ExecutorService publicExecutor; //处理监听的回调服务 private final ChannelEventListener channelEventListener; private final Timer timer = new Timer("ServerHouseKeepingService", true); private DefaultEventExecutorGroup defaultEventExecutorGroup;
netty服务端的核心构造
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); //初始化netty的核心框架 this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } //共享线程池的初始化 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); //获得系统平台是否支持epoll的高性能io操作 if (useEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } loadSslContext();}
netty的服务端核心启动
public void start() { //默认的事件处理进程组初始化 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); //netty的服务端启动配置 ServerBootstrap childHandler = //netty中服务端的标准设置,一个boss,多个worker this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) //采用的nio模型 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) //socket参数,服务端接受链接的队列长度,如果队列已满则拒绝,默认值较小 win200 ,它128 .option(ChannelOption.SO_BACKLOG, 1024) //socket参数,地址复用,默认false,快速启动的操作更优 .option(ChannelOption.SO_REUSEADDR, true) //socket参数,连接保持,默认false,tcp会主动探测连接的有效性,可理解为心跳 .option(ChannelOption.SO_KEEPALIVE, false) //tcp参数,立即发送数据,netty默认true,系统默认false,如果选择带宽的性能可以设置为false,一次发送多个数据款 .childOption(ChannelOption.TCP_NODELAY, true) //socket参数,tcp数据发送缓冲区 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) //socket参数,tcp数据接收缓冲区 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) //绑定当前服务器及端口 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) //设置handler .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) //增加业务需要的handler .addLast(defaultEventExecutorGroup, //编码 new NettyEncoder(), //解码 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //连接管理 new NettyConnectManageHandler(), //业务核心处理 new NettyServerHandler() ); } }); //buf的分配器设置 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } //启动当前的服务 try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } //namesrv端的事件回调处理机制,会调用channelEventListener的事件处理 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } //执行响应数据的处理 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000);}
netty服务端核心业务逻辑设计
class NettyServerHandler extends SimpleChannelInboundHandler{ //读取网络请求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); }}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //处理请求类的操作 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: //处理响应类的操作 processResponseCommand(ctx, cmd); break; default: break; } }}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //获得对应的事件处理对象 final Pairmatched = this.processorTable.get(cmd.getCode()); //健全的配置事件处理 final Pair pair = null == matched ? this.defaultRequestProcessor : matched; //消息唯一标示 final int opaque = cmd.getOpaque(); if (pair != null) { //构建异步线程操作 Runnable run = new Runnable() { @Override public void run() { try { //前置处理 doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); //执行对应的事件处理 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); //后置处理 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); //如果不是单发需要返回对应的请求唯一标识和数据 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { //将结果数据会写给netty的channel ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; //验证请求处理类型 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } //封装异步处理线程,同时根据配置的事件线程池执行事件异步线程, try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); }}
===========================================================================
netty客户端的核心属性
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final long LOCK_TIMEOUT_MILLIS = 3000; //netty的client配置 private final NettyClientConfig nettyClientConfig; //netty的启动核心入口 private final Bootstrap bootstrap = new Bootstrap(); //netty的启动线程池事件集合 private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); //channel的本地内存数据接口 private final ConcurrentMapchannelTables = new ConcurrentHashMap (); private final Timer timer = new Timer("ClientHouseKeepingService", true); private final AtomicReference
> namesrvAddrList = new AtomicReference
>(); private final AtomicReference namesrvAddrChoosed = new AtomicReference (); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; /** * Invoke the callback methods in this executor when process response. */ private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup;
netty客户端的初始化构造
//客户端的通信实现public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } //共享线程池配置 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); //netty的客户端处理eventgroup配置 this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } }); //验证是否加载ssl if (nettyClientConfig.isUseTLS()) { try { sslContext = TlsHelper.buildSslContext(true); log.info("SSL enabled for client"); } catch (IOException e) { log.error("Failed to create SSLContext", e); } catch (CertificateException e) { log.error("Failed to create SSLContext", e); throw new RuntimeException("Failed to create SSLContext", e); } }}
netty客户端的启动
public void start() { //构造个性的线程池执行分组,定制线程名称格式 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); //netty的客户端通信关键,可以参照netty的option配置详细 Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, //编码 new NettyEncoder(), //解码 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //链接管理 new NettyConnectManageHandler(), //核心请求处理 new NettyClientHandler()); } }); //执行扫描响应结果,每秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); //事件处理机制 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); }}
//netty客户端的核心实现class NettyClientHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); }}
服务端和客户端共享的代码设计
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //处理请求类的操作 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: //处理响应类的操作 processResponseCommand(ctx, cmd); break; default: break; } }}
netty客户端处理服务端的响应数据
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); }}
=============================================================================
这里只讲netty的通信设计,后面的章节会根据功能详细的分析实现不同功能的技术设计和技术实现