引言
本篇文章简单介绍一下 SpringMVC 中的异步请求。由于 SpringMVC 其实就是 Servlet 的封装,那么异步也是如此,底层调用的还是 Servlet 中的异步,只不过 Spring 实现起来更简单一些。
起源 在 Servlet 3.0 之前,Servlet 采用 Thread-Per-Request 的方式处理请求,即每一次 Http 请求都由某一个线程从头到尾负责处理。如果一个请求需要进行 IO 操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待 IO 操作完成, 而 IO 操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。为了解决这样的问题,Servlet 3.0 引入了异步处理,然后在 Servlet 3.1 中又引入了非阻塞 IO 来进一步增强异步处理的性能。
SpringMVC 的异步
把同步方法变成异步方法很简单,只是将 Controller 的返回值用 DeferredResult 封装就行,比如: 示例:
1 2 3 4 5 6 7 8 9 10 @GetMapping("/async") public DeferredResult<String> say () { DeferredResult<String> result = new DeferredResult<>(); AsyncUtils.async(() -> { result.setResult("Hello World" ); }, result); return result; }
是不是很简单?那么 SpringMVC 是如何实现异步的相关操作呢?(AsyncUtils 中的代码由另外的线程负责处理)有了前几篇文章的铺垫,我想你肯定能够猜到,其实还是由特定的返回值处理器来完成的。
DeferredResultMethodReturnValueHandler
通过它的名字,我们可以看出它其实是一个返回值处理器,也就是 HandlerMethodReturnValueHandler 的子类,我们看一下它的核心代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public boolean supportsReturnType (MethodParameter returnType) { Class<?> type = returnType.getParameterType(); return (DeferredResult.class.isAssignableFrom(type) || ListenableFuture.class.isAssignableFrom(type) || CompletionStage.class.isAssignableFrom(type)); } @Override public void handleReturnValue (@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null ) { mavContainer.setRequestHandled(true ); return ; } DeferredResult<?> result; if (returnValue instanceof DeferredResult) { result = (DeferredResult<?>) returnValue; } else if (returnValue instanceof ListenableFuture) { result = adaptListenableFuture((ListenableFuture<?>) returnValue); } else if (returnValue instanceof CompletionStage) { result = adaptCompletionStage((CompletionStage<?>) returnValue); } else { throw new IllegalStateException("Unexpected return value type: " + returnValue); } WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); }
其实这个处理器很简单,我们在最后发现,其实它是把相关的处理交给了 WebAsyncManager 中的 startDeferredResultProcessing() 方法。至于 WebAsyncUtils.getAsyncManager() 方法,前几篇文章中也有所介绍,就是请求进来的时候,会为当前请求创建出对应的异步管理器,然后跟 request 绑定(其实就是放到了 request 的 attributes 表中)而后续的操作都是通过这个异步管理器来判断是否是异步请求以及异步的相关操作。所以我们重点看一下这个异步管理器。
WebAsyncManager 有一点是我们需要注意的,就是每个请求对象都会绑定自己的异步管理器 WebAsyncManager,那么就避免了请求之间的并发问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 private static final Object RESULT_NONE = new Object();private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName()); private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor(); private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor(); private AsyncWebRequest asyncWebRequest;private volatile Object[] concurrentResultContext;public void startDeferredResultProcessing ( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null" ); Assert.state(this .asyncWebRequest != null , "AsyncWebRequest must not be null" ); Long timeout = deferredResult.getTimeoutValue(); if (timeout != null ) { this .asyncWebRequest.setTimeout(timeout); } List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>(); interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this .deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); this .asyncWebRequest.addTimeoutHandler(() -> { try { interceptorChain.triggerAfterTimeout(this .asyncWebRequest, deferredResult); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }); this .asyncWebRequest.addErrorHandler(ex -> { if (!this .errorHandlingInProgress) { try { if (!interceptorChain.triggerAfterError(this .asyncWebRequest, deferredResult, ex)) { return ; } deferredResult.setErrorResult(ex); } catch (Throwable interceptorEx) { setConcurrentResultAndDispatch(interceptorEx); } } }); this .asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this .asyncWebRequest, deferredResult)); interceptorChain.applyBeforeConcurrentHandling(this .asyncWebRequest, deferredResult); startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this .asyncWebRequest, deferredResult); deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this .asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }
startAsyncProcessing(processingContext) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void startAsyncProcessing (Object[] processingContext) { synchronized (WebAsyncManager.this ) { this .concurrentResult = RESULT_NONE; this .concurrentResultContext = processingContext; this .errorHandlingInProgress = false ; } this .asyncWebRequest.startAsync(); if (logger.isDebugEnabled()) { logger.debug("Started async request" ); } }
StandardServletAsyncWebRequest 从 startAsyncProcessing 方法中我们可以知道实际的异步处理是调用的 asyncWebRequest.startAsync() 方法,而 StandardServletAsyncWebRequest 就是 asyncWebRequest 的子类。(那么这个对象是什么时候创建出来的呢?其实是在 RequestMappingHandlerAdapter 中的 invokeHandlerMethod 方法通过调用 WebAsyncUtils.createAsyncWebRequest(request, response) 方法创建的。) 我们继续看一下它的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public void startAsync () { Assert.state(getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved " + "in async request processing. This is done in Java code using the Servlet API " + "or by adding \"<async-supported>true</async-supported>\" to servlet and " + "filter declarations in web.xml." ); Assert.state(!isAsyncComplete(), "Async processing has already completed" ); if (isAsyncStarted()) { return ; } this .asyncContext = getRequest().startAsync(getRequest(), getResponse()); this .asyncContext.addListener(this ); if (this .timeout != null ) { this .asyncContext.setTimeout(this .timeout); } } @Override public void dispatch () { Assert.notNull(this .asyncContext, "Cannot dispatch without an AsyncContext" ); this .asyncContext.dispatch(); }
DeferredResult 如果想开启异步的支持,需要将 Controller 的返回值用 DeferredResult 进行封装,然后被返回值解析器解析。当调用 DeferredResult 的 setResult 方法的时候,响应会发送给客户端,完成整个请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public boolean setResult (T result) { return setResultInternal(result); } private boolean setResultInternal (Object result) { if (isSetOrExpired()) { return false ; } DeferredResultHandler resultHandlerToUse; synchronized (this ) { if (isSetOrExpired()) { return false ; } this .result = result; resultHandlerToUse = this .resultHandler; if (resultHandlerToUse == null ) { return true ; } this .resultHandler = null ; } resultHandlerToUse.handleResult(result); return true ; }
尾声 本文只是简单介绍了一下 SpringMVC 异步的相关知识,本来想在深层讲解一下 Servlet 异步的实现,但是发现内容实在太多了,放在本篇有点不合适,毕竟该系列是讲框架层的知识。所以打算另起一个专题,讲讲 Tomcat 的相关源码。