所有分类
  • 所有分类
  • 未分类

SpringBoot-多线程处理

简介

本文介绍SpringBoot多线程处理的方案。

为什么需要多线程

项目里会遇到这样的场景:

  1. 获取一次主要数据
  2. 遍历第1步的主数据,对每一个数据,都以它为条件获取次要数据
  3. 将前2步查到的数据组合起来,返回给前端

那么问题来了,如果单次调获取数据比较慢,那么获取多个数据时会很费时间。最好的解决方法就是使用多线程处理,本文介绍如何处理。

实际场景

需求

假设需要提供一个接口,获得多个订单的信息,按创建时间由近到远排序。这个订单信息在其他公司那里,他们只提供了查单个订单的接口,没有批量的接口。

数据库表结构

  • 订单的字段:订单id、用户id、总金额、商品id、商品的数量、创建时间 等。
  • 用户的字段:用户id、用户名字、用户的电话、注册时间 等。
  • 商品的字段:商品id、商品名字 等。

业务流程

  1. 调第三方读出订单主要数据的前n行
  2. 用第1步的每个数据的用户id去调第三方获取用户的信息
  3. 将第2步获取到的用户信息整合到订单VO里。

说明

本处为了简单,用这种方法处理:

  1. 仅获取订单的id、用户id、用户名字。
  2. 使用模拟数据,不实际调用接口。用sleep()模拟对数据库的操作。
  3. 所有代码都在controller层处理。(实际业务中肯定要放到service中处理)。

思路

整体方案

  1. 多线程方案:使用线程池。
  2. 等待多线程执行完毕的方案:CountDownLatch。
    见:Java-CountDownLatch的用法(有实例) – 自学精灵
  3. 如何排序:
    1. 方案1:所有线程处理完之后统一排序。(此法简单,本文使用此方法)
    2. 方案2:使用队列,前边的线程将结果放入最终结果集后,唤醒下一个线程将结果放入结果集。(较为复杂)。
      见:Java多线程–使用阻塞队列实现顺序消费–方法/实例 – 自学精灵

细节

  1. 线程池的核心线程数、最大线程数如何设置。
    1. 见:Java线程池–核心参数/大小设置/使用示例 – 自学精灵
  2. 线程安全的List。
    1. 见:Java–ArrayList保证线程安全的方法 – 自学精灵

代码

公共代码

controller

package com.example.order.controller;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.order.task.OrderTask;
import com.example.order.task.OrderTask2;
import com.example.utils.SynchroniseUtil;
import com.example.utils.ThreadPoolExecutors;
import com.example.utils.ThreadPoolExecutors2;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RestController
@RequestMapping
public class OrderController {
    private List<OrderVO> orderVOS = new ArrayList<>();
    private List<User> users = new ArrayList<>();

    //初始化时就创建好数据。模拟数据库已经存在的数据
    @PostConstruct
    public void createData() {
        long dataCount = 500;

        // 创建订单数据。模拟已经插入到数据库的订单
        for (long i = 0; i < dataCount; i++) {
            OrderVO orderVO = new OrderVO();
            orderVO.setId(i + 1);
            orderVO.setUserId(i + 1);
            //防止电脑太快,导致都是同一个时间,所以加一个数
            orderVO.setCreateTime(LocalDateTime.now().plusSeconds(i));
            orderVOS.add(orderVO);
        }

        // 创建用户数据。模拟已经插入到数据库的用户
        for (long i = 0; i < dataCount; i++) {
            User user = new User();
            user.setId(i + 1);
            user.setUserName("用户名" + (i + 1));
            users.add(user);
        }
        orderVOS = orderVOS.stream()
            .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
            .collect(Collectors.toList());
    }

    @GetMapping("/getOrderDetails")
    public List<OrderVO> getOrderDetails() {
        long startTime = System.currentTimeMillis();

        List<OrderVO> orderVOList;

        //这里是不同的执行方式(单线程/线程池)

        long endTime = System.currentTimeMillis();

        System.out.println("执行时间:" + (endTime - startTime) + " ms");
        return orderVOList;
    }
}

entity

订单 

