优秀是一种习惯!!!

Netty基础

2023-11-28

Netty

一、Netty入门

1.1 概述

Netty是一个异步的(这里异步主要指通过多线程完成方法调用和处理结果相分离(因为如果调用方法的线程和接收数据的线程是同一个,那么意味着是同步)指调用时的异步,不是异步IO)、基于事件驱动(即底层多路复用selector)的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

1.2 Hello Word

//客户端向服务器发送HW,服务器接收不返回
/**
 * 服务端
 */
public class HelloServer{
    public static void main(String[] args){
        // 1.服务启动器,负责组装netty组件,启动服务器
        new ServerBootstrap()
            // 2.NIO基础部分有用到BossEventLoop, WorkerEventLoop(selector, thread), group组 EventLoop包含了线程和选择器
            .group(new NioEventLoopGroup())
            // 3.选择服务器的ServerSocketChannel实现
            .channel(NioServerSocketChannel.class)
            // 4. boss负责处理连接 worker(child) 负责处理读写,决定了worker(child)能执行哪些操作(handler)
            .childHandler(
            	// 5.channel代表和客户端进行数据读写的通道 Inializer初始化,负责添加别的handler
            	new ChannelInitializer<NioSocketChannel>(){
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    // 6.添加具体handler
                    ch.pipeline().addLast(new StringDecoder());	// 将ByteBuf转换为字符串
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler
                        @Override // 读事件发生后
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            // 打印上一步转换好的字符串
                            sout(msg);
                        }
                    });
                }
            })
            .bind(8080);
    }
}

/**
 * 客户端
 */
public class HelloClient{
    public static void main(String[] args) throws InterruptedException	{
        // 1.启动类
        new Bootstrap()
            // 2.添加EventLoop
            .group(new NioEventLoopGroup())
            // 3.选择客户端channel实现
            .channel(NioSocketChannel.class)
            // 4.添加处理器 
            .handler(new ChannelInitializer<NioSocketChannel>(){
                @Override // 初始化处理器在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            // 5.连接到服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()	 // 同步等待channel关闭
            .channel()
            .writeAndFlush("hello Word");
    }
}

提示:

一开始要树立正确的理解

  • 把channel理解为数据的通道
  • 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf
  • 把handler理解为数据的处理工序
    • 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成……)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler分Inbound和Outbound两类 (入站 出站)
  • 把eventLoop理解为处理数据的工人
    • 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底(绑定)
    • 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
    • 工人按照pipeline顺序,一次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人

1.3 组件

1.3.1 EventLoop

事件循环对象
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的io事件。它的继承关系比较复杂

  • 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
  • 另一条线是继承自Netty自己的OrderedEventExecutor
    • 提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看看自己属于哪个EventLoopGroup

事件循环组
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)

  • 继承自Netty自己的EventExecutorGroup
    • 实现了Iterable接口提供遍历EventLoop的能力
    • 另有next方法获取集合中下一个EventLoop
@Slf4j
public class TestEventLoop{
    public static void main(String[] args){
        // 1.创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup(); // io事件、普通任务、定时任务
        // EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务、定时任务
        // sout(NettyRuntime.availableProcessors());  // 获取到系统的CPU核心数
      
        // 2.获取下一个事件循环对象
        sout(group.next());
      
        // 3.执行普通任务
        group.next().execute(() -> {
           log.debug("ok") ;
        });
        log.debug("main");
      
        // 4.执行定时任务 做keepAlive做连接的保活
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ok");
        }, 0, 1, TimeUnit.SECONDS);
      
    }
}
EventLoop IO任务
public class EventLoopServer{
    public static void main(String[] args){
        new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>(){
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override                                          //   ByteBuf
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            ByteBuf buf = (ByteBuf) msg;
                            log.debug(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            })
            .bind(8080);
    }
}
测试多线程时,断点处Suspend:应选Thread即主线程停下来不会影响其他线程的运行 (All执行到断点处所有线程都会停下来)

head、h1、tail都是handler

一旦建立连接,channel就会和一个NioEventLoop绑定,后续所有的请求都会通过同一个EventLoop来处理

