java-netty-IO非阻塞压测客户端

2017-01-01 10:46
  • 非阻塞 IO
  • 其实无论是用 Java NIO 还是用 Netty,达到百万连接都没有任何难度。因为它们都是非阻塞的 IO,不需要为每个连接创建一个线程了。
  • nio 和 io的区别
  • nio的核心:Channel 通道,Buffer 缓冲区,Selector 选择器如果是百万级测试,怎么去找那么多机器?单机最多可以有 6W 的连接,百万连接起码需要17台机器!

    如何才能突破这个限制呢?

  • 其实这个限制来自于网卡。 可以通过使用虚拟机,并且把虚拟机的虚拟网卡配置成了桥接模式解决了问题。
    根据物理机内存大小,单个物理机起码可以跑4-5个虚拟机,所以最终百万连接只要4台物理机就够了。

    端口受限设置

    windows客户端

  • 打开注册表:regedit HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\ Services\TCPIP\Parameters
    新建 DWORD值,name:TcpTimedWaitDelay,value:0(十进制) –> 设置为0
    新建 DWORD值,name:MaxUserPort,value:65534(十进制) –> 设置最大连接数65534 重启系统

    liunx客户端

cat /proc/sys/net/ipv4/ip_local_port_range  
值为32768 610
  • 大概也就是共61000-32768=28232个端口可以使用,单个IP对外只能发送28232个TCP请求。
  • 以管理员身份,把端口的范围区间增到最大:
echo "1024 65535"> /proc/sys/net/ipv4/ip_local_port_range 

现在有64511个端口可用.

  • 以上做法只是临时,系统下次重启,会还原。 更为稳妥的做法是修改/etc/sysctl.conf文件,增加一行内容
 net.ipv4.ip_local_port_range= 1024 65535
sysctl -p
  • 现在可以使用的端口达到64510个(假设系统所有运行的服务器是没有占用大于1024的端口的,较为纯净的centos系统可以做到),要想达到50万请求,还得再想办法。
    增加IP地址

  • 一般假设本机网卡名称为 eth0,那么手动再添加几个虚拟的IP:
    ifconfig eth0:1 192.168.190.151
    ifconfig eth0:2 192.168.190.152 ......
    或者偷懒一些:
    for i in seq 1 9; do ifconfig eth0:$i 192.168.190.15$i up ; done
    这些虚拟的IP地址,一旦重启,或者 service network restart 就会丢失。
    为了模拟较为真实环境,在测试端,手动再次添加9个vmware虚拟机网卡

    代码

入口

package com.nio.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import java.net.URI;
import java.nio.charset.StandardCharsets;
public class HttpClient {
    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
                    ch.pipeline().addLast(new HttpResponseDecoder());
                    // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
                    ch.pipeline().addLast(new HttpRequestEncoder());
                    ch.pipeline().addLast(new HttpClientInboundHandler()); 
                }
            });
                 ChannelFuture f = b.connect(host, port).sync();

               URI uri = new URI("http://gc.ditu.aliyun.com:80/geocoding?a=深圳市");
                 String msg = "Are you ok?";
                 DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                         uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));                             
                 // 构建http请求
                 request.headers().set(HttpHeaders.Names.HOST, host);
                 request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
                 request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
                 // 发送http请求
                 f.channel().write(request);
                 f.channel().flush();
                 f.channel().closeFuture().sync();                
//            }

        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    public void connect_post(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
                    ch.pipeline().addLast(new HttpResponseDecoder());
                    // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
                    ch.pipeline().addLast(new HttpRequestEncoder());
                    ch.pipeline().addLast(new HttpClientInboundHandler()); 
                }
            });

                 ChannelFuture f = b.connect(host, port).sync();

               URI uri = new URI("http://gc.ditu.aliyun.com:80/geocoding?a=深圳市");
               FullHttpRequest request = new DefaultFullHttpRequest(
                       HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath());                        
                 // 构建http请求
               request.headers().set(HttpHeaders.Names.HOST, host);
               request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); // or HttpHeaders.Values.CLOSE
               request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
               request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json");
               ByteBuf bbuf = Unpooled.copiedBuffer("{\"jsonrpc\":\"2.0\",\"method\":\"calc.add\",\"params\":[1,2],\"id\":1}", StandardCharsets.UTF_8);
               request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bbuf.readableBytes());
                 // 发送http请求
                 f.channel().write(request);
                 f.channel().flush();
                 f.channel().closeFuture().sync();

//            }

        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        HttpClient client = new HttpClient();
            //请自行修改成服务端的IP
        for(int k=0; k<1;k++){
          client.connect("http://gc.ditu.aliyun.com/", 80);

            System.out.println(k);
            }
    }
}

监听代码

package com.nio.test;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.print("success!~!~");
        if (msg instanceof HttpResponse) 
        {
            HttpResponse response = (HttpResponse) msg;
//            System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
            System.out.println("HTTP_CODE:" + response.getStatus());

        }
        if(msg instanceof HttpContent)
        {
            HttpContent content = (HttpContent)msg;
            ByteBuf buf = content.content();

            System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
            buf.release();
        }
        ctx.close();
    }
}

更多参考