DeferredResult异步处理spring mvc Demo

首页 / 新闻资讯 / 正文

spring mvc同步接口在请求处理过程中一直处于阻塞状态,而异步接口可以启用后台线程去处理耗时任务。简单来说适用场景:
1.高并发;
2.高IO耗时操作。

Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable或者DeferredResult。
1.Callable实例

@Controller public class CallableController {     @RequestMapping(path = "/async1", method = RequestMethod.GET)     @ResponseBody     public Callable<String> asyncRequest() {         return () -> {             final long currentThread = Thread.currentThread().getId();             final Date requestProcessingStarted = new Date();              Thread.sleep(6000L);              final Date requestProcessingFinished = new Date();              return String.format(                     "request: [threadId: %s, started: %s - finished: %s]"                     , currentThread, requestProcessingStarted, requestProcessingFinished);         };     } } 

2.DeferredResult使用方式与Callable类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去,能实现更加复杂的业务场景。

@Controller public class DeferredResultController {     private Map<Integer, DeferredResult<String>> deferredResultMap = new HashMap<>();      @ResponseBody     @GetMapping("/get")     public DeferredResult<String> getId(@RequestParam Integer id) throws Exception {         System.out.println("start hello");         DeferredResult<String> deferredResult = new DeferredResult<>();          //先存起来,等待触发         deferredResultMap.put(id, deferredResult);         System.out.println("end hello");         return deferredResult;     }      @ResponseBody     @GetMapping("/set")     public void setId(@RequestParam Integer id) throws Exception {         // 让所有hold住的请求给与响应         if (deferredResultMap.containsKey(id)) {             deferredResultMap.get(id).setResult("hello " + id);         }     } } 

当从浏览器请求http://localhost:8080/get/1时,页面处于等待状态;当访问http://localhost:8080/set/1,前面的页面会返回"hello 1"。

处理过程:

  • controller 返回一个DeferredResult,我们把它保存到内存里或者List里面(供后续访问)
  • Spring MVC调用request.startAsync(),开启异步处理
  • 与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态
  • 应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult set值。然后Spring MVC会把这个请求再次派发给servlet容器
  • DispatcherServlet再次被调用,然后处理后续的标准流程

3.模拟场景:接口接收请求,推送到队列receiveQueue,后台线程处理完成后推送到resultQueue,监听器监听resultQueue将结果赋值给DeferredResult,接口响应结果。
首先定义类Task:

public class Task<T> {     private DeferredResult<String> result;     private T message;     private Boolean isTimeout; 

定义MockQueue,用于管理队列及处理数据:

@Component public class MockQueue {     /**      * 接收队列      */     private BlockingQueue<Task<String>> receiveQueue = new LinkedBlockingDeque<>(5000);     /**      * 结果队列      */     private BlockingQueue<Task<String>> resultQueue = new LinkedBlockingDeque<>(5000);      public MockQueue() {         this.run();     }      /**      * 接收task      *      * @param task task实体      * @throws InterruptedException      */     public void put(Task<String> task) throws InterruptedException {         receiveQueue.put(task);     }      /**      * 获取结果      *      * @return      * @throws InterruptedException      */     public Task<String> get() throws InterruptedException {         return resultQueue.take();     }      private void run() {         new Thread(() -> {             while (true) {                 try {                     Task<String> task = receiveQueue.take();                     System.out.println("receive data,start process!");                     Thread.sleep(1000);                     task.setMessage("success");                      //任务超时,跳过                     if (task.getIsTimeout()) {                         continue;                     }                      resultQueue.put(task);                     System.out.println("process done!");                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }).start();     } } 

然后实现Controller异步接口:

@Controller public class DeferredResultQueueController {     @Autowired     MockQueue queue;      @ResponseBody     @GetMapping("/test")     public DeferredResult<String> test(@RequestParam Integer id) throws InterruptedException {         System.out.println("start test");         DeferredResult<String> deferredResult = new DeferredResult<>();         Task<String> task = new Task<>(deferredResult, "任务", false);         deferredResult.onTimeout(() -> {             System.out.println("任务超时 id=" + id);             task.setMessage("任务超时");             task.setIsTimeout(true);         });         queue.put(task);         return deferredResult;     } } 

最后定义监听器,将resultQueue的结果写入DeferredResult。

@Component public class QueueResultListener implements ApplicationListener<ContextRefreshedEvent> {     @Autowired     MockQueue mockQueue;      @Override     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {         new Thread(() -> {             try {                 Task<String> task = mockQueue.get();                 task.getResult().setResult(task.getMessage());                 System.out.println("监听器获取到结果:task=" + task);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }).start();     } } 

https://www.baeldung.com/spring-deferred-result
https://cloud.tencent.com/developer/article/1497796
https://zhuanlan.zhihu.com/p/31223106