Java Bài 50 – Thread Pool Tập 3 – ThreadPoolExecutor

Posted by
Rating: 4.5/5. From 4 votes.
Please wait...

Chào mừng các bạn đã đến với bài học Java số 50, bài học về Thread Pool phần thứ ba. Đây là bài học trong chuỗi bài về lập trình ngôn ngữ Java của Yellow Code Books.

Như vậy sau khi làm quen với Phần 2 về Thread Pool, bạn đã biết đến một cách khá dễ dàng để chúng ta có thể nhanh chóng xây dựng một Thread Pool mà không cần biết quá nhiều về các thông số rườm rà khác. Tuy nhiên cái gì cũng vậy, tiện dụng thì kèm theo đó công năng có thể không đủ mạnh. Chính vì vậy mà chúng ta phải cần tìm hiểu kỹ hơn về cách sử sụng Thread Pool trong Java, thông qua một cách sử dụng có phần “thủ công” hơn. Rồi cuối cùng sẽ cùng so sánh xem là với mỗi cách, điểm mạnh điểm yếu của chúng là gì nhé.

Giới Thiệu Về ThreadPoolExecutor

Như đã được giới thiệu ở bài đầu tiên, ThreadPoolExecutor chính là nhân vật chính của Thread Pool. Việc mà bài hôm trước Helper Class Executors gọi ra các câu lệnh, chẳng hạn như Executors.newFixedThreadPool(5), thực chất cũng là vì muốn giúp chúng ta đơn giản hơn trong việc khai báo một ThreadPoolExecutor mà thôi. Bạn có thể dừng ở việc hiểu về Executors ở bài trước, điều đó đủ để bạn có thể xây dựng các Thread Pool tiện dụng được sử dụng ở hầu hết các nhu cầu của bạn. Nhưng với việc tìm hiểu thêm về ThreadPoolExecutor, bạn sẽ biết được bản chất thực sự của một Thread Pool được khai báo trong Java như thế nào.

Chúng ta hãy bắt đầu với việc nhìn lại sơ đồ mô phỏng một ThreadPoolExecutor sau.

Đây chính là sơ đồ mô phỏng ThreadPoolExecutor
Đây chính là sơ đồ mô phỏng ThreadPoolExecutor

Chúng ta cũng cần thống nhất lại một số cách gọi trong bài viết hôm nay để đỡ đau đầu. Khi nói đến các Runnable, thì đó chính là các tác vụ bên trái ở sơ đồ trên, chúng sẽ được đưa vào Pool. Còn khi nói đến Queue hay hàng đợi, đó chính là các Runnable đã được vào Pool nhưng vẫn phải xếp hàng ở Task Queue ở giữa sơ đồ trên và chờ được thực thi. Và khi nói đến Thread chính là các Runnable khi này đã được thực thi (hay start) thành Thread ở dãy bên phải trong Pool.

Một vài quy ước dùng từ trong bài viết hôm nay

Sơ đồ là vậy, còn về code, bài hôm nay chúng ta sẽ làm quen với phương thức khởi tạo một ThreadPoolExecutor với các tham số đầy đủ như sau.

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                                int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler);

Ối sao nhiều tham số vậy. Bạn đừng lo, dù khá là nhiều tham số, nhưng vai trò của chúng khá rạch ròi, và việc hiểu các thông số này không quá khó, nếu kết hợp bài bản với nhau bạn sẽ có trong tay một công cụ tuyệt vời để xây dựng một Thread Pool hoàn toàn theo ý muốn.

À, phương thức khởi tạo mẫu mình đưa ra trên đây là phương thức với đầy đủ tham số nhất. Sẽ có những phương thức khởi tạo ít tham số hơn được dùng trong các bài thực hành của bài học, bạn sẽ được làm quen hết với các loại phương thức khởi tạo này.

Một ý nữa, nếu nhìn vào khai báo lớp này, thì ThreadPoolExecutor thực ra cũng là một lớp triển khai của ExecutorService. Cho nên bài viết của ngày hôm nay sẽ tập trung hoàn toàn vào việc khai báo ThreadPoolExecutor, còn việc sử dụng lớp này như thế nào thì y chang như hướng dẫn ở phần này của bài hôm trước bạn nhé.

Và một ý cuối cùng trước khi đi qua các phần bên dưới. Rằng bài học tuy dài và có nhiều source code, nhưng các source code cũng chỉ là sự lặp đi lặp lại việc phối hợp các tham số khởi tạo cho ThreadPoolExecutor mà thôi, bạn sẽ không bị quá đau đầu về số lượng code của bài hôm nay đâu.

