diff --git a/.gitignore b/.gitignore index bd3712a..9cfa0cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ HELP.md -.gradle -build/ +.gradle/* +build/* !gradle/wrapper/gradle-wrapper.jar !**/src/main/**/build/ !**/src/test/**/build/ @@ -18,7 +18,7 @@ bin/ !**/src/test/**/bin/ ### IntelliJ IDEA ### -.idea +.idea/* *.iws *.iml *.ipr diff --git a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java index 6e9cb2c..1b02bec 100644 --- a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java +++ b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java @@ -4,6 +4,11 @@ import com.thread.concurrency.counter.batch.ConcurrentBatchingCounter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; @@ -16,6 +21,7 @@ import java.util.function.Consumer; @SpringBootApplication +@EnableAsync public class SpringThreadConcurrencyApplication { public static void main(String[] args) { @@ -77,5 +83,14 @@ private static List range(int numberOfThreads, int expected) { } return result; } - + @Bean + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Spring에서 사용하는 스레드를 제어한느 설정 + executor.setCorePoolSize(50); // thread-pool에 살아있는 thread의 최소 개수 + executor.setMaxPoolSize(50); // thread-pool에서 사용할 수 있는 최대 개수 + executor.setQueueCapacity(500); //thread-pool에 최대 queue 크기 + executor.setThreadNamePrefix("AsyncApp-"); + executor.initialize(); + return executor; + } } diff --git a/src/main/java/com/thread/concurrency/async/controller/AsyncController.java b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java new file mode 100644 index 0000000..6be4cdf --- /dev/null +++ b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java @@ -0,0 +1,35 @@ +package com.thread.concurrency.async.controller; + +import com.thread.concurrency.async.service.AsyncService; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Controller; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +@Controller +public class AsyncController { + private final AsyncService asyncService; + + public AsyncController(AsyncService asyncService) { + this.asyncService = asyncService; + } + + @Async + public CompletableFuture calculateRunTime(int cnt, int waitTime) throws InterruptedException { + LocalTime lt1 = LocalTime.now(); + List> hellos = new ArrayList<>(); + for(int i=0; i results = hellos.stream().map(CompletableFuture::join) + .toList(); + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).toMillis(); + return CompletableFuture.completedFuture(dif+"가 걸렸습니다."); + } +} diff --git a/src/main/java/com/thread/concurrency/async/service/AsyncService.java b/src/main/java/com/thread/concurrency/async/service/AsyncService.java new file mode 100644 index 0000000..9cb2a00 --- /dev/null +++ b/src/main/java/com/thread/concurrency/async/service/AsyncService.java @@ -0,0 +1,18 @@ +package com.thread.concurrency.async.service; + +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; + + +@Service +public class AsyncService { + @Async + public CompletableFuture voidParamStringReturn (long waitTime, String message) throws InterruptedException{ + System.out.println("비동기적으로 실행 - "+ + Thread.currentThread().getName()); + Thread.sleep(waitTime); + return CompletableFuture.completedFuture(message); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queueCounter/Consumer.java b/src/main/java/com/thread/concurrency/counter/queueCounter/Consumer.java new file mode 100644 index 0000000..11930df --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queueCounter/Consumer.java @@ -0,0 +1,8 @@ +package com.thread.concurrency.counter.queueCounter; + +import java.util.concurrent.TimeUnit; + +public interface Consumer { + void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException; + Long show(); +} diff --git a/src/main/java/com/thread/concurrency/counter/queueCounter/CounterConsumer.java b/src/main/java/com/thread/concurrency/counter/queueCounter/CounterConsumer.java new file mode 100644 index 0000000..964d86c --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queueCounter/CounterConsumer.java @@ -0,0 +1,31 @@ +package com.thread.concurrency.counter.queueCounter; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class CounterConsumer implements Consumer { + private final BlockingQueue queue; + private final AtomicLong count = new AtomicLong(0); + + public CounterConsumer(BlockingQueue queue) { + this.queue = queue; + } + + public void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException { + while (true) { + Long value = queue.poll(timeout, unit); + if(value == null){ + break; + } + count.addAndGet(value); + } + } + public Long show(){ + while(true){ + if(queue.isEmpty()){ + return count.get(); + } + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queueCounter/CounterProducer.java b/src/main/java/com/thread/concurrency/counter/queueCounter/CounterProducer.java new file mode 100644 index 0000000..24afef8 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queueCounter/CounterProducer.java @@ -0,0 +1,15 @@ +package com.thread.concurrency.counter.queueCounter; + +import java.util.concurrent.BlockingQueue; + +public class CounterProducer implements Producer{ + private final BlockingQueue queue; + + public CounterProducer(BlockingQueue queue) { + this.queue = queue; + } + + public void add(long value) throws InterruptedException { + queue.put(value); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queueCounter/Producer.java b/src/main/java/com/thread/concurrency/counter/queueCounter/Producer.java new file mode 100644 index 0000000..71d943d --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queueCounter/Producer.java @@ -0,0 +1,5 @@ +package com.thread.concurrency.counter.queueCounter; + +public interface Producer { + void add(long value) throws InterruptedException; +} diff --git a/src/test/java/com/thread/concurrency/CounterTest.java b/src/test/java/com/thread/concurrency/CounterTest.java new file mode 100644 index 0000000..d88727d --- /dev/null +++ b/src/test/java/com/thread/concurrency/CounterTest.java @@ -0,0 +1,54 @@ +package com.thread.concurrency; + +import com.thread.concurrency.counter.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.boot.test.context.SpringBootTest; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Stream; + +@SpringBootTest +public class CounterTest { + private final int valueToAdd = 1; + private final int nAddsPerThread = 50000000; + private final int nThreads = 9; + + public static Stream counterProvider() { + return Stream.of(new AtomicCounter(), new CompletableFutureCounter(), new SynchronizedCounter()); + } + + @ParameterizedTest + @MethodSource("counterProvider") + @DisplayName("스레드 안전한 카운터로 동시에 여러 더하기 수행하기.") + public void 여러_더하기_수행_Executor(Counter counter) throws InterruptedException { + LocalTime lt1 = LocalTime.now(); + int initalCount = counter.show(); + + ExecutorService service = Executors.newFixedThreadPool(nThreads); + CountDownLatch latch = new CountDownLatch(nThreads); + for (int i = 0; i < nThreads; i++) { + service.submit(() -> { + for(int j=0; j> hellos = new ArrayList<>(); + for(int i=0; i<10; i++){ + hellos.add(asyncController.calculateRunTime(10, 1000)); + } + // 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기 + List results = hellos.stream().map(CompletableFuture::join) + .toList(); + results.forEach(logger::info); + } +} diff --git a/src/test/java/com/thread/concurrency/async/AsyncServiceTest.java b/src/test/java/com/thread/concurrency/async/AsyncServiceTest.java new file mode 100644 index 0000000..b4ea59c --- /dev/null +++ b/src/test/java/com/thread/concurrency/async/AsyncServiceTest.java @@ -0,0 +1,52 @@ +package com.thread.concurrency.async; + +import com.thread.concurrency.async.service.AsyncService; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@SpringBootTest +public class AsyncServiceTest { + private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class); + @Autowired + private AsyncService asyncService; + + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출") + public void testGetString() throws ExecutionException, InterruptedException { + CompletableFuture helloWorld = asyncService.voidParamStringReturn(1000, "기본 메세지"); + Assertions.assertEquals("기본 메세지",helloWorld.get()); + } + + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 다중 호출") + public void testGetMultiString() throws InterruptedException { + List> hellos = new ArrayList<>(); + for(int i=0; i<100; i++){ + hellos.add(asyncService.voidParamStringReturn(1000,i+"번째 메세지")); + } + // 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기 + List results = hellos.stream().map(CompletableFuture::join) + .toList(); + results.forEach(logger::info); + } + + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출 타임아웃 발생.") + public void testGetStringTimeOutIsThisAsync() throws InterruptedException { + // voidParamStringReturn가 비동기 메서드인지 의문이 생김. + CompletableFuture completableFuture = asyncService.voidParamStringReturn(4000, "타임아웃 발생 안 함!"); + long timeOutValue = 1; + TimeUnit timeUnit = TimeUnit.SECONDS; + // 1초가 지난 후 타임아웃 발생 + Assertions.assertThrows(ExecutionException.class, () -> completableFuture.orTimeout(timeOutValue,timeUnit).get()); + } +} diff --git a/src/test/java/com/thread/concurrency/queueCounter/QueueCounterTest.java b/src/test/java/com/thread/concurrency/queueCounter/QueueCounterTest.java new file mode 100644 index 0000000..56eb89d --- /dev/null +++ b/src/test/java/com/thread/concurrency/queueCounter/QueueCounterTest.java @@ -0,0 +1,115 @@ +package com.thread.concurrency.queueCounter; + +import com.thread.concurrency.counter.queueCounter.Consumer; +import com.thread.concurrency.counter.queueCounter.CounterConsumer; +import com.thread.concurrency.counter.queueCounter.CounterProducer; +import com.thread.concurrency.counter.queueCounter.Producer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.concurrent.*; + +public class QueueCounterTest { + private final int valueToAdd = 1; + private final int nAddsPerThread = 50000000; + private final int producerNThreads = 9; + private final int consumerNThreads = 9; + private final long timeout = 1; + private final int queueCapacity = 1000; + private final TimeUnit unit = TimeUnit.SECONDS; + + @Test + @DisplayName("멀티 프로듀서 싱글 컨슈머") + public void 멀티_프로듀서_싱글_컨슈머() throws InterruptedException { + BlockingQueue queue = new LinkedBlockingQueue<>(queueCapacity); + Consumer consumer = new CounterConsumer(queue); + Producer producer = new CounterProducer(queue); + LocalTime lt1 = LocalTime.now(); + Long initalCount = consumer.show(); + ExecutorService producerService = Executors.newFixedThreadPool(producerNThreads); + ExecutorService consumerService = Executors.newFixedThreadPool(consumerNThreads); + CountDownLatch producerLatch = new CountDownLatch(producerNThreads); + CountDownLatch consumerLatch = new CountDownLatch(1); + + // 프로듀서 스레드 생성 + for (int i = 0; i < producerNThreads; i++) { + producerService.submit(() -> { + try { + for(int j=0; j { + try { + consumer.consumeEvent(timeout, unit); + consumerLatch.countDown(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + consumerLatch.await(); + producerLatch.await(); + + Long finalCount = consumer.show(); + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).getNano(); + System.out.println("멀티_프로듀서_단일_컨슈머 테스트가 걸린 시간 : " + dif / 1000000 + "ms"); + Assertions.assertEquals(initalCount + nAddsPerThread*producerNThreads*valueToAdd, finalCount); + } + @Test + @DisplayName("멀티 프로듀서 멀티 컨슈머") + public void 멀티_프로듀서_멀티_컨슈머() throws InterruptedException { + BlockingQueue queue = new LinkedBlockingQueue<>(queueCapacity); + Consumer consumer = new CounterConsumer(queue); + Producer producer = new CounterProducer(queue); + LocalTime lt1 = LocalTime.now(); + Long initalCount = consumer.show(); + ExecutorService producerService = Executors.newFixedThreadPool(producerNThreads); + ExecutorService consumerService = Executors.newFixedThreadPool(consumerNThreads); + CountDownLatch producerLatch = new CountDownLatch(producerNThreads); + CountDownLatch consumerLatch = new CountDownLatch(consumerNThreads); + + // 프로듀서 스레드 생성 + for (int i = 0; i < producerNThreads; i++) { + producerService.submit(() -> { + try { + for(int j=0; j { + try { + consumer.consumeEvent(timeout, unit); + consumerLatch.countDown(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + consumerLatch.await(); + producerLatch.await(); + + Long finalCount = consumer.show(); + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).getNano(); + System.out.println("멀티_프로듀서_멀티_컨슈머 테스트가 걸린 시간 : " + dif / 1000000 + "ms"); + Assertions.assertEquals(initalCount + nAddsPerThread*producerNThreads*valueToAdd, finalCount); + } +}