tomcat请求原理解析

mac2025-05-20  49

Connector容器主要负责解析socket请求,在tomcat中的源码位于org.apache.catalina.connector和org.apache.coyote包路径下;通过上两节的分析,我们知道了Connector是Service的子容器,而Service又是Server的子容器。在server.xml文件中配置,然后在Catalina类中通过Digester完成实例化。在server.xml中默认配置了两种Connector的实现,分别用来处理Http请求和AJP请求。其实Connector的实现一共有以下三种: 1、Http Connector:解析HTTP请求,又分为BIO Http Connector和NIO Http Connector,即阻塞IO Connector和非阻塞IO Connector。本文主要分析NIO Http Connector的实现过程。 2、AJP:基于AJP协议,用于Tomcat与HTTP服务器通信定制的协议,能提供较高的通信速度和效率。如与Apache服务器集成时,采用这个协议。 3、APR HTTP Connector:用C实现,通过JNI调用的。主要提升对静态资源(如HTML、图片、CSS、JS等)的访问性能。 具体要使用哪种Connector可以在server.xml文件中通过protocol属性配置如下:

<Connector port="8080" protocol="org.apache.coyote.http11.Http11AprProtocol" connectionTimeout="20000" redirectPort="8443" />

然后看一下Connector的构造器

//默认connector为HTTP/1.1 NIO public Connector() { this("org.apache.coyote.http11.Http11NioProtocol"); } //根据protocol实现Connector public Connector(String protocol) { boolean aprConnector = AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseAprConnector(); if ("HTTP/1.1".equals(protocol) || protocol == null) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol"; } } else if ("AJP/1.3".equals(protocol)) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol"; } } else { protocolHandlerClassName = protocol; } // 通过反射实例化一个protocolHandle,之后对请求数据的解析都由该protocolHandle完成,例如Http11AprProtocol ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; } // Default for Connector depends on this system property setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")); }

通过分析Connector构造器的源码可以知道,每一个Connector对应了一个protocolHandler,一个protocolHandler被设计用来监听服务器某个端口的网络请求,但并不负责处理请求(处理请求由Container组件完成)。下面就以Http11NioProtocol为例分析Http请求的解析过程。 在Connector的startInterval方法中启动了protocolHandler,代码如下:

protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPort() < 0) { throw new LifecycleException(sm.getString( "coyoteConnector.invalidPort", Integer.valueOf(getPort()))); } setState(LifecycleState.STARTING); try { protocolHandler.start(); //启动protocolHandler } catch (Exception e) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerStartFailed"), e); } }

Http11NioProtocol创建一个org.apache.tomcat.util.net.NioEndpoint实例,然后将监听端口并解析请求的工作全被委托给NioEndpoint实现。tomcat在使用Http11NioProtocol解析HTTP请求时一共设计了三种线程,分别为Acceptor,Poller和Worker。 1、Acceptor线程 Acceptor实现了Runnable接口,根据其命名就知道它是一个接收器,负责接收socket,其接收方法是serverSocket.accept()方式,获得SocketChannel对象,然后封装成tomcat自定义的org.apache.tomcat.util.net.NioChannel。虽然是Nio,但在接收socket时仍然使用传统的方法,使用阻塞方式实现。Acceptor以线程池的方式被创建和管理,在NioEndpoint的startInternal()方法中完成Acceptor的启动,源码如下:

public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if (getExecutor() == null) { createExecutor(); } //设置最大连接数,默认值为maxConnections = 10000,通过同步器AQS实现。 initializeConnectionLatch(); //创建、配置并启动线程Pooler pollers = new Poller[getPollerThreadCount()]; for (int i = 0; i < pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); //启动Acceptor线程 } }

继续追踪startAcceptorThreads的源码

protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); //默认值为1 acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }

Acceptor线程的核心代码在它的run方法中