Làm Quen Các Tham Số Của ThreadPoolExecutor

Như bạn có thể thấy trên đây. Để điều khiển ThreadPoolExecutor, việc đầu tiên đó là chúng ta cần nắm vững các tham số của phương thức khởi tạo lớp này trước, rồi sau đó mới vận dụng chúng để tạo một Thread Pool mong muốn.

  • corePoolSize: đây chính là giá trị giúp khai báo số lượng Thread mà Thread Pool này cho phép chạy cùng lúc. Có thể gọi là core thread. Một Thread đã được gọi là core thread thì chắc chắn sẽ được thực thi. Do bạn đã quen với việc sử dụng Thread Pool ở bài trước rồi thì tham số này cũng không quá khó để hiểu.
  • maximumPoolSize: ThreadPoolExecutor cho phép bạn thiết lập thêm một thông số về số lượng Thread có thể chạy bên trong Pool này nữa. Giá trị này có thể lớn hơn hoặc bằng corePoolSize, cho phép Thread Pool có thể thực thi thêm Thread nữa, nhiều hơn số lượng core thread, khi mà Queue đã chứa đầy các Runnable đang đợi. Điều này giúp ThreadPoolExecutor có thể linh động giải quyết các Runnable bị “tồn đọng” quá nhiều trong Queue.
  • keepAliveTime: nếu như trong Pool hiện tại có nhiều Thread hơn con số corePoolSize (và dĩ nhiên là nhỏ hơn hay bằng con số maximumPoolSize) còn đang “rảnh” (không có Runnable nào để thực thi nữa cả), thì những Thread này sẽ bị đếm thời gian dựa trên thông số keepAliveTime. Nếu hết thời gian chờ theo thông số này mà Thread đó vẫn chưa được thực thi, nó sẽ bị hủy. Thông số này giúp hệ thống quản lý các Thread Pool đang rảnh được hiệu quả hơn, giúp giải phóng các resource nhàn rỗi. Và vì thông số này mình chưa thấy có tác động lớn đến việc tổ tức ThreadPoolExecutor của chúng ta nên bài viết này mình sẽ không nói chi tiết em nó, chỉ nêu ra ở mục này mà thôi.
  • unit: đơn vị thời gian của keepAliveTime. Chẳng hạn nếu muốn đơn vị là mili giây thì chúng ta sẽ dùng enum TimeUnit.MILLISECONS.
  • threadFactory: giúp bạn tự định nghĩa ra một cách thức tạo mới Thread của riêng bạn. Nếu bạn không cung cấp thông số này thì Thread Pool sẽ dùng một cách thức có sẵn mà ở phần dưới của bài viết hôm nay chúng ta sẽ nói đến.
  • workQueue: hàng đợi dùng để chứa các Runnable mà Thread Pool sẽ lấy ra và thực thi lần lượt. Hàng đợi này quyết định số Runnable sẽ được đợi và số Thread được thực thi đồng thời theo corePoolSize hay maximumPoolSize. Số lượng Runnable sau khi đưa vào Pool mà vượt quá số lượng cho phép của hàng đợi sẽ bị “đối xử” tùy theo việc thiết lập handler như sau.
  • handler: quyết định cách thức mà một Runnable bị từ chối đưa vào hàng đợi. Chúng ta sẽ tìm hiểu kỹ ý nghĩa của tham số này sau.

Nào chúng ta sẽ cùng hiểu rõ dần các tham số trên thông qua các bài thực hành sau.

Bài Thực Hành Số 1 – Xây Dựng Thread Pool Giống Như newSingleThreadExecutors

Bài này chúng ta cùng xây dựng lại Thread Pool mà ở Bài thực hành số 1 của bài học trước chúng ta đã xây dựng thông qua Executors. Khi đó Thread Pool của chúng ta mỗi lần chỉ có duy nhất một Thread được chạy.

Trước hết, mình xin hiển thị lại lớp MyRunnable, mình lấy lại lớp này từ bài hôm trước, nhưng có thay đổi một chút xíu như sau để các bài thực hành được rõ nghĩa hơn.

  • Mình truyền thêm ThreadPoolExecutor vào phương thức khởi tạo luôn, để mỗi lần MyRunnable này được thực thi, chúng ta sẽ in ra số lượng Thread đang chạy và đang chờ trong Pool của chúng ta.
  • Mình override lại phương thức toString() để lát nữa đây chúng ta có thể in ra tên của MyRunnable này từ main().
public class MyRunnable implements Runnable {

