高并发之请求合并
            
         
        
        
        
        
            本文于
                1837 
            天之前发表,文中内容可能已经过时。
        
        
     
    
    
        如果我们没有在高并发场景下,我们获取单个用户信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class UserServiceImpl implements UserService {     @Override     public User getById(Integer id) {         try {             //这段代码代码该方法的正常耗时             Thread.sleep(10l);             User user = new User();             user.setId(id);             return user;         }catch (Exception e){             return null;         }     } } 
但在面向大批量to c的用户场景下,用户的请求会出现毛刺的现象。比如某段时间逛的人特别多,获取用户信息,或者商品信息的请求某段时间突然变大,导致单台服务器支持不住,如果做流控的话,体验不是很好。
在这种情况下,两种办法:
添加服务器简单有效,但是成本上去了,请求合并能够解决单台服务器的吞吐量的问题,那么上面的代码需要变成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class MergeUserServiceImpl implements UserService {     static class Request {         Integer id;         CompletableFuture future;         public Request(Integer id, CompletableFuture future) {             this.id = id;             this.future = future;         }     }     private LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>(1000);     public MergeUserServiceImpl(){         init();     }     private void init() {         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 try {                     //1.从阻塞队列中取出queue的请求,生成一次批量查询。                     int size = linkedBlockingQueue.size();                     if (size == 0) {                         return;                     }                     List<Request> requests = new ArrayList<>(size);                     for (int i = 0; i < size; i++) {                         // 移出队列,并返回。                         Request poll = linkedBlockingQueue.poll();                         requests.add(poll);                     }                     //2.组装一个批量查询请求参数。                     List<Integer> ids = new ArrayList<>();                     for (Request request : requests) {                         ids.add(request.id);                     }                     //3. http 请求,或者 dubbo 请求。批量请求,得到结果list。                     System.out.println("本次合并请求数量:"+ids.size());                     //请求                     Map<Integer, User> responses = new HashMap<>();                     for(Integer id:ids){                         User user=new User();                         user.setId(id);                         responses.put(id,user);                     }                     Thread.sleep(100l);                     //4.将结果响应给每一个单独的用户请求。                     for (Request request : requests) {                         //根据请求中携带的能表示唯一参数,去批量查询的结果中找响应。                         User user= responses.get(request.id);                         //将结果返回到对应的请求线程。2个线程通信,异步编程赋值。                         //complete(),源码注释翻译:如果尚未完成,则将由方法和相关方法返回的值设置为给定值                         request.future.complete(user);                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             }             // 立即执行任务,并间隔10 毫秒重复执行。         }, 0, 10, TimeUnit.MILLISECONDS);     }     public User getById(Integer id) {         CompletableFuture<User> future = new CompletableFuture();         linkedBlockingQueue.offer(new Request(id, future));         try {             return future.get();         }catch (Exception e){             e.printStackTrace();         }         return null;     } } 
这么做的好处,可以合并io的操作,可以使用redis的pipline
spring cloud 下Hystrix请求合并 demo1 demo2 
在spring 环境下的请求合并,使用了Spring的注解,逻辑思路还是如上面所示。