并发基础(二)

image

前言

跟着 The Java Tutorials 把并发的一些基础过了一遍,发现仍然还是有很多不清楚的地方,主要是因为平常没有机会实际应用吧,理论知识要有,实践也很重要,哪怕是写些小 demo 也可以的。

虽然主要是跟着 tutorials 的 concurrency 章节整理的,但这并不是官方文档的一个翻译哈,看到一个地方有前置技能不足的时候,就会穿插着一些对 API 的学习之类的。

文章太长,分为两篇,此篇为 并发基础(二),主要是一些高级的并发基础知识。

High Level Concurrency Objects

1. Lock Objects

在 liveness 那一节中有举了一个 Alphonse 和 Gaston 鞠躬产生死锁的例子,这里产生死锁的原因在于可能 线程1 进入 bow,线程2 进入 bowBack,当 线程1 进入 bow 里的 bowBack 时,线程2 恰好也正在进入 bowBack,二者都在等彼此退出,但是却又永远不会退出,因为彼此在循环等待,会一直阻塞在这里,从而产生死锁(循环等待、不可剥夺、独自占有、保持请求)。

public class Deadlock {
    static class Friend {
        private final String name;
        public Friend(String name) {
            this.name = name;
        }
        public String getName() {
            return this.name;
        }
        public synchronized void bow(Friend bower) {
            System.out.format("%s: %s"
                + "  has bowed to me!%n", 
                this.name, bower.getName());
            bower.bowBack(this);
        }
        public synchronized void bowBack(Friend bower) {
            System.out.format("%s: %s"
                + " has bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new Runnable() {
            public void run() { alphonse.bow(gaston); }
        }).start();
        new Thread(new Runnable() {
            public void run() { gaston.bow(alphonse); }
        }).start();
    }
}

在这一节作者用 lock objects 来解决这个问题

先简单了解一下 Lock 这个接口:

Lock Objects (The Java™ Tutorials >Essential Classes > Concurrency)

Lock objects work very much like the implicit locks used by synchronized code. As with implicit locks, only one thread can own a Lock object at a time. Lock objects also support a wait/notify mechanism, through their associated Condition objects.

Lock 很像同步代码使用的内置锁,在同一时刻只有一个线程可以获得 Lock 对象。通过关联 Condition 对象,Lock 对象也支持 wait/notify 机制。

关于 Condition:

Condition (Java Platform SE 8 )

Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

在 Lock 取代了 synchronized 方法和语句的地方,Condition 相应地取代了 Object 监视器方法(wait, notify and notifyAll)的使用。

看一下 tryLock() 这个方法:
Lock (Java Platform SE 8 ) >

  • boolean tryLock()
    Acquires the lock only if it is free at the time of invocation.

    Acquires the lock if it is available and returns immediately with the value true. If the lock is not available then this method will return immediately with the value false.

    A typical usage idiom for this method would be:

     
     Lock lock = …;
     if (lock.tryLock()) {
       try {
         // manipulate protected state
       } finally {
         lock.unlock();
       }
     } else {
       // perform alternative actions
     }
    This usage ensures that the lock is unlocked if it was acquired, and doesn’t try to unlock if the lock was not acquired.
    >
    Returns:
    true if the lock was acquired and false otherwise

它会尝试获取锁,如果成功,立刻返回 true,如果失败也会立刻返回 false,API 文档中举得那个例子就是一个典型的用法,这种使用 tryLock() 的方式可以确保在获取锁后一定会释放锁,因为 Lock 和 synchronized 有一个不一样的地方在于 synchronized 会自动释放,而 Lock 不会,所以一定要保证手动释放 Lock 锁,并且这种方式也可以保证如果没获取锁的情况不会 unlock()。

下面看怎么用 Lock 解决鞠躬的死锁问题:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;

public class Safelock {
    // 让 Friend 作为一个静态内部类,因为不需要引用外部变量
    static class Friend {
        private final String name;
        // 这里使用的是可重入锁
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        // 即将到来的鞠躬
        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                // 如果都获取到了锁 or 都没获取到,就不会释放锁
                // 如果只有一个获取到了,需要释放
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            // 只有两个锁都获取到时才会返回 true
            return myLock && yourLock;
        }
            
        public void bow(Friend bower) {
            // 如果都获取到了锁
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n", 
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                // 如果只获取到了一个锁,说明其他线程抢占了另一个锁
                // 在这个情景中就是 Alphonse 鞠躬之前发现在 Gaston 正要鞠躬
                // 因此自己就不鞠躬了,从而避免了死锁
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" +
                " bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    // 同样地,也是将 BowLoop 作为静态内部类
    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }
    
        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }
            

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}

2. Executors

文档教程只是一些比较简单的介绍,先过一遍了。

2.1 Executor Interfaces

concurrent 包中主要有三个 Executor 接口 - Executor - ExecutorService - ScheduledExecutorService

  1. Executor

运行新任务。
只有一个 execute() 方法,用来创建线程。但是这个方法没有定义具体的实现方式,所以对于不同的 Executor 的实现,有不同的创建方式。

