Tomcat源码解读——Connector之一个请求的完整历程

​ 我们在Tomcat 源码解读——启动篇中讲解过Connector的初始化过程,但并没有深入的去研究Connector.strat方法,以及启动后是怎么接收并处理用户请求的,本文将重点讲述这一过程。

配置与协议

​ 再回顾一下server.xml关于Connector的配置:

1
2
3
4
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

​ 它位于<service>节点下,用来在具体的端口监听用户的请求,<Connector>节点可以有多个,每个都可以支援各种不同的协议,比如本例中一个用来监听8080端口,处理普通的用户请求,一个基于AJP协议在8009端口监听,用于与其它网络节点集成(比如Nginx)。

​ 通常我们在做网络编程的时候,都逃不开如下3个步骤:

  1. 监听端口,创建服务端与客户端的链接;
  2. 获取到客户端请求的socket数据,并对Socket数据进行解析和包装成Http请求数据格式;
  3. 将包装后的数据交给Container处理。

​ 那么Connector是如何完成如上3个步骤的?先给出结论,查看源码可以注意到它有两个非常重要的属性:protocolHandleradapter。其中protocolHandler完成了上面步骤的(1)和(2),而(3)则是由adapter来完成的。

​ 协议的设置在conf/server.xml中配置,由setProtocol来赋值,Tomcat提供了6种协议:

  • 三种不带Ajp的协议,客户端与Tomcat服务器直接连接。

    1. Http11Protocol—————默认协议,阻塞IO方式的支持http1.1协议;
    2. Http11NioProtocol———-非阻塞IO方式的支持http1.1协议 ;
    3. Http11AprProtocol———-使用ARP技术的支持http1.1协议(ARP:Apache portable runtime) ;
  • 三种带Ajp的协议为定向包协议,即WEB服务器通过 TCP连接和SERVLET容器连接,例如tomcat和Apache、Nginx等前端服务器连接

    1. AjpProtocol——————–阻塞IO方式的支持Ajp协议;

    2. AjpNioProtocol—————非阻塞IO方式的支持Ajp协议;

    3. AjpAprProtocol—————使用ARP技术的支持Ajp协议;

​ 为了便于分析,之对Http11Protocol进行分析,其他的大同小异。

启动方法

​ 我们看看 Connector 的启动方法:

1
2


​ 跟进去协议处理器的启动方法,这个启动方法在AbstractProtocol中,是一个模板方法,所有的协议启动过程都是一样的,都是启动对应的 Endpoint

1
2
3
4
5
6
7
8
9
@Override
public void start() throws Exception {

// 省略非关注点代码...

endpoint.start();

// 省略非关注点代码...
}

​ 这个Endpoint就是各个协议下,接收请求的入口,暴露给客户端的终点。我们以Http11NioProtocol为例,来看看接下来Tomcat是怎么处理的。

Http11NioProtocol的构造方法如下:

1
2
3
public Http11NioProtocol() {
super(new NioEndpoint());
}

​ 可以看到这里指定一个自己专用的NioEndpoint,当然还有其它的一些构建过程,都交给父类去完成了,无非就是设置一下属性变量等,感兴趣的可以跟进去看看,这是不做讲解。

​ 回到start()方法,我们看看NioEndpoint.startInternal方法做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void startInternal() throws Exception {

if (!running) {

// 省略非关注点代码...

// 创建线程池
if ( getExecutor() == null ) {
createExecutor();
}

// 初始化监听阀值
initializeConnectionLatch();

// 启动轮询线程,这是一个后台线程,根据状态将 socket 指派到合适的处理线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}

// 启动监听线程
startAcceptorThreads();
}
}

​ 最后一行startAcceptorThreads()启动了N个线程来处理请求,我们看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);

for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

​ 具体的线程执行过程是在Acceptor中,线程的核心代码在它的run方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public void run() {

int errorDelay = 0;

// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
// endpoint阻塞
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}

if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//连接数到达最大值时,await等待释放connection,在Endpoint的startInterval方法中设置了最大连接数
endpoint.countUpOrAwaitConnection();

// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
//U是一个socketChannel
U socket = null;
try {
// 接收socket请求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
errorDelay = handleExceptionWithDelay(errorDelay);
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;

// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// endpoint的setSocketOptions方法对socket进行配置
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
...
}
}
state = AcceptorState.ENDED;
}