[图片] 概念剖析 Master-Worker 是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master 进程和 Worker 进程。 Master 负责接收和分配任务,Worker 负责处理子任务。当各个 Worker 子进程处理完成后,会将结果返回给 Master,由 Master 作归纳总结。 其 ..

多线程设计模式 : Master-Worker 模式

概念剖析

Master-Worker 是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master 进程和 Worker 进程。
Master 负责接收和分配任务,Worker 负责处理子任务。当各个 Worker 子进程处理完成后,会将结果返回给 Master,由 Master 作归纳总结。
其好处就是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。
处理过程如下图所示:

image.png

Master 进程为主要进程,它维护一个 Worker 进程队列、子任务队列和子结果集。Worker 进程队列中的 Worker 进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。

根据上面的思想,我们来模拟一下这种经典设计模式的实现。

过程分析

  1. 既然 Worker 是具体的执行任务,那么 Worker 一定要实现 Runnable 接口
  2. Matser 作为接受和分配任务,得先有个容器来装载用户发出的请求,在不考虑阻塞的情况下我们选择 ConcurrentLinkedQueue 作为装载容器
  3. Worker 对象需要能从 Master 接收任务,它也得有 Master ConcurrentLinkedQueue 容器的引用
  4. Master 还得有个容器需要能够装载所有的 Worker,可以使用 HashMap<String,Thread>
  5. Worker 处理完后需要将数据返回给 Master,那么 Master 需要有个容器能够装载所有 worker 并发处理任务的结果集。此容器需要能够支持高并发,所以最好采用 ConcurrentHashMap<String,Object>
  6. 同理由于 Worker 处理完成后将数据填充进 Master 的 ConcurrentHashMap,那么它也得有一份 ConcurrentHashMap 的引用

代码实现

Task 任务对象

public class Task {
    private int id;
    private String name;
    private int price;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}

Master 对象:

public class Master {
    //任务集合
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();

    //所有的处理结果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();

    //所有的Worker集合
    private HashMap<String,Thread> workerMap = Maps.newHashMap();

    //构造方法,初始化Worker
    public Master(Worker worker,int workerCount){
        //每一个worker对象都需要有Master的引用,taskQueue用于任务的提取,resultMap用于任务的提交
        worker.setTaskQueue(this.taskQueue);
        worker.setResultMap(this.resultMap);
        for(int i = 0 ;i < workerCount; i++){
            //key表示worker的名字,value表示线程执行对象
            workerMap.put("worker"+i,new Thread(worker));
        }
    }

    //用于提交任务
    public void submit(Task task){
        this.taskQueue.add(task);
    }

    //执行方法,启动应用程序让所有的Worker工作
    public void execute(){
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
            me.getValue().start();
        }
    }

    //判断所有的线程是否都完成任务
    public boolean isComplete() {
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
           if(me.getValue().getState() != Thread.State.TERMINATED){
               return false;
           }
        }
        return true;
    }

    //总结归纳 
    public int getResult(){
        int ret = 0;
        for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
            ret+=(Integer) entry.getValue();
        }
        return ret;
    }
}

###Worker 对象:

public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            Task executeTask = this.taskQueue.poll();
            if(executeTask == null) break;
            //真正的任务处理
            Object result = handle(executeTask);
            this.resultMap.put(executeTask.getName(),result);
        }
    }

    //核心处理逻辑,可以抽离出来由具体子类实现
    private Object handle(Task executeTask) {
        Object result = null;
        try {
            //表示处理任务的耗时....
            Thread.sleep(500);
            result = executeTask.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

客户端调用

public class Main {

    public static void main(String[] args) {
        //实际开发中多少个线程最好写成Runtime.getRuntime().availableProcessors()
        Master master = new Master(new Worker(), 10);
        Random random = new Random();
        for(int i = 0 ;i <= 100 ;i++){
            Task task = new Task();
            task.setId(i);
            task.setName("任务"+i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while(true){
            if(master.isComplete()){
                long end  = System.currentTimeMillis() - start;
                int ret = master.getResult();
                System.out.println("计算结果:"+ret+",执行耗时:"+end);
                break;
            }
        }
    }
}

在 Worker 对象中的核心处理业务逻辑 handle()方法最好抽象成公共方法,具体实现由子类覆写。

  • 设计模式

    设计模式(Design pattern)代表了最佳的实践,通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的。

    156 引用 • 115 回帖
  • 代码
    295 引用 • 486 回帖 • 4 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    89 引用 • 382 回帖 • 1 关注
2 回帖
请输入回帖内容...
  • dyzhs

    勤奋,周末也搞

  • dyzhs

    来点有趣的程序,搬教程不腻吗