package com.example.order.entity;

import lombok.Data;

import java.time.LocalDateTime;

@Data
public class Order {
    private Long id;
    private Long userId;
    private LocalDateTime createTime;
}

订单视图(用于返回数据)

package com.example.order.entity;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class OrderVO extends Order{
    private String userName;
}

用户

package com.example.order.entity;

import lombok.Data;

@Data
public class User {
    private Long id;
    private String userName;
}

方案1:单线程

代码

package com.example.order.controller;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.order.task.OrderTask;
import com.example.order.task.OrderTask2;
import com.example.utils.SynchroniseUtil;
import com.example.utils.ThreadPoolExecutors;
import com.example.utils.ThreadPoolExecutors2;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RestController
@RequestMapping
public class OrderController {
    private List<OrderVO> orderVOS = new ArrayList<>();
    private List<User> users = new ArrayList<>();

    //初始化时就创建好数据。模拟数据库已经存在的数据
    @PostConstruct
    public void createData() {
        long dataCount = 500;

        // 创建订单数据。模拟已经插入到数据库的订单
        for (long i = 0; i < dataCount; i++) {
            OrderVO orderVO = new OrderVO();
            orderVO.setId(i + 1);
            orderVO.setUserId(i + 1);
            //防止电脑太快,导致都是同一个时间,所以加一个数
            orderVO.setCreateTime(LocalDateTime.now().plusSeconds(i));
            orderVOS.add(orderVO);
        }

        // 创建用户数据。模拟已经插入到数据库的用户
        for (long i = 0; i < dataCount; i++) {
            User user = new User();
            user.setId(i + 1);
            user.setUserName("用户名" + (i + 1));
            users.add(user);
        }
        orderVOS = orderVOS.stream()
                .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
                .collect(Collectors.toList());
    }

    @GetMapping("/getOrderDetails")
    public List<OrderVO> getOrderDetails() {
        long startTime = System.currentTimeMillis();

        List<OrderVO> orderVOList;
        orderVOList = singleThread(orderVOS);

        long endTime = System.currentTimeMillis();

        System.out.println("执行时间:" + (endTime - startTime) + " ms");
        return orderVOList;
    }

    private List<OrderVO> singleThread(List<OrderVO> orders) {
        List<OrderVO> result = new ArrayList<>(orders);
        for (OrderVO orderVO : result) {
            //模拟从数据库里查数据
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (User user : users) {
                if (orderVO.getUserId().equals(user.getId())) {
                    orderVO.setUserName(user.getUserName());
                    break;
                }
            }
        }
        return result;
    }
}

测试

请求:http://localhost:8080/getOrderDetails

后端打印

执行时间:7525 ms

前端结果

总结

缺点

  1. 才500个数据,就用了7秒多,实在太慢。

方案2:线程池(每个数据一个任务)

简介

上边已经看到了,单线程特别慢,本处使用线程池来优化:每个数据一个任务。

controller

package com.example.order.controller;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.order.task.OrderTask;
import com.example.order.task.OrderTask2;
import com.example.utils.SynchronizeUtil;
import com.example.utils.ThreadPoolExecutors;
import com.example.utils.ThreadPoolExecutors2;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RestController
@RequestMapping
public class OrderController {
    private List<OrderVO> orderVOS = new ArrayList<>();
    private List<User> users = new ArrayList<>();

    //初始化时就创建好数据。模拟数据库已经存在的数据
    @PostConstruct
    public void createData() {
        long dataCount = 500;

        // 创建订单数据。模拟已经插入到数据库的订单
        for (long i = 0; i < dataCount; i++) {
            OrderVO orderVO = new OrderVO();
            orderVO.setId(i + 1);
            orderVO.setUserId(i + 1);
            //防止电脑太快,导致都是同一个时间,所以加一个数
            orderVO.setCreateTime(LocalDateTime.now().plusSeconds(i));
            orderVOS.add(orderVO);
        }

        // 创建用户数据。模拟已经插入到数据库的用户
        for (long i = 0; i < dataCount; i++) {
            User user = new User();
            user.setId(i + 1);
            user.setUserName("用户名" + (i + 1));
            users.add(user);
        }
        orderVOS = orderVOS.stream()
                .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
                .collect(Collectors.toList());
    }

