`
yayagepei
  • 浏览: 6910 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

tomcat源码分析系列之请求处理---关门打狗

 
阅读更多

    上回我们把请求放进来了,这回我们关上门,好好修理修理它,不折腾它一番,休想轻易出去。

要关门打狗,先得知道房子在哪才行啊,上回我们说到proces的处理被委托到Http11Processor类,它就是这套房子!Http11Processor的process方法,就是这间屋子:

/**
     * Process pipelined HTTP requests on the specified socket.
     *
     * @param socketWrapper Socket from which the HTTP requests will be read
     *               and the HTTP responses will be written.
     *  
     * @throws IOException error during an I/O operation
     */
    @Override
    public SocketState process(SocketWrapper<Socket> socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        //第一阶段
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        this.socket = socketWrapper;
        inputBuffer.setInputStream(socket.getSocket().getInputStream());
        outputBuffer.setOutputStream(socket.getSocket().getOutputStream());

        // Error flag
        error = false;
        keepAlive = true;

        if (maxKeepAliveRequests > 0) {
            socketWrapper.decrementKeepAlive();
        }
        
        int soTimeout = endpoint.getSoTimeout();

        int threadRatio = -1;   
        // These may return zero or negative values     
        // Only calculate a thread ratio when both are >0 to ensure we get a    
        // sensible result      
        if (endpoint.getCurrentThreadsBusy() >0 &&      
                endpoint.getMaxThreads() >0) {      
            threadRatio = (endpoint.getCurrentThreadsBusy() * 100)      
                    / endpoint.getMaxThreads();     
        }   
        // Disable keep-alive if we are running low on threads      
        if (threadRatio > getDisableKeepAlivePercentage()) {     
            socketWrapper.setKeepAliveLeft(0);
        }

        try {
            socket.getSocket().setSoTimeout(soTimeout);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.debug(sm.getString("http11processor.socket.timeout"), t);
            error = true;
        }

        boolean keptAlive = socketWrapper.isKeptAlive();

        while (!error && keepAlive && !endpoint.isPaused()) {

            // Parsing the request header
            try {
                int standardTimeout = 0;
                if (keptAlive) {
                    if (keepAliveTimeout > 0) {
                        standardTimeout = keepAliveTimeout;
                    } else if (soTimeout > 0) {
                        standardTimeout = soTimeout;
                    }
                }
                /*
                 * When there is no data in the buffer and this is not the first
                 * request on this connection and timeouts are being used the
                 * first read for this request may need a different timeout to
                 * take account of time spent waiting for a processing thread.
                 * 
                 * This is a little hacky but better than exposing the socket
                 * and the timeout info to the InputBuffer
                 */
                if (inputBuffer.lastValid == 0 &&
                        socketWrapper.getLastAccess() > -1 &&
                        standardTimeout > 0) {

                    long queueTime = System.currentTimeMillis() -
                            socketWrapper.getLastAccess();
                    int firstReadTimeout;
                    if (queueTime >= standardTimeout) {
                        // Queued for longer than timeout but there might be
                        // data so use shortest possible timeout
                        firstReadTimeout = 1;
                    } else {
                        // Cast is safe since queueTime must be less than
                        // standardTimeout which is an int
                        firstReadTimeout = standardTimeout - (int) queueTime;
                    }
                    socket.getSocket().setSoTimeout(firstReadTimeout);
                    if (!inputBuffer.fill()) {
                        throw new EOFException(sm.getString("iib.eof.error"));
                    }
                }
                if (standardTimeout > 0) {
                    socket.getSocket().setSoTimeout(standardTimeout);
                }
                /**前面一堆道具,这里正式开始 这是用来读取Http头的,只是用来读取头的哦,
                  * 它负责将二进制流解析到对应的头部 后面我们会放出次方法的实现
                  **/
                inputBuffer.parseRequestLine(false);
                if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    adapter.log(request, response, 0);
                    error = true;
                } else {
                    request.setStartTime(System.currentTimeMillis());
                    keptAlive = true;
                    if (disableUploadTimeout) {
                        socket.getSocket().setSoTimeout(soTimeout);
                    } else {
                        socket.getSocket().setSoTimeout(connectionUploadTimeout);
                    }
                   /**走到这真坎坷,赶紧把到手的狗处理了吧--先处理头、脚
                    * 先把头部转化了
                    **/
                   inputBuffer.parseHeaders();
                }
            } catch (IOException e) {
                error = true;
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), t);
                }
                // 400 - Bad Request
                response.setStatus(400);
                adapter.log(request, response, 0);
                error = true;
            }

            if (!error) {
                // Setting up filters, and parse some request headers
                //第二阶段
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    /**拔毛----预处理请求,解析一些协议上的东西,稍后放出方法**/
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 400 - Internal Server Error
                    response.setStatus(400);
                    adapter.log(request, response, 0);
                    error = true;
                }
            }

            if (socketWrapper.getKeepAliveLeft() == 0) {
                keepAlive = false;
            }

            // Process the request in the adapter
            if (!error) {
                try {
                     //第三阶段
                     rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    //核心处理:去除内脏,烹饪……  处理请求体 这里又涉及到很多知识了,在下节详细介绍这个过程
                    adapter.service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !error) { // Avoid checking twice.
                        error = response.getErrorException() != null ||
                                (!isAsync() &&
                                statusDropsConnection(response.getStatus()));
                    }

                } catch (InterruptedIOException e) {
                    error = true;
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    adapter.log(request, response, 0);
                    error = true;
                }
            }

            // Finish the handling of the request
            try {
                   //第四阶段
                   rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
                // If we know we are closing the connection, don't drain input.
                // This way uploading a 100GB file doesn't tie up the thread 
                // if the servlet has rejected it.
                
                if(error && !isAsync())
                    inputBuffer.setSwallowInput(false);
                if (!isAsync())
                    //结束烹饪,结束请求
                    endRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.finish"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                adapter.log(request, response, 0);
                error = true;
            }
            try {
                 //第五阶段
                 rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.response.finish"), t);
                error = true;
            }

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (error) {
                response.setStatus(500);
            }
            //收尾阶段:将狗肉盛盘  更新统计数据
            request.updateCounters();
            //第六阶段   收尾阶段:收拾后厨  清楚数据,等待下一个请求,计数器
            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            // Don't reset the param - we'll see it as ended. Next request
            // will reset it
            // thrA.setParam(null);
            // Next request
            if (!isAsync() || error) {
                inputBuffer.nextRequest();
                outputBuffer.nextRequest();
            }

            // If we don't have a pipe-lined request allow this thread to be
            // used by another connection
            if (isAsync() || error || inputBuffer.lastValid == 0) {
                break;
            }
            
            if (maxKeepAliveRequests > 0) {
                socketWrapper.decrementKeepAlive();
            }
        }
        //第七阶段:收工  上狗肉  
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
        if (error || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else {
            if (!keepAlive) {
                return SocketState.CLOSED;
            } else {
                return SocketState.OPEN;
            }
        } 
    }

这个过程还是挺复杂的(你以为吃顿狗肉这么容易?),分七个阶段:

第一阶段:处理头部,可能抛503和400异常;

第二阶段:预处理请求体,主要解析请求的部分header和解析url,可能抛400异常;

第三阶段:处理请求(最核心的阶段),可能抛500异常;

第四阶段:结束请求,可能抛500异常;

第五阶段:更新统计计数,可能抛500异常;

第六阶段:整理线程,准备接受下一个请求;

第七阶段:结束

走完这七步,香喷喷的狗肉就上桌了!! ,不过先别动筷子,我们还要补上前面提到的几个方法:

处理头部:

/**
     * Read the request line. This function is meant to be used during the 
     * HTTP request header parsing. Do NOT attempt to read the request body 
     * using it.
     *
     * @throws IOException If an exception occurs during the underlying socket
     * read operations, or if the given buffer is not big enough to accommodate
     * the whole line.
     */
    @Override
    public boolean parseRequestLine(boolean useAvailableDataOnly)
    
        throws IOException {

        int start = 0;

        //
        // Skipping blank lines
        //

        byte chr = 0;
        do {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            chr = buf[pos++];

        } while ((chr == Constants.CR) || (chr == Constants.LF));

        pos--;

        // Mark the current buffer position
        start = pos;

        //
        // Reading the method name
        // Method name is always US-ASCII
        //

        boolean space = false;

        while (!space) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            // Spec says no CR or LF in method name
            if (buf[pos] == Constants.CR || buf[pos] == Constants.LF) {
                throw new IllegalArgumentException(
                        sm.getString("iib.invalidmethod"));
            }
            // Spec says single SP but it also says be tolerant of HT
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                space = true;
                request.method().setBytes(buf, start, pos - start);
            }

            pos++;

        }

        
        // Spec says single SP but also says be tolerant of multiple and/or HT
        while (space) {
            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                pos++;
            } else {
                space = false;
            }
        }

        // Mark the current buffer position
        start = pos;
        int end = 0;
        int questionPos = -1;

        //
        // Reading the URI
        //

        boolean eol = false;

        while (!space) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            // Spec says single SP but it also says be tolerant of HT
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                space = true;
                end = pos;
            } else if ((buf[pos] == Constants.CR) 
                       || (buf[pos] == Constants.LF)) {
                // HTTP/0.9 style request
                eol = true;
                space = true;
                end = pos;
            } else if ((buf[pos] == Constants.QUESTION) 
                       && (questionPos == -1)) {
                questionPos = pos;
            }

            pos++;

        }

        request.unparsedURI().setBytes(buf, start, end - start);
        if (questionPos >= 0) {
            request.queryString().setBytes(buf, questionPos + 1, 
                                           end - questionPos - 1);
            request.requestURI().setBytes(buf, start, questionPos - start);
        } else {
            request.requestURI().setBytes(buf, start, end - start);
        }

        // Spec says single SP but also says be tolerant of multiple and/or HT
        while (space) {
            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }
            if (buf[pos] == Constants.SP || buf[pos] == Constants.HT) {
                pos++;
            } else {
                space = false;
            }
        }

        // Mark the current buffer position
        start = pos;
        end = 0;

        //
        // Reading the protocol
        // Protocol is always US-ASCII
        //

        while (!eol) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            if (buf[pos] == Constants.CR) {
                end = pos;
            } else if (buf[pos] == Constants.LF) {
                if (end == 0)
                    end = pos;
                eol = true;
            }

            pos++;

        }

        if ((end - start) > 0) {
            request.protocol().setBytes(buf, start, end - start);
        } else {
            request.protocol().setString("");
        }
        
        return true;

    }

  这个方法没什么好说,接下来看

  /**
     * Parse an HTTP header.
     * 
     * @return false after reading a blank line (which indicates that the
     * HTTP header parsing is done
     */
    @SuppressWarnings("null") // headerValue cannot be null
    public boolean parseHeader()
        throws IOException {

        //
        // Check for blank line
        //

        byte chr = 0;
        while (true) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            chr = buf[pos];

            if ((chr == Constants.CR) || (chr == Constants.LF)) {
                if (chr == Constants.LF) {
                    pos++;
                    return false;
                }
            } else {
                break;
            }

            pos++;

        }

        // Mark the current buffer position
        int start = pos;

        //
        // Reading the header name
        // Header name is always US-ASCII
        //

        boolean colon = false;
        MessageBytes headerValue = null;

        while (!colon) {

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            if (buf[pos] == Constants.COLON) {
                colon = true;
                headerValue = headers.addValue(buf, start, pos - start);
            } else if (!HTTP_TOKEN_CHAR[buf[pos]]) {
                // If a non-token header is detected, skip the line and
                // ignore the header
                skipLine(start);
                return true;
            }

            chr = buf[pos];
            if ((chr >= Constants.A) && (chr <= Constants.Z)) {
                buf[pos] = (byte) (chr - Constants.LC_OFFSET);
            }

            pos++;

        }

        // Mark the current buffer position
        start = pos;
        int realPos = pos;

        //
        // Reading the header value (which can be spanned over multiple lines)
        //

        boolean eol = false;
        boolean validLine = true;

        while (validLine) {

            boolean space = true;

            // Skipping spaces
            while (space) {

                // Read new bytes if needed
                if (pos >= lastValid) {
                    if (!fill())
                        throw new EOFException(sm.getString("iib.eof.error"));
                }

                if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
                    pos++;
                } else {
                    space = false;
                }

            }

            int lastSignificantChar = realPos;

            // Reading bytes until the end of the line
            while (!eol) {

                // Read new bytes if needed
                if (pos >= lastValid) {
                    if (!fill())
                        throw new EOFException(sm.getString("iib.eof.error"));
                }

                if (buf[pos] == Constants.CR) {
                    // Skip
                } else if (buf[pos] == Constants.LF) {
                    eol = true;
                } else if (buf[pos] == Constants.SP) {
                    buf[realPos] = buf[pos];
                    realPos++;
                } else {
                    buf[realPos] = buf[pos];
                    realPos++;
                    lastSignificantChar = realPos;
                }

                pos++;

            }

            realPos = lastSignificantChar;

            // Checking the first character of the new line. If the character
            // is a LWS, then it's a multiline header

            // Read new bytes if needed
            if (pos >= lastValid) {
                if (!fill())
                    throw new EOFException(sm.getString("iib.eof.error"));
            }

            chr = buf[pos];
            if ((chr != Constants.SP) && (chr != Constants.HT)) {
                validLine = false;
            } else {
                eol = false;
                // Copying one extra space in the buffer (since there must
                // be at least one space inserted between the lines)
                buf[realPos] = chr;
                realPos++;
            }

        }

        // Set the header value
        headerValue.setBytes(buf, start, realPos - start);

        return true;

    }
 
/**
     * After reading the request headers, we have to setup the request filters.
     */
    protected void prepareRequest() {

        http11 = true;
        http09 = false;
        contentDelimitation = false;
        expectation = false;
        
        prepareRequestInternal();

        if (endpoint.isSSLEnabled()) {
            request.scheme().setString("https");
        }
        MessageBytes protocolMB = request.protocol();
        if (protocolMB.equals(Constants.HTTP_11)) {
            http11 = true;
            protocolMB.setString(Constants.HTTP_11);
        } else if (protocolMB.equals(Constants.HTTP_10)) {
            http11 = false;
            keepAlive = false;
            protocolMB.setString(Constants.HTTP_10);
        } else if (protocolMB.equals("")) {
            // HTTP/0.9
            http09 = true;
            http11 = false;
            keepAlive = false;
        } else {
            // Unsupported protocol
            http11 = false;
            error = true;
            // Send 505; Unsupported HTTP version
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("http11processor.request.prepare")+
                          " Unsupported HTTP version \""+protocolMB+"\"");
            }
            response.setStatus(505);
            adapter.log(request, response, 0);
        }

        MessageBytes methodMB = request.method();
        if (methodMB.equals(Constants.GET)) {
            methodMB.setString(Constants.GET);
        } else if (methodMB.equals(Constants.POST)) {
            methodMB.setString(Constants.POST);
        }

        MimeHeaders headers = request.getMimeHeaders();

        // Check connection header
        MessageBytes connectionValueMB = headers.getValue("connection");
        if (connectionValueMB != null) {
            ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
            if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
                keepAlive = false;
            } else if (findBytes(connectionValueBC,
                                 Constants.KEEPALIVE_BYTES) != -1) {
                keepAlive = true;
            }
        }

        MessageBytes expectMB = null;
        if (http11)
            expectMB = headers.getValue("expect");
        if ((expectMB != null)
            && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
            getInputBuffer().setSwallowInput(false);
            expectation = true;
        }

        // Check user-agent header
        if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {
            MessageBytes userAgentValueMB = headers.getValue("user-agent");
            // Check in the restricted list, and adjust the http11
            // and keepAlive flags accordingly
            if(userAgentValueMB != null) {
                String userAgentValue = userAgentValueMB.toString();
                if (restrictedUserAgents != null &&
                        restrictedUserAgents.matcher(userAgentValue).matches()) {
                    http11 = false;
                    keepAlive = false;
                }
            }
        }

        // Check for a full URI (including protocol://host:port/)
        ByteChunk uriBC = request.requestURI().getByteChunk();
        if (uriBC.startsWithIgnoreCase("http", 0)) {

            int pos = uriBC.indexOf("://", 0, 3, 4);
            int uriBCStart = uriBC.getStart();
            int slashPos = -1;
            if (pos != -1) {
                byte[] uriB = uriBC.getBytes();
                slashPos = uriBC.indexOf('/', pos + 3);
                if (slashPos == -1) {
                    slashPos = uriBC.getLength();
                    // Set URI as "/"
                    request.requestURI().setBytes
                        (uriB, uriBCStart + pos + 1, 1);
                } else {
                    request.requestURI().setBytes
                        (uriB, uriBCStart + slashPos,
                         uriBC.getLength() - slashPos);
                }
                MessageBytes hostMB = headers.setValue("host");
                hostMB.setBytes(uriB, uriBCStart + pos + 3,
                                slashPos - pos - 3);
            }

        }

        // Input filter setup
        InputFilter[] inputFilters = getInputBuffer().getFilters();

        // Parse transfer-encoding header
        MessageBytes transferEncodingValueMB = null;
        if (http11)
            transferEncodingValueMB = headers.getValue("transfer-encoding");
        if (transferEncodingValueMB != null) {
            String transferEncodingValue = transferEncodingValueMB.toString();
            // Parse the comma separated list. "identity" codings are ignored
            int startPos = 0;
            int commaPos = transferEncodingValue.indexOf(',');
            String encodingName = null;
            while (commaPos != -1) {
                encodingName = transferEncodingValue.substring
                    (startPos, commaPos).toLowerCase(Locale.ENGLISH).trim();
                if (!addInputFilter(inputFilters, encodingName)) {
                    // Unsupported transfer encoding
                    error = true;
                    // 501 - Unimplemented
                    response.setStatus(501);
                    adapter.log(request, response, 0);
                }
                startPos = commaPos + 1;
                commaPos = transferEncodingValue.indexOf(',', startPos);
            }
            encodingName = transferEncodingValue.substring(startPos)
                .toLowerCase(Locale.ENGLISH).trim();
            if (!addInputFilter(inputFilters, encodingName)) {
                // Unsupported transfer encoding
                error = true;
                // 501 - Unimplemented
                if (getLog().isDebugEnabled()) {
                    getLog().debug(sm.getString("http11processor.request.prepare")+
                              " Unsupported transfer encoding \""+encodingName+"\"");
                }
                response.setStatus(501);
                adapter.log(request, response, 0);
            }
        }

        // Parse content-length header
        long contentLength = request.getContentLengthLong();
        if (contentLength >= 0 && !contentDelimitation) {
            getInputBuffer().addActiveFilter
                (inputFilters[Constants.IDENTITY_FILTER]);
            contentDelimitation = true;
        }

        MessageBytes valueMB = headers.getValue("host");

        // Check host header
        if (http11 && (valueMB == null)) {
            error = true;
            // 400 - Bad request
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("http11processor.request.prepare")+
                          " host header missing");
            }
            response.setStatus(400);
            adapter.log(request, response, 0);
        }

        parseHost(valueMB);

        if (!contentDelimitation) {
            // If there's no content length 
            // (broken HTTP/1.0 or HTTP/1.1), assume
            // the client is not broken and didn't send a body
            getInputBuffer().addActiveFilter
                    (inputFilters[Constants.VOID_FILTER]);
            contentDelimitation = true;
        }

        // Advertise sendfile support through a request attribute
        if (endpoint.getUseSendfile()) {
            request.setAttribute("org.apache.tomcat.sendfile.support",
                    Boolean.TRUE);
        }
        
        // Advertise comet support through a request attribute
        if (endpoint.getUseComet()) {
            request.setAttribute("org.apache.tomcat.comet.support",
                    Boolean.TRUE);
        }
        // Advertise comet timeout support
        if (endpoint.getUseCometTimeout()) {
            request.setAttribute("org.apache.tomcat.comet.timeout.support",
                    Boolean.TRUE);
        }
    }
 

这几个方法没什么好说,感兴趣就看一下就行了。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics