Concurrent calls in java

JDK concurrent package

Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.

  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators _jdk 1.8_
Atomic

В дочернем пакете java.util.concurrent.atomic находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false .

Для примера возьмем два массива long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5] . Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:

class Sum < static monitor = new Object() static volatile long sum = 0 >class Summer implements Callable < long[] data Object call() throws Exception < data.each < synchronized (Sum.monitor) < println("$: add $ to $") Sum.sum += it > > > > Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: $") 

Результат выполнения будет ожидаемым:

pool-1-thread-1: add 1 to 0 pool-1-thread-2: add -1 to 1 pool-1-thread-1: add 2 to 0 pool-1-thread-2: add -2 to 2 pool-1-thread-1: add 3 to 0 pool-1-thread-2: add -3 to 3 pool-1-thread-1: add 4 to 0 pool-1-thread-1: add 5 to 4 pool-1-thread-2: add -4 to 9 pool-1-thread-2: add -5 to 5 Sum: 0 
  • попытка блокирования монитора
  • блокировка потока
  • разблокировка монитора
  • разблокировка потока

Рассмотрим использование AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:

class Sum < static volatile AtomicLong sum = new AtomicLong(0) >class Summer implements Callable < long[] data Object call() throws Exception < data.each < while(true) < long localSum = Sum.sum.get() if (Sum.sum.compareAndSet(localSum, localSum + it)) < println("$: add $ to $") break; > else < println("[MISS!] $: add $ to $") > > > > > Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: $") 

Как видно из результатов «ошибочных» попыток было не так уж и много:

[MISS!] pool-1-thread-1: add 1 to -1 pool-1-thread-2: add -1 to -1 pool-1-thread-2: add -2 to -3 [MISS!] pool-1-thread-1: add 1 to -3 pool-1-thread-2: add -3 to -6 pool-1-thread-1: add 1 to -5 [MISS!] pool-1-thread-2: add -4 to -5 pool-1-thread-1: add 2 to -7 pool-1-thread-2: add -4 to -7 pool-1-thread-1: add 3 to -9 pool-1-thread-2: add -5 to -9 pool-1-thread-1: add 4 to -5 pool-1-thread-1: add 5 to 0 Sum: 0 

При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные compare-and-set , и тем чаще придется выполнять это действие повторно.

На основе compare-and-set может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write блокировки вступают в силу, только если проверка версии провалилась.

class Transaction < long debit >class Account < AtomicLong version = new AtomicLong() ReadWriteLock readWriteLock = new ReentrantReadWriteLock() Listtransactions = new ArrayList() > long balance(Account account) < ReentrantReadWriteLock.ReadLock locked while(true) < long balance = 0 long version = account.version.get() account.transactions.each //volatile write for JMM if (account.version.compareAndSet(version, version)) < if (locked) return balance > else < locked = account.readWriteLock.readLock() >> > void modifyTransaction(Account account, int position, long newDebit)
Locks
ReentrantLock

В отличие от syncronized блокировок, ReentrantLock позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock также снабжен этим механизмом.

Несмотря на то, что syncronized и ReentrantLock блокировки очень похожи — реализация на уровне JVM отличается довольно сильно.
Не вдаваясь в подробности JMM: использовать ReentrantLock вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock уступает механизму блокировок JVM.

Читайте также:  Epiclinic ru onlajn rasshifrovka mrt html
ReentrantReadWriteLock

Дополняет свойства ReentrantLock возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.

StampedLock _jdk 1.8_

Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):

double distanceFromOriginV1() < // A read-only method long stamp; if ((stamp = sl.tryOptimisticRead()) != 0L) < // optimistic double currentX = x; double currentY = y; if (sl.validate(stamp)) return Math.sqrt(currentX * currentX + currentY * currentY); >stamp = sl.readLock(); // fall back to read lock try < double currentX = x; double currentY = y; return Math.sqrt(currentX * currentX + currentY * currentY); >finally < sl.unlockRead(stamp); >> 
Collections
ArrayBlockingQueue

Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие ( put() take() ) и неблокирующие ( offer() pool() ) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.

ConcurrentHashMap

Ключ-значение структура, основанная на hash функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel степени 2.

ConcurrentSkipListMap

Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.

ConcurrentSkipListSet

ConcurrentSkipListMap без значений.

CopyOnWriteArrayList

Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.

CopyOnWriteArraySet

CopyOnWriteArrayList без значений.

DelayQueue