EventLoop 分工细化
@Slf4j
public class EventLoopServer{
    public static void main(String[] args){
        // 细分2:创建一个独立的EventLoopGroup,专门去处理耗时较长的操作【一个EventLoop selector管理多个channel,其中一个channel执行到了handler执行耗时较长那么会影响其他channel的操作,某个handler耗时较长,最好不要让它去占用worker的NIO线程,否则会影响NIO的读写操作】
        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
            // boss 和 worker 【EventLoop(Selector)比作boss与worker】
            // 细分1:参数1boss只负责ServerSocketChannel上accept事件  参数2worker只负责socketChannel上的读写
            .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>(){
				@Override
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){ // 因为第一次参数未指定group,所以依然会采用NioEventLoopGroup去处理
                        @Override										// ByteBuf
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            ByteBuf buf = (ByteBuf)msg;
                            log.debug(buf.toString(Charset.defaultCharset()));
                            ctx.fireChannelRead(msg);  // 让消息传递给下一个handler,不使用的话执行到handler1就断了不会执行下一个handler2,这是自定义handler需注意的
                        }
                    }).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
                        @Override										// ByteBuf
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            ByteBuf buf = (ByteBuf) msg;
                            log.debug(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            })
            .bind(8080);
    }
}

head 、h1、 h2、tail都是handler往下执行,粉色handler执行NioEvenLoopGroup中Selector负责,绿色为耗时较长的另外新创建独立的handler,该channel注册到DefaultEventLoopGroup,该组下有两个selector负责监管

源码分析

解释多个handler之间如果使用不同的EventLoop(不同的Selector),那么是怎么进行线程的切换的

1.3.2 Channel

channel的主要作用

  • close() 可以用来关闭 channel 【close() 方法是异步的】
  • closeFuture()用来处理channel的关闭
    • sync方法作用是同步等待channel关闭 【阻塞住当前线程,直到Nio线程连接建立完毕】
    • 而addListener方法是异步等待channel关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush()方法将数据写入并刷出
Netty为什么用异步

异步多线程对响应时间并没有提升,反而降低了,异步提升的是吞吐量,单位时间内处理请求的个数

因此需要合理的拆分任务,即精细化任务

1.3.3 Future & Promise

在异步处理时,经常用到这两个接口

Netty中的Future与JDK中的Future同名,但是是两个接口,它们之间有关系:Netty中的Future继承自JDK中的Future,而Promise继承自Netty中的Future接口

  • JDK Future只能同步等待任务结束(或成功、或失败)才能得到结果
  • Netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • Netty Promise不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称JDK FutureNetty FuturePromise
cancel取消任务
isCanceled任务是否取消
isDone任务是否完成,不能区分成功失败
get获取任务结果,同步阻塞等待
getNow 获取任务结果,非阻塞
await 等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断
sync 等待任务结束,如果任务失败,抛出异常(不返回结果)
isSuccess 判断任务是否成功
cause 获取失败信息,非阻塞,如果没有失败,返回null
addListener 添加回调,异步接收结果
setSuccess 设置成功结果
setFailure 设置失败结果
JDK Future
// 可以理解为Future就是线程之间传递结果数据的容器
@Slf4j
public class TestJDKFuture{
    public static void main(String[] args) throws Exception{
        // jdk中Future一般关联线程池一起使用
        // 1. 创建线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        // 2. 提交任务 返回的是JDK中Future对象
        Future<Integer> future = service.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception{
                log.debug("线程池中的线程执行计算");
                Thread.sleep(1000); // 任务执行时间
                return 50; //返回结果
            }
        });
        // 执行任务的是线程池中的线程,获取结果的是外面的主线程,那么主线程如何与线程池中的该线程通信获得其结果呢?此时需要用到Future对象(submit返回即是Future对象)
        // 3. 主线程通过future获取结果
        log.debug("主线程等待结果");
        log.debug("主线程获取线程池线程结果是 {}", future.get());	// 同步阻塞等待
    }
}
Netty Future
@Slf4j
public class TestNettyFuture{
    public static void main(String[] args){
        NioEventLoopGroup group = new NioEventLoopGroup();
		// 每个EventLoop中只有一个线程
        EventLoop eventLoop = group.next();
        // 返回Netty中Future对象
        Future<Integer> future = eventLoop.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception{
                log.debug("执行计算");
                Thread.sleep(1000);
                return 70;
            }
        });
        // 3. 主线程通过future获取结果
