Java запуск пула потоков

Как это работает в мире java. Пул потоков

Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.

Представим, что у вас которая вызывает большую загрузку процессора:

public class Counter < public Double count(double a) < for (int i = 0; i < 1000000; i++) < a = a + Math.tan(a); >return a; > >

Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:

public class SingleThreadClient < public static void main(String[] args) < Counter counter = new Counter(); long start = System.nanoTime(); double value = 0; for (int i = 0; i < 400; i++) < value += counter.count(i); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); > >

На моей тачке с 4 физическими ядрами использование ресурсов процессора top -pid :

image

Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.

Давайте попробуем использовать больше, добавив больше рабочих потоков:

public class MultithreadClient < public static void main(String[] args) throws ExecutionException, InterruptedException < ExecutorService threadPool = Executors.newFixedThreadPool(8); Counter counter = new Counter(); long start = System.nanoTime(); List> futures = new ArrayList<>(); for (int i = 0; i < 400; i++) < final int j = i; futures.add( CompletableFuture.supplyAsync( () ->counter.count(j), threadPool )); > double value = 0; for (Future future : futures) < value += future.get(); >System.out.println(format("Executed by %d s, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); threadPool.shutdown(); > >

image

ThreadPoolExecutor

Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:

private final BlockingQueue workQueue;

в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:

public void execute(Runnable command) < . if (workerCountOf(c) < corePoolSize) < if (addWorker(command, true)) return; . if (isRunning(c) && workQueue.offer(command)) < . addWorker(null, false); . >>

Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.

final void runWorker(Worker w)

ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:

public class ThreadPool implements Executor < private final QueueworkQueue = new ConcurrentLinkedQueue<>(); private volatile boolean isRunning = true; public ThreadPool(int nThreads) < for (int i = 0; i < nThreads; i++) < new Thread(new TaskWorker()).start(); >> @Override public void execute(Runnable command) < if (isRunning) < workQueue.offer(command); >> public void shutdown() < isRunning = false; >private final class TaskWorker implements Runnable < @Override public void run() < while (isRunning) < Runnable nextTask = workQueue.poll(); if (nextTask != null) < nextTask.run(); >> > > >

Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:

// ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = new ThreadPool (8);

Время выполнения практически одинаковое — 15 секунд.

Читайте также:  Oracle java 6 virustotal

Размер пула потоков

Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.

ThreadPool threadPool = new ThreadPool(100);

Мы можем видеть, что время выполнения увеличилось до 28 секунд — почему это произошло?

Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор ​​занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.

Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.

image

На 8 потоках:

image

На 100 потоках:

Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.

Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный пул потоков = Runtime.getRuntime().availableProcessors() + 1.

Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть 2 * Runtime.getRuntime().availableProcessors() + 1.

Другие виды пулов

  1. Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач MemoryAwareThreadPoolExecutor
  2. Пул потоков, который регистрирует JMX-компонент для контроля и настройки размера пула в runtime.
    JMXEnabledThreadPoolExecutor

Источник

Thread Pools

Most of the executor implementations in java.util.concurrent use thread pools, which consist of worker threads. This kind of thread exists separately from the Runnable and Callable tasks it executes and is often used to execute multiple tasks.

Читайте также:  Javascript имя вызвавшей функции

Using worker threads minimizes the overhead due to thread creation. Thread objects use a significant amount of memory, and in a large-scale application, allocating and deallocating many thread objects creates a significant memory management overhead.

One common type of thread pool is the fixed thread pool. This type of pool always has a specified number of threads running; if a thread is somehow terminated while it is still in use, it is automatically replaced with a new thread. Tasks are submitted to the pool via an internal queue, which holds extra tasks whenever there are more active tasks than threads.

An important advantage of the fixed thread pool is that applications using it degrade gracefully. To understand this, consider a web server application where each HTTP request is handled by a separate thread. If the application simply creates a new thread for every new HTTP request, and the system receives more requests than it can handle immediately, the application will suddenly stop responding to all requests when the overhead of all those threads exceed the capacity of the system. With a limit on the number of the threads that can be created, the application will not be servicing HTTP requests as quickly as they come in, but it will be servicing them as quickly as the system can sustain.

A simple way to create an executor that uses a fixed thread pool is to invoke the newFixedThreadPool factory method in java.util.concurrent.Executors This class also provides the following factory methods:

  • The newCachedThreadPool method creates an executor with an expandable thread pool. This executor is suitable for applications that launch many short-lived tasks.
  • The newSingleThreadExecutor method creates an executor that executes a single task at a time.
  • Several factory methods are ScheduledExecutorService versions of the above executors.
Читайте также:  Python library named requests

If none of the executors provided by the above factory methods meet your needs, constructing instances of java.util.concurrent.ThreadPoolExecutor or java.util.concurrent.ScheduledThreadPoolExecutor will give you additional options.

Источник

Оцените статью