    // Tên của Runnable, giúp chúng ta phân biệt Runnable nào đang thực thi trong Thread Pool
    private String name;
    // ThreadPoolExecutor để in ra số lượng Thread đang chạy và đang chờ trong Pool
    private ThreadPoolExecutor executor;

    public MyRunnable(String name, ThreadPoolExecutor executor) {
        this.name = name;
        this.executor = executor;
    }

    @Override
    public void run() {
        // In tên Thread, kèm số lượng Thread đang chạy và đang chờ trong Pool
        System.out.println(name + " đang thực thi... (số thread chạy: " + executor.getPoolSize() + ", số thread chờ: " + executor.getQueue().size() + ")");

        // Giả lập thời gian chạy của Runnable mất 2 giây
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // In trạng thái kết thúc
        System.out.println(name + " kết thúc.");
    }

    @Override
    public String toString() {
        return name;
    }
}

Và đây là cách dùng ThreadPoolExecutor ở phương thức main().

public static void main(String args[]) {
    int corePoolSize = 1;
    int maximumPoolSize = 1;
    long keepAliveTime = 0L;
    TimeUnit unit = TimeUnit.MILLISECONDS;
    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Bạn có thể so sánh nhanh với code của bài hôm trước. Hôm nay chúng ta “thủ công hóa” với một đống tham số dùng cho phương thức khởi tạo của ThreadPoolExecutor (nếu bạn biết cách vào xem phương thức newSingleThreadExecutors() được xây dựng sẵn của bài hôm trước trên Eclipse hay InteliJ thì thấy với khai báo tham số hôm nay là tương tự nhau). Chắc các bạn cũng đoán được nhưng mình cũng xin giải nghĩa một chút cách sử dụng các tham số.

  • corePoolSize chúng ta khai báo là 1, điều này khiến cho Thread Pool chỉ chứa tối đa 1 Thread như bài hôm trước.
  • maximumPoolSize cũng là 1, chúng ta không muốn Thread Pool đưa thêm bất kỳ Thread nào vào nữa, chỉ một mà thôi.
  • keepAliveTime khi này được chỉ định là 0 (mili giây), vì với ví dụ này tham số này không có tác dụng gì cả vì mỗi corePoolSizemaximumPoolSize là như nhau, không có Thread nào start khi vượt quá corePoolSize cả.
  • workQueue sẽ chứa trong một LinkedBlockingQueue. Mình sẽ nói cụ thể về các loại Queue sau. Bạn chỉ cần biết với việc dùng LinkedBlockingQueue thì Queue này giúp chứa không giới hạn số lượng các Runnable đợi được thực thi khi mà corePoolSize đã đầy.

Có thể bạn chưa hiểu lắm các thông số. Nhưng cứ thực thi chương trình đi nhé, chúng ta sẽ làm rõ dần các thông số này ở các bài thực hành tiếp theo bên dưới. Và đây là kết quả in ra console của code trên đây.

Kết quả luôn luôn có 1 Thread trong Pool được chạy
Kết quả luôn luôn có 1 Thread trong Pool được chạy

Bạn có thể thấy số Thread đang chạy luôn luôn là 1. Cứ mỗi một Runnable được thực thi, thì con số Thread đang chờ (đúng hơn sẽ là các Runnable đang chờ) sẽ giảm dần.

Bài Thực Hành Số 2 – Xây Dựng Thread Pool Giống Như newFixedThreadPool

Chúng ta lại thử xây dựng Thread Pool như Bài thực hành số 2 của bài học trước. Để cùng nhau thấy rằng việc dùng Thread Pool với cách của bài trước hay bài hôm nay đều cho các kết quả tương đương, chỉ khác một chút cách khai báo tham số mà thôi.

Bài thực hành này mong muốn rằng mỗi một lần sẽ có 5 Thread được start. Cứ Thread nào kết thúc mà trong Queue vẫn còn Runnable đang đợi thì sẽ được Thread Pool start tiếp cho đủ số lượng corePoolSize mong muốn. Code sau cũng không khác Bài thực hành 1 trên kia lắm, chỉ thay các thông số corePoolSizemaximumPoolSize từ 1 lên 5 mà thôi.

public static void main(String args[]) {
    int corePoolSize = 5;
    int maximumPoolSize = 5;
    long keepAliveTime = 0L;
    TimeUnit unit = TimeUnit.MILLISECONDS;
    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Kết quả thực thi Pool này như bài học trước bạn cũng thấy. Đầu tiên 5 Thread được thực thi. Sau đó hễ Thread nào xong thì Thread khác được start. Đảm bảo luôn luôn không quá 5 Thread được thực thi trong Pool.

Kết quả luôn luôn có tối đa 5 Thread trong Pool được chạy
Kết quả luôn luôn có tối đa 5 Thread trong Pool được chạy

Sở dĩ hàng chờ ban đầu không có Thread nào là vì Thread nào đưa vào Queue cũng được thực thi ngay, không có Thread nào kịp vào chờ cả. Sau đó khi corePoolSize đã đầy, thì Queue mới chứa các Runnable.

À và vì việc các Thread được start và câu lệnh in ra console không “đồng bộ” với nhau khiến thứ tự in ra của chúng hơi lộn xộn, bạn nên nhìn theo số thứ tự của Runnable, như Runnable 1, Runnable 2,… sẽ thấy số thread đang chạy sẽ tăng theo đúng với số Thread trong Pool thật bạn nhé.

Nói thêm

Bài Thực Hành Số 3 – Xây Dựng Thread Pool Giống Như newCachedThreadPool

Tiếp tục với việc xây dựng Thread Pool tương đồng với Bài thực hành số 3 của bài học hôm trước, để chúng ta hiểu rõ hơn về cách kiểm soát ThreadPoolExecutor của bài hôm nay.

Thực sự thì với việc sử dụng Executors.newCachedThreadPool() từ bài hôm trước, có thể bạn cũng không mường tượng được hết công dụng của loại Thread Pool này. Hôm nay với việc khai báo thủ công Thread Pool này một lần nữa, sẽ hé lộ thêm nhiều thông tin đấy nhé. Trước hết hãy đến việc code cái nào.

public static void main(String args[]) {
    int corePoolSize = 0;
    int maximumPoolSize = Integer.MAX_VALUE;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    SynchronousQueue<Runnable> workQueue = new SynchronousQueue<>();

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Wow, cách sử dụng tham số khi này có một chút đặc biệt. Chúng ta hãy cùng xem qua.

  • corePoolSize được chỉ định là 0, nhưng maximumPoolSize lại là giá trị lớn nhất mà kiểu Int có thể hỗ trợ. Điều này ban đầu giúp cho Thread Pool không quan tâm giá trị corePoolSize nữa mà sẽ thực thi với số lượng Thread lớn nhất mà hệ thống có thể đáp ứng được, vì giá trị maximumPoolSize khi này rất lớn.
  • keepAliveTime được chỉ định 60 giây. Và bởi vì corePoolSize được chỉ định là 0, nên khi này tất cả các Thread đều bị áp đặt bởi việc đếm ngược 60 giây này. Các Thread không được start trong thời gian này sẽ bị hủy.
  • workQueue khi này lại là một SynchronousQueue. Cũng là Queue như 2 bài thực hành trên thôi, nhưng việc dùng khác loại Queue này sẽ tác động đến cách Runnable được nằm trong hàng đợi như thế nào. Cụ thể thì SynchronousQueue khá đặc biệt, nó giúp start luôn Runnable được đưa vào và không nắm giữ chúng ở Queue gì cả. Các mục dưới của bài học sẽ làm rõ các Queue bạn yên tâm.

Kết quả thực thi cho thấy tất cả các Thread khi này đều được thực thi cùng lúc, và không có Thread nào phải chờ cả.

Kết quả tất cả các Thread trong Pool đều được chạy cùng lúc
Kết quả tất cả các Thread trong Pool đều được chạy cùng lúc

Với 3 bài thực hành trên đây đã giúp bạn phần nào hiểu rõ hơn về ThreadPoolExecutor đúng không nào. Việc của bạn chỉ là hiểu rõ và phối hợp các tham số khởi tạo để có được một Thread Pool đa dạng theo nhu cầu của bạn mà thôi.

Tuy nhiên các ví dụ trên cũng chỉ là các gợi ý ban đầu về việc kết hợp các tham số khởi tạo này. Chúng ta hãy cùng nhau đi sâu hơn một tí về từng tham số để cùng hiểu sâu hơn.

Tìm Hiểu corePoolSize Và maximumPoolSize

Như giới thiệu từ đầu bài học thì bạn đã hiểu corePoolSizemaximumPoolSize rồi. Hai tham số này giúp điều khiển số lượng Thread cùng chạy song song bên trong một Pool.

Nếu corePoolSizemaximumPoolSize có cùng giá trị. Chúng được gọi là Fixed-size Thread Pool, tức là các Thread Pool ở Bài thực hành số 1 và 2 trên đây. Điều này chúng ta không cần phải nói nhiều.

Nhưng nếu chúng khác nhau về giá trị, khi đó maximumPoolSize buộc phải lớn hơn giá trị corePoolSize nếu không muốn ứng dụng sẽ tung ra một IllegalArgumentException. Với việc khai báo kiểu này, thì chỉ khi Queue đã chứa đầy các Runnable cần chạy, khi đó số lượng Thread trong Pool mới được thực thi vượt giá trị corePoolSize và có thể đạt đến con số maximumPoolSize.

Chúng ta hãy đến với các bài thực hành sau để hiểu rõ hơn cách dùng hai tham số này.

Bài Thực Hành Số 4 – Thread Pool Đạt Tới Mức maximumPoolSize

Chúng ta cùng xem. Với corePoolSize2, và maximumPoolSize4, Queue của chúng ta chỉ khai báo cho phép chứa được 6 Runnable. Thì với việc nhét 10 Runnable vào Pool, với hình chụp console bên dưới bạn sẽ thấy nhanh chóng Queue sẽ chứa đầy 6 Runnable. Việc Queue “đầy ắp” thế này khiến Thread Pool sẽ lấy tới con số lớn nhất là 4 Thread cùng chạy cùng một lúc, nếu không sẽ “vỡ đê” (thực ra nếu Thread Pool không mở rộng số Thread cần chạy, nhiều Runnable sẽ bị hủy nếu bạn xem tiếp các ví dụ bên dưới bài học).

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(6);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}
Kết quả có tới 4 Thread chạy cùng lúc
Kết quả có tới 4 Thread chạy cùng lúc

Bài Thực Hành Số 5 – Thread Pool Start Ở Mức corePoolSize

Cũng với code trên, lần này chúng ta nới thêm số lượng Queue lên 15, khiến cho số lượng Runnable đưa vào Pool không đủ làm đầy Queue này, chính vì thế mà chúng ta thấy mỗi lần chỉ có 2 Thread được thực thi thôi, đó cũng chính là giá trị của corePoolSize.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}
Kết quả chỉ có 2 Thread chạy cùng lúc
Kết quả chỉ có 2 Thread chạy cùng lúc

Tìm Hiểu threadFactory

Ở các ví dụ trên đây chúng ta không cần để ý đến threadFactory, khi này Thread Pool sẽ mặc định gọi đến Executors.defaultThreadFactory() cho chúng ta. Vậy threadFactory là gì?

Như chúng ta đã biết, khi dùng đến Thread Pool, chúng ta chỉ truyền vào đây các Runnable. Mà như cách thức tạo Thread từ Runnable mình có nói, để một Runnable start được, chúng ta phải đưa nó vào một khởi tạo của lớp Thread rồi gọi đến phương thức start() từ lớp Thread này. Trong Thread Pool cũng vậy, nó cũng cần có một Thread bao lấy Runnable cần chạy rồi start Runnable đó. Và Executors.defaultThreadFactory() giúp tận dụng pattern Factory để xây dựng sẵn một kịch bản giúp tạo ra Thread từ Runnable này, nó không đơn thuần chỉ new một Thread rồi truyền Runnable vào đâu, mà nó còn tạo sẵn các giá trị đồng nhất cho các Thread trong Pool, như đặt tên mặc định cho Thread, nhóm chung vào một ThreadGroup, khai báo cùng độ ưu tiên,…

Bài Thực Hành Số 6 – Tự Xây Dựng threadFactory Trong Thread Pool

Cũng với code quen thuộc như các bài thực hành trước, nhưng dưới đây mình thay việc sử dụng MyRunnable bằng một lớp vô danh của Runnable để nhanh chóng sửa nội dung in ra console của nó khi được chạy.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Tạo một Runnable vô danh rồi in tên của lớp ra khi được thực thi
        Runnable myRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " thực thi");
            }
        };
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Bạn thấy rằng code trên chưa cũng chưa khai báo gì tới threadFactory cả và vì vậy mà tên của Runnable khi start sẽ như sau.