    @GetMapping("/getOrderDetails")
    public List<OrderVO> getOrderDetails() throws Exception{
        long startTime = System.currentTimeMillis();

        List<OrderVO> orderVOList = multiThread(orderVOS);

        long endTime = System.currentTimeMillis();

        System.out.println("执行时间:" + (endTime - startTime) + " ms");
        return orderVOList;
    }

    private List<OrderVO> multiThread(List<OrderVO> orders) throws Exception{
        ExecutorService executor = ThreadPoolExecutors.getSingletonExecutor();
        SynchronizeUtil<OrderVO> synchronizeUtil = new SynchronizeUtil<>(orders.size());
        System.out.println("任务个数:" + orders.size());

        for (OrderVO order : orders) {
            OrderTask orderTask = new OrderTask(order, users, synchronizeUtil);
            executor.execute(orderTask);
        }

        List<OrderVO> list = null;
        try {
            list = synchronizeUtil.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (list != null) {
            list = list.stream()
                    .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
                    .collect(Collectors.toList());
        }

        return list;
    }
}

自定义Task

package com.example.order.task;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.utils.SynchronizeUtil;

import java.util.List;

public class OrderTask implements Runnable {
    private OrderVO orderVO;
    private List<User> users;
    private SynchronizeUtil<OrderVO> synchronizeUtil;

    public OrderTask(OrderVO orderVO, 
                     List<User> users, 
                     SynchronizeUtil<OrderVO> synchronizeUtil) {
        this.orderVO = orderVO;
        this.users = users;
        this.synchronizeUtil = synchronizeUtil;
    }

    @Override
    public void run() {
        //模拟从数据库里查数据
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (User user : users) {
            if (orderVO.getUserId().equals(user.getId())) {
                orderVO.setUserName(user.getUserName());
                break;
            }
        }
        synchronizeUtil.addResult(orderVO);
    }
}

单例模式的线程池

package com.example.utils;

import java.util.concurrent.*;

public class ThreadPoolExecutors {
    private static final int processorNumber = 
            Runtime.getRuntime().availableProcessors();

    private static class ThreadPoolExecutorsHolder {
        private static final ExecutorService EXECUTOR = 
                Executors.newFixedThreadPool(processorNumber);
    }

    private ThreadPoolExecutors() {
    }

    public static ExecutorService getSingletonExecutor() {
        System.out.println("处理器数量:" + processorNumber);
        return ThreadPoolExecutorsHolder.EXECUTOR;
    }

}

封装CoutDownLatch

package com.example.utils;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
 
public class SynchronizeUtil<T>{
    private CountDownLatch countDownLatch;

    private final List<T> result = Collections.synchronizedList(new ArrayList<>());

    public SynchronizeUtil(int count) {
        this.countDownLatch = new CountDownLatch(count);
    }
 
    public List<T> get() throws InterruptedException{
        countDownLatch.await();
        return this.result;
    }
 
    public List<T> get(long timeout, TimeUnit timeUnit) throws Exception{
        if (countDownLatch.await(timeout, timeUnit)) {
            return this.result;
        } else {
            throw new RuntimeException("超时");
        }
    }
 
    public void addResult(T resultMember) {
        result.add(resultMember);
        countDownLatch.countDown();
    }

    public void addResult(List<T> resultMembers) {
        result.addAll(resultMembers);
        countDownLatch.countDown();
    }
}

测试

访问:http://localhost:8080/getOrderDetails

后端结果

处理器数量:25
任务个数:500
执行时间:301 ms

前端结果

总结

优点

  1. 比单线程快很多。

缺点

  1. 固定线程池大小的线程池,队列长度是整型数的最大值,若数据很多,每个数据一个任务,会把内存耗尽。

方案3:线程池(多个数据一个任务)

简介

上边每个数据一个任务是不合适的,本处进行优化:多个数据一个任务。

controller

package com.example.order.controller;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.order.task.OrderTask;
import com.example.utils.SynchronizeUtil;
import com.example.utils.ThreadPoolExecutors;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RestController
@RequestMapping
public class OrderController {
    private List<OrderVO> orderVOS = new ArrayList<>();
    private List<User> users = new ArrayList<>();

