作者:唐亚峰 | 出自:唐亚峰博客
有一段时间没写博客了,
Spring Cloud
基本的都已经写完(后续会写一个SpringBootAdmin
的整合),接下来会记录Netty
相关的,早期的JAVA
对NIO
支持是非常糟糕的,直到2002
年发布的JDK1.4
中才第一次支持非阻塞I/O
,这个类库为JDK通讯模型带来了翻天覆地的变化,在开始学习Netty
之前先看看早期的写法是什么样的……
- 网络编程
网络编程
的基本模型就是Client/Server
模型,两个进程相互通讯,其中服务端提供位置信息(ip:port)
,客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果成功则可以通过网络套字(Socket
)进行通信…
- 同步阻塞I/O
采用BIO
通信模型的服务端,通常由一个独立的Acceptor
线程负责监听客户端链接,由它来接收到每个请求然后为每个客户端创建一个新的线程进行链路处理,处理完毕后通过输出流应答给客户端,然后线程销毁…
该模型最大的问题就是缺乏弹性伸缩能力,因为它是1 : 1
模型的,当客户端越多服务端线程开销越大,线程数膨胀后,系统性能就急剧下降了,然后堆栈,GC,等等问题就来找你唠嗑了…
- TimeServer
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 初窥NIO-TimeServer:同步阻塞方式的I/O创建
* @author Levin
*/
public class TimeServer {
public static void main(String[] args) {
int port = 4040;
System.out.println("start server......" + port);
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {
new TimeServerHandler(serverSocket.accept()).run();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- TimeServerHandler
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @author Levin
* @create 2017/8/29 0029
*/
public class TimeServerHandler implements Runnable {
private Socket socket;
@Override
public void run() {
BufferedReader reader = null;
PrintWriter writer = null;
try {
reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
writer = new PrintWriter(this.socket.getOutputStream(), true);
while (true) {
String body = reader.readLine();
if (body == null) break;
System.out.println("The time server receive order : " + body);
writer.println("我已经接收到你的请求了....");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (this.socket != null) socket.close();
if (reader != null) reader.close();
if (writer != null) writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
TimeServerHandler(Socket socket) {
this.socket = socket;
}
- 代码分析
TimeServer
监听的是4040
,通过构造函数创建ServerSocket
,如果端口未被占用则表示监听成功,然后通过无限循环来监听客户端的连接,如果没有客户端接入,则主线程阻塞在ServerSocket
的accept
操作上
启动TimeServer
,通过jvisualvm
打印的线程堆栈可以发现它是阻塞在accept
操作上的
该工具在:JAVA_HOME/bin/jvisualvm.exe
,打开后默认监听本机所有JVM程序….
- 伪异步I/O
为了解决同步阻塞I/O
面临的一个请求需要一个线程处理的问题,后来有人对它的线程模型进行了优化,后端维护一个ThreadPool
来处理多个客户端的请求,形成客户端个数M:线程池最大数N的比例关系
,其中M
可以大于N
,通过线程池可以灵活的调配线程资源,设置线程最大值,防止海量并发导致的线程耗尽…
- TimeServerPool
改进了TimeServer
代码,将原本new Thread(socket)
方式改成线程池…
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 初窥NIO-TimeServer:伪异步方式的I/O创建
*
* @author Levin
* @create 2017/8/28 0028
*/
public class TimeServerPool {
public static void main(String[] args) {
int port = 4041;
System.out.println("start server......" + port);
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
Socket socket = serverSocket.accept();
TimeServerHandlerExecutePool executePool = new TimeServerHandlerExecutePool(50, 100);//创建I/O任务线程池
executePool.execute(new TimeServerHandler(socket));
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- TimeServerHandlerExecutePool
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Levin
* @create 2017/8/29 0029
*/
public class TimeServerHandlerExecutePool {
private ExecutorService service;
/**
* 线程池
*
* @param maxPoolSize 最大线程数
* @param queueSize 队列大小
*/
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
service = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
}
public void execute(Runnable task) {
service.execute(task);
}
- 弊端
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
源码:InputStream
中的read
方法,里面写了This method blocks until input data is available, end of file is detected, or an exception is thrown.
翻译:此方法阻塞直到输入数据可用,检测到文件结束,或引发异常。
- 有数据可读
- 可用数据已读取完毕
- 发生空指针或者I/O异常
意味着当对方发送一个数据较大的包时,读取的I/O
也会进入阻塞状态,再次期间,其它消息则进入到我们创建的队列中去…
伪异步I/O
实际上仅仅只是对之前的同步阻塞I/O
线程模型做了简单优化(换汤不换药),无法从本质上去解决线程阻塞的问题…
- 客户端测试
客户端测试类,根据访问不同端口路由到指定的TimeServer
上…
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 初窥NIO - TimeClient
*
* @author Levin
* @create 2017/8/28 0028
*/
public class TimeClient {
public static void main(String[] args) {
int port = 4041;
Socket socket = null;
BufferedReader reader = null;
PrintWriter writer = null;
try {
socket = new Socket("127.0.0.1", port);
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(socket.getOutputStream(), true);
writer.println("hello time server");
System.out.println("response " + reader.readLine());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
socket.close();
}
if (reader != null) {
reader.close();
}
if (writer != null) {
writer.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 说点什么
全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter1-1/battcn-netty-bio