Kết quả khi in ra tên mặc định của Thread
Kết quả khi in ra tên mặc định của Thread

Bạn có thấy rằng các Thread này được nhóm chung một ThreadGroup nên chúng có cùng tên pool-1 ở đầu không. Còn tại sao chỉ chó thread-1thread-2 được thực thi trong khi chúng ta có tới 10 Runnable, điều này là vì 2 Thread được tạo ra với các tên thread-1thread-2 được tận dụng trở lại để khởi chạy các Runnable tiếp theo trong Queue, hệ thống không tạo ra thêm một Thread nào mới ngoài 2 Thread này hết. Bạn đã thấy sự lợi hại của Thread Pool trong việc tận dụng tối đa resource cho việc thực thi các tác vụ song song chưa nào.

Dưới đây là việc thay đổi nhỏ, bằng cách tạo ra một lớp vô danh nữa kế thừa từ lớp ThreadFactory và rồi override phương thức newThread của nó. Bằng cách này chúng ta sẽ kiểm soát tên Thread được khởi tạo thông qua hai tham số của Thread như sau.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);

    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Thread của tôi");
        }
    };

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Tạo một Runnable vô danh rồi in tên của lớp ra khi được thực thi
        int index = i;
        Runnable myRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " - " + index + " thực thi");
            }
        };
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Với việc thay đổi trên thì console sẽ như sau.