    //初始化时就创建好数据。模拟数据库已经存在的数据
    @PostConstruct
    public void createData() {
        long dataCount = 500;

        // 创建订单数据。模拟已经插入到数据库的订单
        for (long i = 0; i < dataCount; i++) {
            OrderVO orderVO = new OrderVO();
            orderVO.setId(i + 1);
            orderVO.setUserId(i + 1);
            //防止电脑太快,导致都是同一个时间,所以加一个数
            orderVO.setCreateTime(LocalDateTime.now().plusSeconds(i));
            orderVOS.add(orderVO);
        }

        // 创建用户数据。模拟已经插入到数据库的用户
        for (long i = 0; i < dataCount; i++) {
            User user = new User();
            user.setId(i + 1);
            user.setUserName("用户名" + (i + 1));
            users.add(user);
        }
        orderVOS = orderVOS.stream()
                .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
                .collect(Collectors.toList());
    }

    @GetMapping("/getOrderDetails")
    public List<OrderVO> getOrderDetails() throws Exception{
        long startTime = System.currentTimeMillis();

        List<OrderVO> orderVOList = multiThread(orderVOS);

        long endTime = System.currentTimeMillis();

        System.out.println("执行时间:" + (endTime - startTime) + " ms");
        return orderVOList;
    }

    private List<OrderVO> multiThread(List<OrderVO> orders) throws Exception{
        ThreadPoolExecutor executor = ThreadPoolExecutors.getSingletonExecutor();
        int unitLength = orders.size() / ThreadPoolExecutors.getQueueSize() + 1;
        int synchronizeCount = orders.size() / unitLength;
        synchronizeCount = orders.size() % unitLength == 0 
                                ? synchronizeCount : synchronizeCount + 1;
        SynchronizeUtil<OrderVO> synchronizeUtil = new SynchronizeUtil<>(synchroniseCount);
        System.out.println("任务个数:" + synchronizeCount);

        for (int i = 0; i < orders.size(); i += unitLength) {
            int toIndex = Math.min(i + unitLength, orders.size());
            List<OrderVO> orderVOSubList = orders.subList(i, toIndex);
            OrderTask orderTask = new OrderTask(orderVOSubList, users, synchroniseUtil);
            executor.execute(orderTask);
        }

        List<OrderVO> list = null;
        try {
            list = synchronizeUtil.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }

        if (list != null) {
            list = list.stream()
                    .sorted(Comparator.comparing(OrderVO::getCreateTime).reversed())
                    .collect(Collectors.toList());
        }

        return list;
    }

}

自定义Task

package com.example.order.task;

import com.example.order.entity.OrderVO;
import com.example.order.entity.User;
import com.example.utils.SynchroniseUtil;

import java.util.List;

public class OrderTask implements Runnable {
    private List<OrderVO> orderVOS;
    private List<User> users;
    private SynchroniseUtil<OrderVO> synchroniseUtil;

    public OrderTask(List<OrderVO> orderVOS,
                      List<User> users,
                      SynchronizeUtil<OrderVO> synchronizeUtil) {
        this.orderVOS = orderVOS;
        this.users = users;
        this.synchronizeUtil = synchronizeUtil;
    }

    @Override
    public void run() {
        //模拟从数据库里查数据
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (OrderVO orderVO : orderVOS) {
            for (User user : users) {
                if (orderVO.getUserId().equals(user.getId())) {
                    orderVO.setUserName(user.getUserName());
                    break;
                }
            }
        }

        synchronizeUtil.addResult(orderVOS);
    }
}

单例的线程池(指定队列长度)

package com.example.utils;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutors {
    private static final int processorNumber =
            Runtime.getRuntime().availableProcessors();
    private static final int corePoolSize = processorNumber;
    private static final int maximumPoolSize = processorNumber * 2 + 1;
    private static final int queueSize = 100;

    private static class ThreadPoolExecutorsHolder {
        private static final ThreadPoolExecutor INSTANCE =
                new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                200,TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(queueSize));
    }