Executor (Java Platform SE 8 )

  • execute

    void execute(Runnable command)
    Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
    Parameters:
    command - the runnable task
    Throws:
    RejectedExecutionException - if this task cannot be accepted for execution
    NullPointerException - if command is null

execute() 接受一个 Runnable 对象。

  1. ExecutorService

ExecutorService 除了提供 execute() 方法还提供了 submit() 方法,这个方法不仅可以接受 Runnable 对象还可以接受 Callable 对象。Callable 对象可以使任务返还执行的结果

Callable (Java Platform SE 8 )

  • call

    V call() throws Exception
    Computes a result, or throws an exception if unable to do so.
    Returns:
    computed result
    Throws:
    Exception - if unable to compute a result

call() 方法会返回计算的结果。

通过 submit() 方法返回的 Future 对象可以读取 Callable 任务的执行结果,或是管理 Callable 任务和 Runnable 任务的状态。

关于 Future 这个接口,它是表示异步计算的结果,提供的方法是用来 1. 检查计算是否完成 2. 是否等待计算完成 3. 是否查找计算的结果

简单使用的例子:

 interface ArchiveSearcher {
    String search (String target);
 }

 class App {

    ExecutorService executor = ...
    ArchiveSearcher searcher = ...

    void showSearch(final String target) throws InterruptedException {

        Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                return searcher.search(target);
            }
        });

        displayOtherThings(); // do other things while searching

        try {
            // 计算完成时才能用 get() 获取结果,如果没有计算完成会阻塞着
            displayText(future.get()); // use future
        } catch (ExecutionException ex) {
            cleanup();
            return;
        }

    }
    
 }

submit 里的 Callable 对象也可以替换成如下代码:

 FutureTask<String> future =
   new FutureTask<String>(new Callable<String>() {
     public String call() {
       return searcher.search(target);
   }});
 executor.execute(future);

因为 FutureTask 是实现了 Runnable 接口的 Future 的实现,所以可以由 Executor 来执行。

public class FutureTask<V> 
extends Object 
implements RunnableFuture<V>
public interface RunnableFuture<V>
extends Runnable, Future<V>

ExecutorService 也提供了批量运行 Callable 任务的方法。最后,ExecutorService 还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。

  1. ScheduledExecutorService

扩展了 ExecutorService 接口,添加了 schedule 方法。

public interface ScheduledExecutorService
extends ExecutorService

通过 schedule 方法可以让命令在给定延迟的时间之后执行或者定期执行。

2.2 Thread Pools

这里只是对线程池的一个简单的介绍。

大多数 concurrent 包里的 executor 的实现都使用了线程池(由 worker 线程组成),worker 线程独立于它所执行的 Runnable 任务和 Callable 任务,并且经常用来执行多个任务。

使用 worker 线程可以使创建线程的开销最小化。在大规模并发应用中,创建大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。

一种最常见的线程池是 fixed thread pool(固定大小的线程池)。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。 使用 fixed thread pool 的一个很重要的好处是可以 degrade gracefully。

比如一个 Web 服务器,每一个 HTTP 请求都是由一个单独的线程来处理,不可能为每一个 HTTP 请求都创建一个新线程,这样的话当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制 Web 服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。

创建一个使用 fixed thread pool 的 executor 的最简单的方法是调用 java.util.concurrent.Executors 的 newFixedThreadPool 工厂方法。

Executors 还提供了下面的工厂方法: - newCachedThreadPool - newSingleThreadExecutor - 还有一些创建 ScheduledExecutorService executor 的方法。

还有其他的,如 ThreadPoolExecutor or ScheduledThreadPoolExecutor

2.3 Fork/Join

Basic use:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

思想有点像分而治之的感觉,主要是可以充分利用多处理器系统的并行处理能力。

文档中举了一个图片模糊处理的例子:

假设你想要模糊一张图片。原始的 source 图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与 source 图片相同,模糊之后的 destination 图片也由一个整数数组表示。 对图片的模糊操作是通过对 source 数组中的每一个像素点进行处理完成的。

处理的过程:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝)放在一起取平均值,得到的结果被放入 destination 数组。

因为一张图片会由一个很大的数组来表示,所以处理图片过程可能会很耗时,但是如果使用 fork/join 框架来完成,就可以充分利用多处理器系统的并行处理能力,加快处理速度。

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

现在实现抽象方法 compute(),在处理时,可以直接计算,也可以将其分开计算,取决于一个阈值,这个阈值可以简单地用数组的长度来代表。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

因为 fork/join 的核心就是 ForkJoinPool 类,ForkJoinPool 可以执行 ForkJoinTask 任务,而 RecursiveAction 继承了 ForkJoinTask。所以上面这个方法如果是在 RecursiveAction 类中,就可以在 ForkJoinPool 中设置任务并令其执行。

// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

// Create the ForkJoinPool that will run the task.
ForkJoinPool pool = new ForkJoinPool();

// Run the task.
pool.invoke(fb);

参考资料

The Java™ Tutorials

 
comments powered by Disqus