Kết quả khi in ra tên Thread được xây dựng lại thông qua threadFactory
Kết quả khi in ra tên Thread được xây dựng lại thông qua threadFactory

Tìm Hiểu workQueue

Như các ví dụ trên bạn cũng đã hiểu workQueue là một tham số giúp chúng ta định nghĩa thể loại Queue mà Thread Pool dùng để chứa đựng các Runnable đã vào Pool nhưng còn đang đợi, chưa được thực thi. Có Queue cho phép chúng ta chỉ định cứng độ lớn cho hàng đợi, có Queue không cần. Vậy thì tóm lại là ThreadPoolExecutor có thể có bao nhiêu hàng đợi. Mình xin liệt kê chúng từng phần như sau.

SynchronousQueue

Queue này được giới thiệu là một dạng Queue bàn giao trực tiếp. Tức là chúng không lưu trữ bất kỳ Runnable nào trên Queue cả, ngay khi chúng nhận được các Runnable, chúng sẽ tìm cách thực thi ngay các Runnable này. Queue này không thể chỉ định độ lớn, nó chứa đựng tối đa các Runnable đưa vào Pool. Bạn đã làm quen với Queue này ở Bài thực hành số 3 trên đây.

Tuy nhiên nếu như có nhiều Runnable vào Queue hơn số lượng có thể thực thi (thông qua các tham số corePoolSizemaximumPoolSize mà bạn đã biết), thì các Runnable chưa được start ngay sẽ bị hủy, và một RejectedExecutionException sẽ được tung ra. Bài thực hành dưới đây mình sẽ cho bạn xem ý này, một lát nữa ở mục bên dưới chúng ta sẽ xử lý Exception này sao cho đẹp mắt hơn.