//        log.debug("主线程等待结果");
//        log.debug("主线程获取线程池线程结果是 {}", future.get());	// 同步方式获取结果
      
        // 异步获取结果:
        future.addListener(new GenericFutureListener<Future<? super Integer>>(){
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception{
                log.debug("接收的结果是:{}", future.getNow());	// 因为执行了回调方法那么一定已经获取到了结果,那么可以用非阻塞的getNow()方法立即获取结果,不需要阻塞
            }
        });
    }
}
Promise(如开发RPC框架时很有用)
// JDK Future与Netty Future有个共同的特点:即Future对象不是由我们自己创建的,而是向线程池中提交任务时返回的Future对象,Future的创建权与结果的设置权都不是我们可以控制的,通过Promise对象可以我们灵活的配置
@Slf4j
public class TestNettyPromise{
    // 1. 准备EventLoop对象
    EventLoop eventLoop = new NioEventLoopGroup().next();
    // 2. 可以主动创建promise 是一个结果容器
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    new Thread(() -> {
        // 3. 任意一个线程执行计算,完毕后向promise中填充结果
        log.debug("开始计算。。。");
        try{
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        }
        promise.setSuccess(80);
    }).start();
    // 4. 接收结果的线程
    log.debug("等待结果。。。");
    log.debug("结果是:{}", promise.get());
  
    // 异步方式 同Netty Future上面代码
}
ChannelFuture连接问题
public class EventLoopClient{
    public static void main(String[] args) throws InterruptedException{
        // 带有 Future、promise的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>(){
                @Override // 在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            .connect(new InetSocketAddress("localhost", 8080)); // connect线程是异步非阻塞,真正连接的是另一个线程(NioEventLoop中的一个线程nio线程),main线程发起了调用connect,不用等待结果可以继续向下运行
        /** 解决方法一:
        channelFuture.sync(); // 如果不调用sync方法,那么会无阻塞向下执行,但此时connect还未成功建立。解决方法一:使用sync方法同步处理结果
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello word");
        **/
        /** 解决方法二:
         * 使用addListener(回调对象)方法异步处理结果,不是由main线程而是由nio线程执行
         **/
        channelFuture.addListener(new ChannelFutureListener(){
            @Override
            // 在nio线程连接建立好之后,会调用operationComplete
            public void operationComplete(ChannelFuture future) throws Exception{
                Channel channel = future.channel();
                log.debug("{}", channel);
                channel.writeAndFlush("hello word");
            }
        });
    }
}
ChannelFuture关闭问题
// 获取CloseFuture对象,在关闭发生以后,所做的处理,有两种处理方法:1、同步处理关闭2、异步处理关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>(){
        @Override // 在连接建立后被调用
        protected void initChannel(NioSocketChannel ch) throws Exception{
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while(true){
        String line = scanner.nextLine();
        if("q".equals(line)){
            channel.close; // close异步操作
//            log.debug("处理关闭后的操作"); // 不能在这里善后,由于close异步
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();
// 获取ClosedFuture对象, 1)同步处理关闭 2)异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
/**
 * closeFuture.sync(); // 同步处理
 * log.debug("这里处理关闭之后的操作");
 */

 closeFuture.addListener(new ChannelFutureListener(){
     @Override
     public void operationComplete(ChannelFuture future) throws Exception{
         log.debug("处理关闭之后的操作");  // 异步处理
         group.shutdownGracefully();	// 优雅停止线程
     }
 });

1.3.4 Handler & Pipeline

编写自己业务都是在Handler中编写,使用Netty中自带的Handler简化工作

下面研究Pipeline中Handler的执行流程

Pipeline好比流水线,Handler好比流水线上一道道的工序,中间流动的就是需要处理的数据

ChannelHandler用来处理Channel上的各种事件,分为两种:入站Handler(做数据的读取操作)、出站Handler(做数据的写出操作),所有ChannelHandler被连成一串,就是Pipeline

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对协会结果进行加工

Pipeline中如果有多个Handler是如何执行的?流程是怎样的?:

@Slf4j
public class TestPipeline{
    public static void main(String[] args){
        new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>(){
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception{
                    // 1、通过channel拿到pipeline
                    ChannelPipeline pipeline = ch.pipeline;
                    // 2、添加处理器  addLast()字面意思是把handler添加到流水线的最后一个位置,但是Netty会自动多加两个handler:head 和 tail,所加的handler是tail handler之前
                    // 因此,下面的pipeline结构是:head  ->  handler1  ->  handler2  ->  handler3  ->  handler4  ->  handler5  ->  handler6  ->  tail;底层是一个双向链表  入站处理器 最终结果log打印结果为: 1 2 3
                    // 出站处理器 只有当往channel中写入数据以后才会触发 最终log打印结果为 1 2 3 6 5 4,出站为从尾往前走
                    pipeline.addLast("handler1", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            log.debug("1");
                            ByteBuf buf = (ByteBuf)msg;
                            String name = buf.toString(Charset.defaultCharset());
                            super.channelRead(ctx, name);	// 调用pipeline中的下一个handler,将执行权交给下一个handler,并且会将handler的处理结果传递给下一个handler	channelRead是唤醒pipeline中下一个【入站处理器】handler
                            // 或者使用 ctx.fireChannelRead(student);
                        }
                    });
                    pipeline.addLast("handler2", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            // 这里msg拿到的是上一个handler处理后的name
                            log.debug("2");
                            Student student = new Student(name.toString());
                            super.channelRead(ctx, student);
                        }
                    });
                    pipeline.addLast("handler3", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            log.debug("3");
                            // 分配了一个byteBuffer对象 然后向其中写入字节,该字节是将字符串转换为字节数组
                            // ch.writeAndFlush(ctx.alloc().buffer.writeBytes("server...".getBytes()));	// 这里写入操作为了触发以下的出站处理器
                            ctx.writeAndFlush(ctx.alloc().buffer.writeBytes("server...".getBytes())); // ctx是从当前的处理器向前寻找出站处理器,因此并不会执行handler 4 5 6	而NioSocketChannel是从tail向前找
                            // 可更换handler位置进行验证:head  ->  handler1  ->  handler2  ->  handler4  ->  handler3  ->  handler5  ->  handler6  ->  tail
                        }
                    });
                    pipeline.addLast("handler4", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
                            log.debug("4");
                            super.write(ctx, msg, promise);
                        }
                    });
                    pipeline.addLast("handler5", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
                            log.debug("5");
                            super.write(ctx, msg, promise);
                        }
                    });
                    pipeline.addLast("handler6", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
                            log.debug("6");
                            super.write(ctx, msg, promise);
                        }
                    });
                }
            })
            .bind(8080);
    }
  
    @Data
    @AllArgsConstructor
    static class Student{
        private String name;
    }
}

