博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq之源码分析netty实现源码(六)
阅读量:6689 次
发布时间:2019-06-25

本文共 19168 字,大约阅读时间需要 63 分钟。

hot3.png

netty的服务端核心属性

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    // Netty bootstrap used to configure and bind the server.
    private final ServerBootstrap serverBootstrap;
    // Child event loop group: passed as the second argument of ServerBootstrap.group(...)
    // in start(), so it serves the channels of accepted connections.
    private final EventLoopGroup eventLoopGroupSelector;
    // Parent event loop group: passed as the first argument of ServerBootstrap.group(...)
    // in start(), so it accepts incoming connections. Created with a single thread.
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;

    // Fixed-size thread pool created in the constructor
    // (threads are named "NettyServerPublicExecutor_N").
    private final ExecutorService publicExecutor;
    // Optional listener for channel events; start() only launches the event executor
    // when this is non-null.
    private final ChannelEventListener channelEventListener;

    // Daemon timer that periodically runs scanResponseTable() once start() has scheduled it.
    private final Timer timer = new Timer("ServerHouseKeepingService", true);

    // Executor group on which the pipeline handlers run; created in start().
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

netty服务端的核心构造

/**
 * Builds the remoting server: stores the configuration, creates the shared callback
 * thread pool and the boss/selector event loop groups, then loads the SSL context.
 * Nothing is bound here; binding happens in {@code start()}.
 *
 * @param nettyServerConfig    server configuration (semaphore sizes, thread counts)
 * @param channelEventListener listener for channel events; may be {@code null}
 */
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    // Fall back to 4 threads when the configured callback pool size is not positive.
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // Shared fixed-size thread pool.
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // Use the epoll transport when useEpoll() reports it is usable, otherwise plain NIO.
    // Either way: one boss thread, and getServerSelectorThreads() selector threads.
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);
            private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);
            private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}

netty的服务端核心启动