Bài Thực Hành Số 7 – Sử Dụng SynchronousQueue

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    SynchronousQueue<Runnable> workQueue = new SynchronousQueue<>();

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Bạn thấy rằng Pool của chúng ta được nới rộng ra chạy với tối đa 4 Thread thì… boom, Exception bị tung ra và các Runnable còn chưa kịp start đều đã bị hủy hết.

Kết quả khi thực thi chỉ được 4 Thread thì Exception tung ra
Kết quả khi thực thi chỉ được 4 Thread thì Exception tung ra

LinkedBlockingQueue

Được mệnh danh là Queue không giới hạn. Thường thì người ta dùng đến hàng đợi này mà không quá quan tâm đến sức chứa tối đa của nó (mặc dù bạn hoàn toàn có thể chỉ định sức chứa này thông qua phương thức khởi tạo của nó). Việc xem như có sức chứa không giới hạn này giúp cho các Thread bên trong Thread Pool thường chỉ dùng đến số lượng corePoolSize để thực thi các tác vụ, và như vậy maximumPoolSize cũng sẽ bị lu mờ trong việc dùng hàng đợi kiểu này. Bạn xem code ở bài thực hành sau (Bài thực hành số 1 & 2 trên đây đã nói về LinkedBlockingQueue rồi, tuy nhiên lúc đó bạn chỉ định corePoolSizemaximumPoolSize như nhau, bài thực hành dưới đây cho thấy nếu chúng ra dùng khác nhau 2 thông số này).

Bài Thực Hành Số 8 – Sử Dụng LinkedBlockingQueue

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Bạn xem Pool của chúng ta chỉ chạy mỗi 2 Thread một lần mà thôi, và không có Thread nào bị hủy do Queue chứa được rất nhiều.

Kết quả chỉ có 2 Thread chạy cùng lúc
Kết quả chỉ có 2 Thread chạy cùng lúc

ArrayBlockingQueue