public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (endpoint.isRunning()) { // endpoint阻塞 while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!endpoint.isRunning()) { break; } state = AcceptorState.RUNNING; try { //连接数到达最大值时,await等待释放connection,在Endpoint的startInterval方法中设置了最大连接数 endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } //U是一个socketChannel U socket = null; try { //接收socket请求 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { errorDelay = handleExceptionWithDelay(errorDelay); throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // endpoint的setSocketOptions方法对socket进行配置 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); // APR specific. // Could push this down but not sure it is worth the trouble. if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { // Not an error on HP-UX so log as a warning // so it can be filtered out on that platform // See bug 50273 log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }

Acceptor完成了socket请求的接收,然后交给NioEndpoint 进行配置,继续追踪Endpoint的setSocketOptions方法。

protected boolean setSocketOptions(SocketChannel socket) { try { //设置为非阻塞 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); //调用Poller的register方法,完成channel的注册。 } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("", t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }

分析setSocketOptions的源码可以知道,该方法的主要功能是利用传入的SocketChannel参数生成SecureNioChannel或者NioChannel,然后注册到Poller线程的selector中,可以进一步了解Java nio的相关知识,对这一块内容有更深的理解。 2、Poolor线程 Pollor同样实现了Runnable接口,是NioEndpoint类的内部类。在Endpoint的startInterval方法中创建、配置并启动了Pollor线程,见代码清单4。Poolor主要职责是不断轮询其selector,检查准备就绪的socket(有数据可读或可写),实现io的多路复用。其构造其中初始化了selector。

public Poller() throws IOException { this.selector = Selector.open(); }

在分析Acceptor的时候,提到了Acceptor接受到一个socket请求后,调用NioEndpoint的setSocketOptions方法(代码清单6),该方法生成了NioChannel后调用Pollor的register方法生成PoolorEvent后加入到Eventqueue,register方法的源码如下:

public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. //生成PoolorEvent并加入到Eventqueue if (r == null) r = new PollerEvent(socket, ka, OP_REGISTER); else r.reset(socket, ka, OP_REGISTER); addEvent(r); }

Pollor的核心代码也在其run方法中

public void run() { // 调用了destroy()方法后终止此循环 while (true) { boolean hasEvents = false; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //非阻塞的 select keyCount = selector.selectNow(); } else { //阻塞selector,直到有准备就绪的socket keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { //该方法遍历了eventqueue中的所有PollorEvent,然后依次调用PollorEvent的run,将socket注册到selector中。 events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("", x); continue; } //either we timed out or we woke up, process events first if (keyCount == 0) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 遍历就绪的socket while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { //调用processKey方法对有数据读写的socket进行处理,在分析Worker线程时会分析该方法 iterator.remove(); processKey(sk, attachment); } } //process timeouts timeout(keyCount, hasEvents); }//while getStopLatch().countDown(); }

run方法中调用了events方法:

public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) { result = true; try { pe.run(); //将pollerEvent中的每个socketChannel注册到selector中 pe.reset(); if (running && !paused) { eventCache.push(pe); //将注册了的pollerEvent加到endPoint.eventCache } } catch (Throwable x) { log.error("", x); } } return result; }

继续跟进PollerEvent的run方法:

public void run() { if (interestOps == OP_REGISTER) { try { //将SocketChannel注册到selector中,注册时间为SelectionKey.OP_READ读事件 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { socket.socketWrapper.getEndpoint().countDownConnection(); } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) { } } } }

3、Worker线程 Worker线程即SocketProcessor是用来处理Socket请求的。SocketProcessor也同样是Endpoint的内部类。在Pollor的run方法中(代码清单8)监听到准备就绪的socket时会调用processKey方法进行处理:

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if (close) { cancelledKey(sk); } else if (sk.isValid() && attachment != null) { //有读写事件就绪时 if (sk.isReadable() || sk.isWritable()) { if (attachment.getSendfileData() != null) { processSendfile(sk, attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // socket可读时,先处理读事件 if (sk.isReadable()) { //调用processSocket方法进一步处理 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } //写事件 if (!closeSocket && sk.isWritable()) { //调用processSocket方法进一步处理 if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch (CancelledKeyException ckx) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("", t); } }

继续跟踪processSocket方法:

public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 尝试循环利用之前回收的SocketProcessor对象,如果没有可回收利用的则 // 创建新的SocketProcessor对象 SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { 创建SocketProcessor,即Worker线程,基于线程池模式进行创建和管理 sc = createSocketProcessor(socketWrapper, event); } else { // 循环利用回收的SocketProcessor对象 sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { //SocketProcessor实现了Runneble接口,可以直接传入execute方法进行处理 executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; } //NioEndpoint中createSocketProcessor创建一个SocketProcessor。 protected SocketProcessorBase<NioChannel> createSocketProcessor( SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { return new SocketProcessor(socketWrapper, event); }

总结:Http11NioProtocol是基于Java Nio实现的,创建了Acceptor、Pollor和Worker线程实现多路io的复用。三类线程之间的关系如下图所示: Acceptor和Pollor之间是生产者消费者模式的关系,Acceptor不断向EventQueue中添加PollorEvent,Pollor轮询检查EventQueue中就绪的PollorEvent,然后发送给Work线程进行处理。

最新回复(0)