    private ThreadPoolExecutors() {
    }

    public static ThreadPoolExecutor getSingletonExecutor() {
        System.out.println("处理器数量:" + processorNumber);

        return ThreadPoolExecutorsHolder.INSTANCE;
    }

    public static int getQueueSize() {
        return queueSize;
    }
}

封装CountDownLatch

package com.example.utils;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
 
public class SynchronizeUtil<T>{
    private CountDownLatch countDownLatch;

    private final List<T> result = Collections.synchronizedList(new ArrayList<>());

    public SynchronizeUtil(int count) {
        this.countDownLatch = new CountDownLatch(count);
    }
 
    public List<T> get() throws InterruptedException{
        countDownLatch.await();
        return this.result;
    }
 
    public List<T> get(long timeout, TimeUnit timeUnit) throws Exception{
        if (countDownLatch.await(timeout, timeUnit)) {
            return this.result;
        } else {
            throw new RuntimeException("超时");
        }
    }
 
    public void addResult(T resultMember) {
        result.add(resultMember);
        countDownLatch.countDown();
    }

    public void addResult(List<T> resultMembers) {
        result.addAll(resultMembers);
        countDownLatch.countDown();
    }
}

测试

访问:http://localhost:8080/getOrderDetails

后端结果

处理器数量:12
任务个数:84
执行时间:117 ms

前端结果

总结

优点

可见,此时速度比每个数据一个任务更快。(原因待分析,猜测:任务越少,在某个调度、唤醒之类的地方耗时就少,于是速度更快)

大数据量测试

数据量单线程线程池(每个数据一个任务)线程池(多个数据一个任务)
1001498 ms72 ms77 ms
5007525 ms312 ms113 ms
100015024 ms605 ms125 ms
500075160 ms3008 ms163 ms

总结

可见,多个数据一个任务速度最快。

多个数据一个任务时,随着数据成倍的增加,耗时却没有成倍增加。原因分析:与线程池一个任务包含的数据量有关系。因为我是固定死了队列的长度,然后把总数据量平均分配到每一个队列上,如果数据量成倍增加,平均到一个任务里边,就增加的很少了。

当然,实际上任务并不是平均分到了队列里边,因为任务进来,先去占用核心线程,再去占用队列,再去占用最大线程数。按我本篇程序里的写法,实际队列并不会占满,而且最大线程数也没有用完。

2

评论10

请先

  1. 老师 想请教一下 把数据平均给到线程命令 这个看不懂 为什么是整取队列长度 int unitLength = orders.size() / ThreadPoolExecutors.getQueueSize() + 1;
    wangxg 2024-09-30 0
    • 这里是假设全部都阻塞的话,每个任务队列处理多少个线程数。保证所有数据都有队列去处理
      自学精灵 2024-10-08 0
  2. 站长,Java多线程系列–CountDownLatch的用法(有实例) – 自学精灵 链接地址错了吧,跳到 Java调试–监视类工具 去了
    罖亽 2024-05-04 0
    • 是的,谢谢指出。已修复
      自学精灵 2024-05-04 0
  3. 感谢作者,大概理解几个方案的区别了 但我个人有个疑问,方案2和方案3在效率方面的差距,除了任务数量的区别外是不是还和线程池的定义有关? 方案3的线程池做过调整的,线程池大小和任务数是相匹配的(毕竟任务数量就是按照线程池队列算出来的),但方案2的线程池好像没具体配置?我感觉如果方案2的线程池max配成500,core和队列合理配置,效率其实不会这么低
    goldencowcow 2024-02-29 0
    • 线程池加大,是能提高效率。不过文中也提到了,一个数据一个线程,很耗资源。
      自学精灵 2024-02-29 0
  4. 分段截取那块算最小值为啥要集合大小减一?
    18700967318 2023-11-23 0
    • 不需要减一,已经修复。
      自学精灵 2023-11-23 0
  5. hao
    派大唾沫星子 2023-11-07 0
显示验证码
没有账号?注册  忘记密码?

社交账号快速登录