Nếu bạn sợ các hàng đợi không giới hạn trên đây có thể gây ảnh hưởng xấu đến hiệu năng của hệ thống. Thì đây ArrayBlockingQueue luôn đòi hỏi bạn phải khai báo độ lớn của hàng đợi. Tuy nhiên việc dùng Queue kiểu này thường khá đau đầu, vì chúng ta không biết chỉ định độ lớn của hàng đợi bao nhiêu là đủ. Không phải việc tiết kiệm hàng đợi thông qua chỉ định độ lớn nhỏ lại là một ý hay đâu nhé, việc các Runnable không được vào hàng đợi do không đủ chỗ cho nó, có thể khiến các Runnable này bị hủy, và lại dẫn tới việc tăng corePoolSize hay maximumPoolSize lên, thì lại là vấn đề đau đầu khác. Như bài thực hành sau nếu bạn chỉ định hàng đợi chỉ có 2 Runnable chờ, thì không đủ để chạy 10 Runnable, và như vậy bạn sẽ thấy số còn lại chưa kịp vào hàng đợi sẽ bị hủy và một Exception cũng xuất hiện.

Bài Thực Hành Số 9 – Sử Dụng ArrayBlockingQueue

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}
Kết quả không đủ 10 Runnable được thực thi
Kết quả không đủ 10 Runnable được thực thi

Tìm Hiểu handler

Các ví dụ trên đây chưa có ví dụ nào nói rõ về việc sử dụng handler này cả. Mình dùng đến tên tham số handle để chỉ về mục cuối cùng cần tìm hiểu của bài viết hôm nay thôi, thực ra nó là RejectedExecutionHandler. Đọc đầy đủ tên lớp giúp chúng ta nắm rõ hơn về công năng của thành phần này đúng không nào. Đây là thành phần giúp chúng ta quản lý cách thức hành xử khi mà các Runnable bị từ chối thực hiện bởi Thread Pool, bởi vì không vào được Queue, như hai Bài thực hành số 7 & 9 trên đây.

Để hiểu về RejectedExecutionHandler hơn là để vận dụng các handler vào thực tế, chúng ta hãy đến với bài thực hành sau.

Bài Thực Hành Số 10 – Xây Dựng RejectedExecutionHandler Của Chúng Ta

Với bài thực hành này chúng ta sẽ xây dựng một thể hiện của RejectedExecutionHandler, với thể hiện này chúng ta chỉ cần in ra console Runnable nào đã được hủy mà thôi. Code của Bài thực hành số 9 trên đây được xây dựng lại như sau.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
    RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(r.toString() + " bị hủy.");
        }
    };

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Với việc chỉ định handler như trên, bạn thấy rằng khi này console sẽ in ra các Runnable bị hủy rõ ràng đúng không nào, thay vì tung ra một Exception và không rõ nghĩa gì cả ở Bài thực hành số 9.

Kết quả không đủ 10 Runnable được thực thi, nhưng biết được Runnable nào bị hủy
Kết quả không đủ 10 Runnable được thực thi, nhưng biết được Runnable nào bị hủy

Tuy với bài thực hành trên chúng ta hiểu được cách mà Thread Pool gọi đến handle một khi Runnable bị hủy. Nhưng cách thức trên đây dù sao cũng chỉ mang tính in ra console để tham khảo. Thực tế khi sử dụng ThreadPoolExecutor chúng ta có các handler được xây dựng sẵn như sau.

ThreadPoolExecutor.AbortPolicy

Nếu bạn sử dụng handler này. Uhm, nó y chang Bài thực hành số 9. Vì thực ra nếu chúng ta không khai báo gì cho handler, ThreadPoolExecutor sẽ sử dụng ThreadPoolExecutor.AbortPolicy làm mặc định cho chúng ta. Tuy nhiên bài thực hành tiếp theo đây chúng ta cũng sẽ khai báo nó một cách tường minh cho dễ hiểu.

Bài Thực Hành Số 11 – Sử dụng ThreadPoolExecutor.AbortPolicy

Bạn hãy để ý đến tham số cuối cùng của ThreadPoolExecutor như sau.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Kết quả như mình có nói, không khác Bài thực hành số 9.

Kết quả giống như bài thực hành số 9
Kết quả giống như bài thực hành số 9

ThreadPoolExecutor.CallerRunsPolicy

Khi sử dụng handler này thì bạn có thể yên tâm rằng không có một Runnable nào bị hủy cả. Ngay khi Runnable đó bị từ chối, thay vì tung ra một Exception, handle dựng sẵn này sẽ thực thi lại Runnable đó.

Bài Thực Hành Số 12 – Sử dụng ThreadPoolExecutor.CallerRunsPolicy

Nào chúng ta chỉ thay thế một chút tham số của code ở bài thực hành trên thôi.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.CallerRunsPolicy());

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Kết quả là đầy đủ 10 Runnable được thực thi.

Kết quả đủ 10 Runnable được thực thi
Kết quả đủ 10 Runnable được thực thi