public void start() {    //默认的事件处理进程组初始化    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(        nettyServerConfig.getServerWorkerThreads(),        new ThreadFactory() {            private AtomicInteger threadIndex = new AtomicInteger(0);            @Override            public Thread newThread(Runnable r) {                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());            }        });    //netty的服务端启动配置    ServerBootstrap childHandler =            //netty中服务端的标准设置,一个boss,多个worker        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)            //采用的nio模型            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)            //socket参数,服务端接受链接的队列长度,如果队列已满则拒绝,默认值较小 win200 ,它128            .option(ChannelOption.SO_BACKLOG, 1024)            //socket参数,地址复用,默认false,快速启动的操作更优            .option(ChannelOption.SO_REUSEADDR, true)            //socket参数,连接保持,默认false,tcp会主动探测连接的有效性,可理解为心跳            .option(ChannelOption.SO_KEEPALIVE, false)            //tcp参数,立即发送数据,netty默认true,系统默认false,如果选择带宽的性能可以设置为false,一次发送多个数据款            .childOption(ChannelOption.TCP_NODELAY, true)            //socket参数,tcp数据发送缓冲区            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())            //socket参数,tcp数据接收缓冲区            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())            //绑定当前服务器及端口            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))            //设置handler            .childHandler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) //增加业务需要的handler .addLast(defaultEventExecutorGroup, //编码 new NettyEncoder(), //解码 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //连接管理 new NettyConnectManageHandler(), //业务核心处理 new NettyServerHandler() ); } }); //buf的分配器设置 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } //启动当前的服务 try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } //namesrv端的事件回调处理机制,会调用channelEventListener的事件处理 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } //执行响应数据的处理 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000);}

netty服务端核心业务逻辑设计

class NettyServerHandler extends SimpleChannelInboundHandler
{ //读取网络请求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); }}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {    final RemotingCommand cmd = msg;    if (cmd != null) {        switch (cmd.getType()) {            case REQUEST_COMMAND:                //处理请求类的操作                processRequestCommand(ctx, cmd);                break;            case RESPONSE_COMMAND:                //处理响应类的操作                processResponseCommand(ctx, cmd);                break;            default:                break;        }    }}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {    //获得对应的事件处理对象    final Pair
matched = this.processorTable.get(cmd.getCode()); //健全的配置事件处理 final Pair
pair = null == matched ? this.defaultRequestProcessor : matched; //消息唯一标示 final int opaque = cmd.getOpaque(); if (pair != null) { //构建异步线程操作 Runnable run = new Runnable() { @Override public void run() { try { //前置处理 doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); //执行对应的事件处理 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); //后置处理 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); //如果不是单发需要返回对应的请求唯一标识和数据 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { //将结果数据会写给netty的channel ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; //验证请求处理类型 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } //封装异步处理线程,同时根据配置的事件线程池执行事件异步线程, try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); }}

===========================================================================

netty客户端的核心属性

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);    private static final long LOCK_TIMEOUT_MILLIS = 3000;    //netty的client配置    private final NettyClientConfig nettyClientConfig;    //netty的启动核心入口    private final Bootstrap bootstrap = new Bootstrap();    //netty的启动线程池事件集合    private final EventLoopGroup eventLoopGroupWorker;    private final Lock lockChannelTables = new ReentrantLock();    //channel的本地内存数据接口    private final ConcurrentMap
channelTables = new ConcurrentHashMap
(); private final Timer timer = new Timer("ClientHouseKeepingService", true); private final AtomicReference
> namesrvAddrList = new AtomicReference
>(); private final AtomicReference
namesrvAddrChoosed = new AtomicReference
(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; /** * Invoke the callback methods in this executor when process response. */ private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup;

netty客户端的初始化构造

//客户端的通信实现public NettyRemotingClient(final NettyClientConfig nettyClientConfig,    final ChannelEventListener channelEventListener) {    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());    this.nettyClientConfig = nettyClientConfig;    this.channelEventListener = channelEventListener;    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();    if (publicThreadNums <= 0) {        publicThreadNums = 4;    }    //共享线程池配置    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {        private AtomicInteger threadIndex = new AtomicInteger(0);        @Override        public Thread newThread(Runnable r) {            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());        }    });    //netty的客户端处理eventgroup配置    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {        private AtomicInteger threadIndex = new AtomicInteger(0);        @Override        public Thread newThread(Runnable r) {            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));        }    });    //验证是否加载ssl    if (nettyClientConfig.isUseTLS()) {        try {            sslContext = TlsHelper.buildSslContext(true);            log.info("SSL enabled for client");        } catch (IOException e) {            log.error("Failed to create SSLContext", e);        } catch (CertificateException e) {            log.error("Failed to create SSLContext", e);            throw new RuntimeException("Failed to create SSLContext", e);        }    }}

netty客户端的启动

public void start() {    //构造个性的线程池执行分组,定制线程名称格式    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(        nettyClientConfig.getClientWorkerThreads(),        new ThreadFactory() {            private AtomicInteger threadIndex = new AtomicInteger(0);            @Override            public Thread newThread(Runnable r) {                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());            }        });    //netty的客户端通信关键,可以参照netty的option配置详细    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)        .option(ChannelOption.TCP_NODELAY, true)        .option(ChannelOption.SO_KEEPALIVE, false)        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())        .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, //编码 new NettyEncoder(), //解码 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //链接管理 new NettyConnectManageHandler(), //核心请求处理 new NettyClientHandler()); } }); //执行扫描响应结果,每秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); //事件处理机制 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); }}
//netty客户端的核心实现class NettyClientHandler extends SimpleChannelInboundHandler
{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); }}

服务端和客户端共享的代码设计

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {    final RemotingCommand cmd = msg;    if (cmd != null) {        switch (cmd.getType()) {            case REQUEST_COMMAND:                //处理请求类的操作                processRequestCommand(ctx, cmd);                break;            case RESPONSE_COMMAND:                //处理响应类的操作                processResponseCommand(ctx, cmd);                break;            default:                break;        }    }}

netty客户端处理服务端的响应数据

/**
 * Matches a response command to its pending request via the opaque id.
 *
 * <p>When no pending entry exists, two warnings are logged and nothing else happens.
 * Otherwise the response is stored on the future and the entry is removed from the
 * response table; then either the invoke callback is executed, or, when there is no
 * callback, the response is put on the future and the future is released.
 *
 * @param ctx channel context the response arrived on
 * @param cmd the response command
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture pending = responseTable.get(opaque);

    // Nothing is waiting for this response: log and bail out.
    if (pending == null) {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
        return;
    }

    pending.setResponseCommand(cmd);
    responseTable.remove(opaque);

    if (pending.getInvokeCallback() == null) {
        // No callback registered: publish the response on the future and release it.
        pending.putResponse(cmd);
        pending.release();
    } else {
        // A callback is registered: run it.
        executeInvokeCallback(pending);
    }
}

=============================================================================

这里只讲 Netty 的通信设计;后面的章节会按功能逐一详细地分析各项功能的技术设计与技术实现。

转载于:https://my.oschina.net/wangshuaixin/blog/3055717

你可能感兴趣的文章
Activiti数据库
查看>>
我所看过的书
查看>>
静态内部类与非静态内部类
查看>>
Cisco3750交换机堆叠配置要点
查看>>
Debian以及Ubuntu源设置
查看>>
dedecms 当session开启了memcache存储后报错解决办法
查看>>
Centos 下搭建SVN + Apache 服务器
查看>>
Linux下C/C++编译器GCC/G++使用简介
查看>>
TI DSP型号简述
查看>>
XIP
查看>>
struts2 jsonp 解决跨域问题
查看>>
oracle11g的数据库导入到oracle10g中
查看>>
自动亮度配置
查看>>
打代码必备攻略-前端
查看>>
几款在线图文编辑单元
查看>>
event.keycode值大全
查看>>
PHP异步调用实现方式
查看>>
理解 Java 的 GC 与 幽灵引用
查看>>
powershell 获取wmi对象
查看>>
apache2 以及https证书配置
查看>>