调试工具

如果写很多handler那么其中执行顺序各种业务比较复杂时,每次测试需要启动一个客户端服务端,所以测试代码会很多。所以使用Netty提供的EmbeddedChannel测试的channel,这样就无需启动客户端或服务端了。

@Slf4j
public class TestEmbeddedChannel{
    public static void main(String[] args){
        ChannelInboundHandlerAdapter h1 = channelRead(ctx, msg) -> {
            log.debug("1");
            super.channelRead(ctx, msg);
        };
        ChannelInboundHandlerAdapter h2 = channelRead(ctx, msg) -> {
            log.debug("2");
            super.channelRead(ctx, msg);
        };
        ChannelOutboundHandlerAdapter h3 = write(ctx, msg, promise) -> {
            log.debug("3");
            super.write(ctx, msg, promise);
        };
        ChannelOutboundHandlerAdapter h4 = write(ctx, msg, promise) -> {
            log.debug("4");
            super.write(ctx, msg, promise);
        };
      
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站操作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        // 模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("word".getBytes()));
    }
}

1.3.5 ByteBuf

ByteBuf是对Nio中ByteBuffer增强 是对字节数据的封装

public class TestByteBuf{
    public static void main(String[] args){
        // ByteBuf中的容量是可以动态扩容的,而Netty中ByteBuffer就不可
        ByteBuf  buf = ByteBufAllocator.DEFAULT.buffer();
        sout(buf);
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 300; i++){
            sb.append("a");
        }
        buf.writeBytes(sb.toString().getBytes());
        sout(buf);
    }
}
直接内存 vs 堆内存