ThreadPoolExecutor.DiscardPolicy

Handler này chỉ đơn giản là bỏ qua Runnable bị từ chối thôi, không làm gì cả.

Bài Thực Hành Số 13 – Sử dụng ThreadPoolExecutor.DiscardPolicy

Không nói nhiều, mời bạn cùng xem code.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.DiscardPolicy());

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}
Kết quả chỉ có 6 Runnable được thực thi, số Runnable bị hủy không hiện ra
Kết quả chỉ có 6 Runnable được thực thi, số Runnable bị hủy không hiện ra

ThreadPoolExecutor.DiscardOldestPolicy

Handler này đảm bảo khi Runnable chưa được thực thi mà bị hủy, thì những Runnable tồn tại lâu nhất sẽ bị hủy trước.

Bài Thực Hành Số 14 – Sử dụng ThreadPoolExecutor.DiscardOldestPolicy

Cũng chỉ với một thay đổi ở tham số.

public static void main(String args[]) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);

    // Khai báo một Thread Pool thông qua ThreadPoolExecutor()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());

    // Khai báo 10 Runnable, và "quăng" chúng vào Thread Pool vừa khai báo
    for (int i = 1; i <= 10; i++) {
        // Phương thức khởi tạo của MyRunnable có tham số name và executor truyền vào
        MyRunnable myRunnable = new MyRunnable("Runnable " + i, executor);
        executor.execute(myRunnable);
    }

    // Phương thức này đã nói ở phần ExecutorService của bài hôm trước
    executor.shutdown();
}

Bạn xem với 4 Runnable đầu không có gì phải bàn, nó là các Runnable được Thread Pool mang vào thực thi sớm nhất. Thế nhưng khi Runnable được lấy tiếp từ Queue để thực thi, thì chính là các Runnable số 9 & 10 được chỉ định, đây là các Runnable “trẻ nhất”, chúng được thêm vào Queue sau cùng, các Runnable “già” khác đã thăng thiên sớm rồi nhé.

Kết quả chỉ có 6 Runnable được thực thi, sự ưu tiên Runnable tiếp theo đó trong Queue đáng chú ý
Kết quả chỉ có 6 Runnable được thực thi, sự ưu tiên Runnable tiếp theo đó trong Queue đáng chú ý

Kết Luận

Chúng ta vừa mới đi qua một bài viết khá dài về ThreadPoolExecutor. Tuy vậy lý do cho nội dung “lê thê” này cũng bởi các dòng code được lặp đi lặp lại, thay đổi nhau các con số, để bạn có cái hiểu bao quát nhất cách sử dụng ThreadPoolExecutor của bài hôm nay. Mọi kiến thức về Thread Pool đã được thể hiện rõ ra từ các bài trước hết cả rồi.

Và với việc kết thúc bài học hôm nay, thì bạn có thể thấy dù với việc sử dụng cách thức nào để xây dựng một Thread Pool, tóm lại chúng vẫn chỉ là ThreadPoolExecutor mà thôi. Bạn có thể chọn cách khai báo ngắn gọn của bài hôm trước, hay khai báo rõ ràng tường minh như bài hôm nay đều được hết nhé.

Bài hôm nay cũng kết thúc chuỗi bài khá đồ sộ về Thread, đồng bộ hóa Thread, và Thread Pool, để rồi chúng ta cùng mở ra một phần kiến thức mới trong lập trình Java.

Cảm ơn bạn đã đọc các bài viết của Yellow Code Books. Bạn hãy ủng hộ blog bằng cách:
– Đánh giá 5 sao ở mỗi bài viết nếu thấy thích.
– Comment bên dưới mỗi bài viết nếu có thắc mắc.
– Để lại địa chỉ email của bạn ở thanh bên phải để nhận được thông báo sớm nhất khi có bài viết mới.
– Chia sẻ các bài viết của Yellow Code Books đến nhiều người khác.
– Ủng hộ blog theo hướng dẫn ở thanh bên phải để blog ngày càng phát triển hơn.

3 comments

  1. Hi anh,
    Sau loạt bài thread này, hy vọng anh sẽ làm về asynchronous programming cũng như so sánh nó với Thread. Cám ơn anh 😀

    No votes yet.
    Please wait...
  2. Anh viết bài rất có tâm, các ví dụ đơn giản, dễ hiểu.
    Cảm ơn anh rất nhiều. Hi vọng sau loạt bài này anh có thể làm thêm về asynchronous programming

    No votes yet.
    Please wait...
  3. anh ơi đén bài 50 là hết chương trình r ạ?

    No votes yet.
    Please wait...

Leave a Reply