PriorityBlockingQueue разрешающая получить элемент только после определенной задержки (задержка объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.

LinkedBlockingDeque

Двунаправленная BlockingQueue , основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedBlockingQueue

Однонаправленная BlockingQueue , основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedTransferQueue

Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.

PriorityBlockingQueue

Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.

Читайте также:  Css select text height
SynchronousQueue

Однонаправленная `BlockingQueue`, реализующая transfer() логику для put() методов.

Synchronization points
CountDownLatch

Барьер ( await() ), ожидающий конкретного (или больше) кол-ва вызовов countDown() . Состояние барьера не может быть сброшено.

CyclicBarrier

Барьер ( await() ), ожидающий конкретного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.

Exchanger

Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.

Phaser

Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.

Semaphore

Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.

Executors

ExecutorService пришел на замену new Thread(runnable) чтобы упростить работу с потоками. ExecutorService помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable пул использует интерфейс Callable (умеет возвращать результат и кидать ошибки).

ExecutorService pool = Executors.newFixedThreadPool(4) Future future = pool.submit(new Callable() < Object call() throws Exception < println("In thread") return "From thread" >>) println("From main") println(future.get()) try < pool.submit(new Callable() < Object call() throws Exception < throw new IllegalStateException() >>).get() > catch (ExecutionException e) ")> pool.shutdown() 

Метод invokeAll отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny возвращает результат первой успешно выполненной задачи, отменяя все последующие.

ThreadPoolExecutor

Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.

ScheduledThreadPoolExecutor

Расширяет функционал ThreadPoolExecutor возможностью выполнять задачи отложенно или регулярно.

ThreadPoolExecutor

Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.

class LNode < Listchilds = [] def object > class Finder extends RecursiveTask  < LNode node Object expect protected LNode compute() < if (node?.object?.equals(expect)) < return node >node?.childs?.collect < new Finder(node: it, expect: expect).fork() >?.collect < it.join() >?.find < it != null >> > ForkJoinPool es = new ForkJoinPool() def invoke = es.invoke(new Finder( node: new LNode( childs: [ new LNode(object: "ivalid"), new LNode( object: "ivalid", childs: [new LNode(object: "test")] ) ] ), expect: "test" )) print("$") 
Accumulators _jdk 1.8_

Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.

Источник

How to Make Concurrent Service API Calls in Java Using Spring Boot

When you’re in a microservice environment, it often makes sense to make some calls to multiple services at the same time. This allows for the time an operation needs to complete to be reduced from the sum of all the time spent waiting to the maximum time spent over the span of calls.

For example, let’s say you make three calls in one service, and let’s further say that all three can be called in any order. If:

Then, if you do not make those calls concurrently, then you will have to wait 500 + 700 + 300 = 1500ms. If, however, you make all three at the same time and wait for them to complete before returning, you will only incur the cost of waiting for the longest service. In this case, that is Call #2, and means you will have to wait a total of 700ms.

To demo one way to accomplish this in Spring Boot, we’ll start by creating a service that simulates a long running process. We’ve done something similar to this in the post on caching in Nginx. Go to the spring initializr and select the «Web» dependency. In the resulting file, change the server port to be on 9000 by modifying the application.properties:

Then modify the entry point to look like this:

@SpringBootApplication @RestController public class SlowApplication < public static void main(String[] args) < SpringApplication.run(SlowApplication.class, args); >@GetMapping("/slow") public String slow() throws InterruptedException < Thread.sleep(2000); return ""; > > 

This returns a very simple JSON object after a time delay of two seconds, simulating a service that takes awhile to respond.

Making Concurrent Calls

Now we’ll create a service to consume this simple slow server, sending off a few calls and waiting for all of them to complete before returning.

Go back to the spring initializer, and select no additional dependencies. We’ll illustrate this with a command line runner.

You will need to add a few to your pom.xml, if your using maven:

  com.fasterxml.jackson.core jackson-databind  org.springframework spring-web 5.1.8.RELEASE  

We will use the rest template to send calls to the other service, and we will use Jackson to deserialize the response for us.

We will need an executor to spin up threads that our application can use, and we also need to add an @EnableAsync annotation to the entrypoint:

@SpringBootApplication @EnableAsync public class ConcurrentcallsApplication < public static void main(String[] args) < SpringApplication.run(ConcurrentcallsApplication.class, args); >@Bean public RestTemplate restTemplate() < return new RestTemplate(); >@Bean public Executor executor() < ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setQueueCapacity(500); executor.initialize(); return executor; >> 

We can then create a service class that will make calls to our other, «slow» microservice:

@Service public class SlowServiceCaller < @Autowired private RestTemplate restTemplate; @Async public CompletableFuturecallOtherService() < String localSlowServiceEndpoint = "http://localhost:9000/slow"; JsonNode responseObj = restTemplate.getForObject(localSlowServiceEndpoint, JsonNode.class); return CompletableFuture.completedFuture(responseObj); >> 

With this configuration, Spring will inject a proxy for every time SlowServiceCaller.callOtherService() is called, ensuring that the previously defined Executor is responsible for executing the calls. As long as we return a CompletableFuture here, it doesn’t necessarily matter what we do. This could be a database query, this could be a compute-intensive process using in memory data, or any other potentially long running process. Here, obviously, we’re firing off a network call.

To demonstrate this, we’ll fire up a CommandLineRunner like so:

@Component public class ConcurrentRunner implements CommandLineRunner < @Autowired SlowServiceCaller slowServiceCaller; @Override public void run(String. args) throws Exception < Instant start = Instant.now(); List> allFutures = new ArrayList<>(); for (int i = 0; i < 10; i++) < allFutures.add(slowServiceCaller.callOtherService()); >CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])).join(); for (int i = 0; i < 10; i++) < System.out.println("response: " + allFutures.get(i).get().toString()); >System.out.println("Total time: " + Duration.between(start, Instant.now()).getSeconds()); > > 

Источник

Оцените статью