直接内存:分配效率低,读写效率高(使用的是系统内存)
堆内存:分配效率高,读写效率低
可以使用下面的代码来创建池化基于堆的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

也可以使用下面的代码来创建池化基于直接内存的ByteBuf

ByteBuf buffer = ByteBufferAllocator.DEFAULT.directBuffer(10);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起使用。
  • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。
池化 vs 非池化

池化的最大意义在于可以重用ByteBuf,优点有

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力。
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率。
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}

4.1以后,非Android平台默认启用池化类实现,Android平台启用非池化实现

4.1之前,池化功能还不成熟,默认是非池化实现

ByteBuf组成


注意:

  • 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用
  • 网络编程中一般采用大端方式 即不带LE 默认习惯是Big Endian
扩容

扩容规则是

  • 如果写入后数据大小未超过512,则选择下一个16的正数倍,例如写入后大小为12,则扩容后capacity是16
  • 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)
  • 扩容不能超过max capacity会报错
读取

buffer.readByte();

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

如果需要重复读取,可以在读前先做个标记mark buffer.markReaderIndex()

重置到标记位置reset buffer.resetReaderIndex()

还有办法是采用get开头的一系列方法,这些方法不会改变read index

ByteBuf的内存回收 retain & release

由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。

  • UnpooledHealByteBuf使用的是JVM内存,只需等GC回收内存即可
  • UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存

回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()

Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

  • 每个ByteBuf对象的初始计数为1
  • 调用release方法计数减一,如果计数为0,ByteBuf内存被回收
  • 调用retain方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收
  • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

谁来负责release呢?一般不可以直接在finally中release,因为pipeline中handler可能会将其中的ByteBuf传递给下一个handler,如果finally release掉,那么下一个就无法使用了。

请思考,因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就时区了传递性

基本规则是,谁是最后的使用者,谁负责release,详细分析如下:

可在head 和 tail部分 release

slice

以前所说的零拷贝:主要是指文件channel向socket channel传输数据的时候,可以不经过java内存,直接从文件到socket网络设备,这样减少了多次内存复制。Netty中的零拷贝主要是减少内存复制,但不是指操作系统层面的。
【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始的ByteBuf内存,切片后的ByteBuf维护独立的read,write指针。逻辑上的操作,并没有进行内存复制。

一般使用slilce需要搭配retain()方法使计数加1,防止被release的ByteBuf被释放

duplicate

【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的

copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关

ByteBuf优势
  • 池化 可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像ByteBuffer一样切换读写模式
  • 可以自动扩容
  • 支持链式调用
  • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf

1.4 双向通信

// 编写server
new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>(){
        @Override
        protected void initChannel(SocketChannel ch) throws Exception{
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg){
                    ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    sout(buffer.toString(Charset.defaultCharset()));
                  
                    // 思考:需要释放buffer吗?
                }
            });
        }
    })
  
// 怎么回应呢?只需要为channelRead添加逻辑
public void channelRead(ChannelHandlerContext ctx, Object msg){
    // ...
  
    // 建议使用ctx.alloc() 创建 ByteBuf
    ByteBuf response = ctx.alloc().buffer(20);
    response.writeBytes("hello".getBytes());
    ctx.writeAndFlush(response);
  
    // 思考:需要释放response吗
}

// 编写client,这次使用了 channelActive事件,它会在连接建立后触发
new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>(){
        @Override
        protected void initChannel(SocketChannel ch) throws Exception{
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception{
                    // 建议使用 ctx.alloc() 创建 ByteBuf
                    ByteBuf buffer = ctx.alloc().buffer(10);
                  
                    // 首次建立连接,发送hello信息
                    buffer.writeBytes("hello".getBytes());
                    ctx.writeAndFlush(buffer);
                  
                    // 思考:需要释放buffer 吗?
                }
            });
        }
    })
    .connect("127.0.0.1", 8080);

// 客户端接收,与服务器接收代码类似,在客户端的ChannelInboundHandlerAdapter中加入channelRead事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
    ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
    sout(buffer.toString(Charset.defaultCharset()));
  
    // 思考:需要释放buffer吗?
}

Java Socket是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A的双向信号传输,即使是阻塞IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读


标题:Netty基础
作者:amethystfob
地址:https://www.newmoon.top/articles/2023/11/28/1701141482042.html

欢迎各路大侠指点留痕: