diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index dd84ea7..3205926 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -12,6 +12,7 @@ A clear and concise description of what the bug is. **To Reproduce** Steps to reproduce the behavior: + 1. Go to '...' 2. Click on '....' 3. Scroll down to '....' @@ -24,15 +25,17 @@ A clear and concise description of what you expected to happen. If applicable, add screenshots to help explain your problem. **Desktop (please complete the following information):** - - OS: [e.g. iOS] - - Browser [e.g. chrome, safari] - - Version [e.g. 22] + +- OS: [e.g. iOS] +- Browser [e.g. chrome, safari] +- Version [e.g. 22] **Smartphone (please complete the following information):** - - Device: [e.g. iPhone6] - - OS: [e.g. iOS8.1] - - Browser [e.g. stock browser, safari] - - Version [e.g. 22] + +- Device: [e.g. iPhone6] +- OS: [e.g. iOS8.1] +- Browser [e.g. stock browser, safari] +- Version [e.g. 22] **Additional context** Add any other context about the problem here. diff --git a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java index 42b9717..43c5be1 100644 --- a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java +++ b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java @@ -1,13 +1,81 @@ package com.thread.concurrency; +import com.thread.concurrency.counter.batch.BatchCounter; +import com.thread.concurrency.counter.batch.ConcurrentBatchingCounter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + @SpringBootApplication public class SpringThreadConcurrencyApplication { public static void main(String[] args) { SpringApplication.run(SpringThreadConcurrencyApplication.class, args); + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage initialMemoryUsage = memoryMXBean.getHeapMemoryUsage(); + long initialTime = System.currentTimeMillis(); + + // Run the test + int totalRequest = Integer.MAX_VALUE; + conditionalMultiThreading(totalRequest); + + MemoryUsage finalMemoryUsage = memoryMXBean.getHeapMemoryUsage(); + long finalTime = System.currentTimeMillis(); + long elapsedTime = finalTime - initialTime; + long usedMemory = finalMemoryUsage.getUsed() - initialMemoryUsage.getUsed(); + + // request with comma + System.out.println("Total request: " + String.format("%,d", totalRequest)); + // seconds + System.out.println("Elapsed time: " + elapsedTime / 1000 + " s"); + // megabytes + System.out.println("Used memory: " + usedMemory / 1024 / 1024 + " MB"); + } + + private static void conditionalMultiThreading(int expected) { + BatchCounter counter = new ConcurrentBatchingCounter(); + + // given + int numberOfThreads = 128; + List iterPerThread = range(numberOfThreads, expected); + Consumer task = (Integer number) -> { + for (int i = 0; i < number; i++) { + counter.add(1); + } + counter.flush(); + }; + // when + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + List> futures = iterPerThread.stream().map(number -> CompletableFuture.runAsync(() -> task.accept(number), executor)).toList(); + futures.forEach(CompletableFuture::join); + } + // then + assert expected == counter.show(); + } + + private static List range(int numberOfThreads, int expected) { + int baseValue = expected / numberOfThreads; + int remainder = expected % numberOfThreads; + + List result = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + if (i < remainder) { + result.add(baseValue + 1); + } else { + result.add(baseValue); + } + } + return result; } } diff --git a/src/main/java/com/thread/concurrency/counter/BatchingCounter.java b/src/main/java/com/thread/concurrency/counter/BatchingCounter.java deleted file mode 100644 index f2396ef..0000000 --- a/src/main/java/com/thread/concurrency/counter/BatchingCounter.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.thread.concurrency.counter; - -import org.springframework.stereotype.Component; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -@Component -public class BatchingCounter implements Counter { - private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private static final ConcurrentLinkedQueue jobQueue = new ConcurrentLinkedQueue<>(); - private static volatile int count = 100; - - public BatchingCounter() { - Runnable runnableTask = () -> { - while (!jobQueue.isEmpty()) { - synchronized (this) { - var value = jobQueue.poll(); - count += value == null ? 0 : value; - } - } - }; - // context switching을 최소화하는 최소한의 시간마다 실행하여 성능 향상 - scheduledExecutorService.scheduleAtFixedRate(runnableTask, 4, 5, TimeUnit.MILLISECONDS); - } - - @Override - public void add(int value) { - jobQueue.add(value); - } - - @Override - public int show() { - return count; - } -} diff --git a/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java b/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java new file mode 100644 index 0000000..bfd3a77 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java @@ -0,0 +1,7 @@ +package com.thread.concurrency.counter.batch; + +import com.thread.concurrency.counter.Counter; + +public interface BatchCounter extends Counter { + void flush(); +} diff --git a/src/main/java/com/thread/concurrency/counter/batch/ConcurrentBatchingCounter.java b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentBatchingCounter.java new file mode 100644 index 0000000..07b70be --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentBatchingCounter.java @@ -0,0 +1,39 @@ +package com.thread.concurrency.counter.batch; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +@Component +public class ConcurrentBatchingCounter implements BatchCounter { + + private final AtomicLong counter = new AtomicLong(); + private final ConcurrentMap batch = new ConcurrentHashMap<>(); + + @Override + public void add(int value) { + var threadId = Thread.currentThread().threadId(); + batch.computeIfAbsent(threadId, k -> new LongAdder()).add(value); + } + + @Override + public int show() { + return counter.intValue(); + } + + private void flush(long threadId) { + var value = batch.remove(threadId); + if (value != null) { + counter.addAndGet(value.longValue()); + } + } + + @Override + public void flush() { + var threadId = Thread.currentThread().threadId(); + flush(threadId); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java new file mode 100644 index 0000000..cbd6ccd --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java @@ -0,0 +1,48 @@ +package com.thread.concurrency.counter.batch; + +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +@Component +@Profile("dev") +public class ConcurrentParameterizedBatchingCounter implements BatchCounter { + + private static final int BATCH_SIZE = 100; + + private final AtomicLong counter = new AtomicLong(); + private final ConcurrentMap> batch = new ConcurrentHashMap<>(); + + @Override + public void add(int value) { + var threadId = Thread.currentThread().threadId(); + batch.computeIfAbsent(threadId, k -> new ArrayList<>()).add(value); + if (batch.get(threadId).size() >= BATCH_SIZE) { + flush(threadId); + } + } + + @Override + public int show() { + return counter.intValue(); + } + + private void flush(long threadId) { + var list = batch.getOrDefault(threadId, null); + if (list != null && !list.isEmpty()) { + counter.addAndGet(list.stream().mapToLong(Integer::longValue).sum()); + batch.remove(threadId); + } + } + + @Override + public void flush() { + var threadId = Thread.currentThread().threadId(); + flush(threadId); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e92628c..4cc2930 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ spring.application.name=spring-thread-concurrency +spring.profiles.active=default diff --git a/src/test/java/com/thread/concurrency/counter/CounterTest.java b/src/test/java/com/thread/concurrency/counter/CounterTest.java index 3493483..2958804 100644 --- a/src/test/java/com/thread/concurrency/counter/CounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/CounterTest.java @@ -5,60 +5,44 @@ import org.junit.jupiter.params.provider.MethodSource; import org.springframework.boot.test.context.SpringBootTest; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Stream; -import static java.lang.Thread.sleep; - @SpringBootTest public class CounterTest { public static Stream counterProvider() { - return Stream.of(new BatchingCounter(), new LockCounter(), new PollingCounter(), new BasicCounter()); + return Stream.of(new LockCounter(), new PollingCounter()); } - private static void assertThen(Counter counter, int expectedValue, int actualValue) { - System.out.println("Expected value: " + expectedValue); - System.out.println("Actual value: " + actualValue); - if (counter instanceof BasicCounter) { - System.out.println("BasicCounter is not thread-safe"); - Assertions.assertNotEquals(expectedValue, actualValue); - } else { - System.out.println("Counter is thread-safe"); - Assertions.assertEquals(expectedValue, actualValue); + private static void whenAdd(Counter counter, int nThreads, int addPerThread) { + try (ExecutorService executor = Executors.newFixedThreadPool(nThreads)) { + for (int i = 0; i < nThreads; i++) { + executor.submit(() -> { + for (int j = 0; j < addPerThread; j++) { + counter.add(1); + } + }); + } } } @ParameterizedTest @MethodSource("counterProvider") - public void stressTest(Counter counter) throws InterruptedException { - int initialValue = counter.show(); + public void stressTest(Counter counter) { + // given int nThreads = 100; - int nAddsPerThread = 1000; - int valueToAdd = 1; - int expectedValue = initialValue + nThreads * nAddsPerThread * valueToAdd; - - - // define runnable job - CountDownLatch latch = new CountDownLatch(nThreads); - Runnable job = () -> { - try { - latch.countDown(); // decrease the count - latch.await(); // wait until the count reaches 0 - for (int i = 0; i < nAddsPerThread; i++) { - counter.add(valueToAdd); - } - } catch (InterruptedException ignored) { - } - }; - - // start nThreads threads - for (int i = 0; i < nThreads; i++) { - Thread.ofVirtual().start(job); - } + int addPerThread = 1000; + int expectedValue = counter.show() + nThreads * addPerThread; - sleep(300); // wait for all threads to finish + // when + long start = System.currentTimeMillis(); + whenAdd(counter, nThreads, addPerThread); + long end = System.currentTimeMillis(); - assertThen(counter, expectedValue, counter.show()); + // then + Assertions.assertEquals(expectedValue, counter.show()); + System.out.println("Time elapsed: " + (end - start) + "ms"); } } diff --git a/src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java b/src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java similarity index 77% rename from src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java rename to src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java index 675a607..60cb352 100644 --- a/src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java +++ b/src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java @@ -1,15 +1,15 @@ -package com.thread.concurrency; +package com.thread.concurrency.counter; +import com.thread.concurrency.SpringThreadConcurrencyApplication; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; + import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @SpringBootTest class SpringThreadConcurrencyApplicationTests { - @Test void contextLoads() { assertDoesNotThrow(() -> SpringThreadConcurrencyApplication.main(new String[]{})); } - } diff --git a/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java b/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java new file mode 100644 index 0000000..0c68150 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java @@ -0,0 +1,110 @@ +package com.thread.concurrency.counter.batch; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class BatchCounterTest { + private final List numbers; + private final Runnable defaultTask; + private final int partialSum; + private BatchCounter counter; + + public BatchCounterTest() { + this.numbers = range(); + this.defaultTask = () -> { + for (Integer number : numbers) { + counter.add(number); + } + counter.flush(); + }; + this.partialSum = numbers.stream().reduce(0, Integer::sum); + } + + private static List range() { + return IntStream.range(0, 1000).boxed().collect(Collectors.toList()); + } + + private static List range(int numberOfThreads, int expected) { + int baseValue = expected / numberOfThreads; + int remainder = expected % numberOfThreads; + + List result = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + if (i < remainder) { + result.add(baseValue + 1); + } else { + result.add(baseValue); + } + } + return result; + } + + @BeforeEach + void setUp() { + this.counter = new ConcurrentBatchingCounter(); + } + + + @Test + void singleThreading() { + defaultTask.run(); + assertEquals(partialSum, counter.show()); + } + + + @Test + void conditionalMultiThreading() { + // given + int numberOfThreads = 2; + int expected = Integer.MAX_VALUE / 1024; + List iterPerThread = range(numberOfThreads, expected); + Consumer task = (Integer number) -> { + for (int i = 0; i < number; i++) { + counter.add(1); + } + counter.flush(); + }; + // when + try (ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads)) { + for (int num : iterPerThread) { + executor.submit(() -> task.accept(num)); + } + } + // then + assertEquals(expected, counter.show()); + } + + @Test + void conditionalAsyncVirtualMultiThreading() { + // given + int numberOfThreads = 2; + int expected = Integer.MAX_VALUE / 1024; + List iterPerThread = range(numberOfThreads, expected); + Consumer task = (Integer number) -> { + for (int i = 0; i < number; i++) { + counter.add(1); + } + counter.flush(); + }; + // when + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + List> futures = iterPerThread.stream() + .map(number -> CompletableFuture.runAsync(() -> task.accept(number), executor)) + .toList(); + futures.forEach(CompletableFuture::join); + } + // then + assertEquals(expected, counter.show()); + } +}