diff --git a/apache-kafka-4/pom.xml b/apache-kafka-4/pom.xml index ffdd84b03754..1cad893182ff 100644 --- a/apache-kafka-4/pom.xml +++ b/apache-kafka-4/pom.xml @@ -100,6 +100,7 @@ + 21 3.9.0 1.19.3 1.19.3 diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java new file mode 100644 index 000000000000..b81c036297e9 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java @@ -0,0 +1,60 @@ +package com.baeldung.kafka.commitfailure.batch; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(250)); + if (records.isEmpty()) { + continue; + } + simulateDBCall(records); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBCall(ConsumerRecords records) { + try { + log.info("Simulating long running batch db update for records {}", records.count()); + Thread.sleep(1000L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java new file mode 100644 index 000000000000..7d9a2dafe47b --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -0,0 +1,172 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.StreamSupport; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + private final ExecutorService workers; + private final Map committableOffsets = new ConcurrentHashMap<>(); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + try { + commitOffsets(partitions); + } catch (Exception ex) { + log.error("Commit failed during rebalance {} {}", ex.getMessage(), ex, ex.getCause()); + } finally { + partitions.forEach(committableOffsets::remove); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + partitions.forEach(committableOffsets::remove); + } + }); + workers = Executors.newVirtualThreadPerTaskExecutor(); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + + List> futures = StreamSupport.stream(records.spliterator(), false) + .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) + .whenComplete((ignored, ex) -> { + if (ex == null) { + markComplete(record); + } else { + log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); + } + }) + .exceptionally(ex -> null)) + .toList(); + + try { + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .orTimeout(1000, TimeUnit.MILLISECONDS) + .join(); + } catch (CompletionException ex) { + log.error("Batch processing timed out or failed {} {}", ex.getMessage(), ex, ex.getCause()); + } + + commitOffsets(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + commitOffsets(); + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + try { + workers.shutdown(); + if (!workers.awaitTermination(60, TimeUnit.SECONDS)) { + workers.shutdownNow(); + } + } catch (InterruptedException ex) { + workers.shutdownNow(); + Thread.currentThread() + .interrupt(); + } + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(300L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } + + private void markComplete(ConsumerRecord record) { + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1)) + .accumulateAndGet(record.offset() + 1, Math::max); + } + + private void commitOffsets() { + Map toCommit = new HashMap<>(); + + committableOffsets.forEach((tp, atomicOffset) -> { + long val = atomicOffset.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + toCommit.forEach((tp, meta) -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + ref.compareAndSet(meta.offset(), -1L); + } + }); + } + + private void commitOffsets(Collection partitions) { + Map toCommit = new HashMap<>(); + + partitions.forEach(tp -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + long val = ref.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java new file mode 100644 index 000000000000..7bc84ddd3948 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java @@ -0,0 +1,60 @@ +package com.baeldung.kafka.commitfailure.fixed.batch; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + simulateDBUpdate(records); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBUpdate(ConsumerRecords records) { + try { + log.info("Simulating long running batch db update {}", records.count()); + Thread.sleep(1000L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..7d677f7d8cc1 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBUpdate); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(100L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..078553600015 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties props, String topic) { + this.consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBCall); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBCall(ConsumerRecord record) { + try { + log.info("Simulating a DB call - record key {} value {}", record.key(), record.value()); + Thread.sleep(100L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..2fa5679028bf --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,77 @@ +package com.baeldung.kafka.commitfailure.batch; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrowsCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 300; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 500); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..0f6523cc2eaa --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,76 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerDoesNotThrowCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 100; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-async-consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..25e3e5aa90fc --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,75 @@ +package com.baeldung.kafka.commitfailure.fixed.batch; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumesDoesNotThrowCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 300; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..e51ae03f379b --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,75 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrowsCommitFailedException() throws InterruptedException { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test")); + producer.flush(); + } + + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-seq-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..f4648c8e0c0c --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,77 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() throws InterruptedException { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test1")); + producer.flush(); + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "seq-consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50); + + return consumerProperties; + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/pom.xml b/core-java-modules/core-java-concurrency-advanced-7/pom.xml index 32ac6448175d..0d8557a69133 100644 --- a/core-java-modules/core-java-concurrency-advanced-7/pom.xml +++ b/core-java-modules/core-java-concurrency-advanced-7/pom.xml @@ -20,10 +20,72 @@ ${awaitility.version} test + + org.openjdk.jmh + jmh-core + ${jmh.core.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.core.version} + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source.version} + ${maven.compiler.target.version} + false + + --enable-preview + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.core.version} + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --enable-preview + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + shade + + benchmarks + + + org.openjdk.jmh.Main + + + + + + + + + 1.7.0 + 21 + 21 + 1.37 diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classinit/HeavyClass.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classinit/HeavyClass.java new file mode 100644 index 000000000000..4ddf22ca3b8b --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classinit/HeavyClass.java @@ -0,0 +1,29 @@ +package com.baeldung.virtualthread.classinit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HeavyClass { + + private static final Logger LOGGER = LoggerFactory.getLogger(HeavyClass.class); + + static { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + + LOGGER.info("static initialization done"); + } + + { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + + LOGGER.info("initialization done"); + } +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/CustomClassLoader.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/CustomClassLoader.java new file mode 100644 index 000000000000..27a7fcb15f58 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/CustomClassLoader.java @@ -0,0 +1,55 @@ +package com.baeldung.virtualthread.classloader; + +import java.io.IOException; +import java.nio.file.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CustomClassLoader extends ClassLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(CustomClassLoader.class); + private final Path classDir; + + public CustomClassLoader(Path classDir) { + super(ClassLoader.getSystemClassLoader()); + this.classDir = classDir; + } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + LOGGER.info("Load class for {}", name); + + Class clazz = findLoadedClass(name); + + if (clazz == null) { + try { + clazz = findClass(name); + } catch (ClassNotFoundException ex) { + clazz = super.loadClass(name, resolve); + } + } + + if (resolve) { + resolveClass(clazz); + } + + return clazz; + } + + @Override + protected Class findClass(String name) throws ClassNotFoundException { + LOGGER.info("Finding class for {}", name); + + try { + Path file = classDir.resolve(name.replace('.', '/') + ".class"); + byte[] bytes = java.nio.file.Files.readAllBytes(file); + Thread.sleep(100); + + return defineClass(name, bytes, 0, bytes.length); + } catch (InterruptedException | IOException ex) { + LOGGER.error("Error while finding class file {}", ex.getMessage()); + throw new ClassNotFoundException(ex.getMessage(), ex); + } + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/MyClass.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/MyClass.java new file mode 100644 index 000000000000..10ef734d65bb --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/classloader/MyClass.java @@ -0,0 +1,6 @@ +package com.baeldung.virtualthread.classloader; + +public class MyClass { + public MyClass() { + } +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/foreignfunction/ForeignFunctionClass.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/foreignfunction/ForeignFunctionClass.java new file mode 100644 index 000000000000..b13368c95b64 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/foreignfunction/ForeignFunctionClass.java @@ -0,0 +1,33 @@ +package com.baeldung.virtualthread.foreignfunction; + +import static java.lang.foreign.ValueLayout.JAVA_INT; +import static java.lang.foreign.ValueLayout.JAVA_LONG; + +import java.lang.foreign.FunctionDescriptor; +import java.lang.foreign.Linker; +import java.lang.foreign.SymbolLookup; +import java.lang.invoke.MethodHandle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ForeignFunctionClass { + + private static final Logger LOGGER = LoggerFactory.getLogger(ForeignFunctionClass.class); + + public void execute() { + LOGGER.info("Running foreign function sleep..."); + + Linker linker = Linker.nativeLinker(); + SymbolLookup stdlib = linker.defaultLookup(); + MethodHandle sleep = linker.downcallHandle(stdlib.find("sleep") + .orElseThrow(), FunctionDescriptor.of(JAVA_INT, JAVA_LONG)); + + try { + sleep.invoke(100); + } catch (Throwable ex) { + System.out.println("Error in native sleep..."); + throw new RuntimeException(ex); + } + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/nativemethod/NativeDemo.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/nativemethod/NativeDemo.java new file mode 100644 index 000000000000..122d97601ae0 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/nativemethod/NativeDemo.java @@ -0,0 +1,10 @@ +package com.baeldung.virtualthread.nativemethod; + +public class NativeDemo { + + static { + System.loadLibrary("native-lib"); + } + + public native String nativeCall(); +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/BenchmarkVirtualThread.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/BenchmarkVirtualThread.java new file mode 100644 index 000000000000..7a6d5d4391cb --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/BenchmarkVirtualThread.java @@ -0,0 +1,48 @@ +package com.baeldung.virtualthread.synchronize; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode({ Mode.AverageTime, Mode.Throughput }) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 1, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2) +@State(Scope.Benchmark) +public class BenchmarkVirtualThread { + + private final CartService cartService = new CartService(); + + @Param({ "100", "1000", "10000" }) + private int CONCURRENCY; + + @Benchmark + public void benchmark() throws InterruptedException, IOException { + List threads = new ArrayList<>(); + IntStream.range(0, CONCURRENCY).forEach(i -> threads.add(Thread.startVirtualThread(() -> cartService.update(UUID.randomUUID() + .toString(), 2)))); + + threads.forEach(th -> { + try { + th.join(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + } +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/CartService.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/CartService.java new file mode 100644 index 000000000000..e3a0a0c772a4 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/CartService.java @@ -0,0 +1,43 @@ +package com.baeldung.virtualthread.synchronize; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CartService { + + private static final Logger LOGGER = LoggerFactory.getLogger(CartService.class); + + private final Map products; + private final Map locks = new ConcurrentHashMap<>(); + + public CartService() { + this.products = new HashMap<>(); + } + + public void update(String productId, int quantity) { + Object lock = locks.computeIfAbsent(productId, k -> new Object()); + + synchronized (lock) { + simulateAPI(); + products.merge(productId, quantity, Integer::sum); + } + + LOGGER.info("Updated Cart for {} {}", productId, quantity); + } + + public Map getProducts() { + return Map.copyOf(products); + } + + private void simulateAPI() { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/fixed/CartService.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/fixed/CartService.java new file mode 100644 index 000000000000..a252e8ca9fc1 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/virtualthread/synchronize/fixed/CartService.java @@ -0,0 +1,53 @@ +package com.baeldung.virtualthread.synchronize.fixed; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CartService { + + private static final Logger LOGGER = LoggerFactory.getLogger(CartService.class); + + private final Map products; + private final Map locks = new ConcurrentHashMap<>(); + + public CartService() { + this.products = new HashMap<>(); + } + + public void update(String productId, int quantity) { + Lock lock = locks.computeIfAbsent(productId, k -> new ReentrantLock()); + + try { + if (lock.tryLock(500, TimeUnit.MILLISECONDS)) { + try { + simulateAPI(); + products.merge(productId, quantity, Integer::sum); + } finally { + lock.unlock(); + } + LOGGER.info("Updated Cart for {} {}", productId, quantity); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + private void simulateAPI() { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + public Map getProducts() { + return Map.copyOf(products); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classinit/HeavyClassTest.java b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classinit/HeavyClassTest.java new file mode 100644 index 000000000000..539b2c4eeee9 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classinit/HeavyClassTest.java @@ -0,0 +1,51 @@ +package com.baeldung.virtualthread.classinit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import jdk.jfr.Recording; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordingFile; + +public class HeavyClassTest { + + @Test + void givenJFRIsEnabled_whenVThreadIsBlocked_thenDetectVThreadPinned() throws IOException, InterruptedException { + Path file = Path.of("pinning_1.jfr"); + + try (Recording recording = new Recording()) { + recording.enable("jdk.VirtualThreadPinned") + .withThreshold(Duration.ofMillis(1)); + recording.start(); + + Thread th = Thread.ofVirtual() + .start(HeavyClass::new); + th.join(); + + recording.stop(); + recording.dump(file); + } + + try (RecordingFile rf = new RecordingFile(file)) { + assertTrue(rf.hasMoreEvents()); + + while (rf.hasMoreEvents()) { + RecordedEvent event = rf.readEvent(); + System.out.println(event); + assertEquals("jdk.VirtualThreadPinned", event.getEventType() + .getName()); + assertEquals("Virtual Thread Pinned", event.getEventType() + .getLabel()); + } + } + + Files.delete(file); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classloader/CustomClassLoaderTest.java b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classloader/CustomClassLoaderTest.java new file mode 100644 index 000000000000..a023d6f55051 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/classloader/CustomClassLoaderTest.java @@ -0,0 +1,67 @@ +package com.baeldung.virtualthread.classloader; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import jdk.jfr.Recording; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordingFile; + +public class CustomClassLoaderTest { + + @Test + void givenJFRRecIsEnabled_whenVThreadIsBlocked_thenDetectVThreadPinned() throws Exception { + Path classDir = Paths.get(CustomClassLoader.class.getProtectionDomain() + .getCodeSource() + .getLocation() + .toURI()); + + CustomClassLoader loader = new CustomClassLoader(classDir); + Path file = Path.of("pinning_3.jfr"); + + try (Recording recording = new Recording()) { + recording.enable("jdk.VirtualThreadPinned") + .withThreshold(Duration.ofMillis(1)); + recording.start(); + + Thread th = Thread.ofVirtual() + .start(() -> { + try { + Class clazz = Class.forName("com.baeldung.virtualthread.classloader.MyClass", + true, loader); + + System.out.println(Thread.currentThread() + " loaded class : " + clazz.getName()); + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); + } + }); + + th.join(); + + recording.stop(); + recording.dump(file); + } + + try (RecordingFile rf = new RecordingFile(file)) { + assertTrue(rf.hasMoreEvents()); + + while (rf.hasMoreEvents()) { + RecordedEvent event = rf.readEvent(); + + assertEquals("jdk.VirtualThreadPinned", event.getEventType() + .getName()); + assertEquals("Virtual Thread Pinned", event.getEventType() + .getLabel()); + } + } + + Files.delete(file); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/CartServiceTest.java b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/CartServiceTest.java new file mode 100644 index 000000000000..84e1d29568f4 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/CartServiceTest.java @@ -0,0 +1,84 @@ +package com.baeldung.virtualthread.synchronize; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import jdk.jfr.Recording; +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordingFile; + +public class CartServiceTest { + + private final CartService cartService = new CartService(); + + @Test + void givenJFRRecIsEnabled_whenVThreadIsBlocked_thenDetectVThreadPinned() throws Exception { + Path file = Path.of("pinning_4.jfr"); + + try (Recording recording = new Recording()) { + recording.enable("jdk.VirtualThreadPinned") + .withThreshold(Duration.ofMillis(1)); + recording.start(); + + Thread th = Thread.ofVirtual().start(() -> + cartService.update("test1", 2)); + + th.join(); + + recording.stop(); + recording.dump(file); + } + + try (RecordingFile rf = new RecordingFile(file)) { + assertTrue(rf.hasMoreEvents()); + + while (rf.hasMoreEvents()) { + RecordedEvent event = rf.readEvent(); + + System.out.println(event); + assertEquals("jdk.VirtualThreadPinned", event.getEventType().getName()); + assertEquals("Virtual Thread Pinned", event.getEventType().getLabel()); + } + } + + Files.delete(file); + } + + @Test + void givenProductsIsPresent_whenProductIsAdded_thenProductIsUpdated() throws InterruptedException { + String productId = "test2"; + Thread th1 = Thread.ofVirtual().start(() -> + cartService.update(productId, 2)); + + Thread th2 = Thread.ofVirtual().start(() -> + cartService.update(productId, 3)); + + th1.join(); + th2.join(); + + Map products = cartService.getProducts(); + + assertTrue(products.containsKey(productId)); + assertEquals(5, products.get(productId)); + } + + @Test + void givenProductIsNotPresent_whenProductIsAdded_thenProductIsUpdated() throws InterruptedException { + String productId = "test3"; + Thread th = Thread.ofVirtual().start(() -> + cartService.update(productId, 2)); + th.join(); + + Map products = cartService.getProducts(); + + assertTrue(products.containsKey(productId)); + assertEquals(2, products.get(productId)); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/fixed/CartServiceTest.java b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/fixed/CartServiceTest.java new file mode 100644 index 000000000000..dfc64fd0ad93 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-7/src/test/java/com/baeldung/virtualthread/synchronize/fixed/CartServiceTest.java @@ -0,0 +1,80 @@ +package com.baeldung.virtualthread.synchronize.fixed; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import jdk.jfr.Recording; +import jdk.jfr.consumer.RecordingFile; + +public class CartServiceTest { + + private final CartService cartService = new CartService(); + + @Test + void givenJFRIsEnabled_whenVThreadIsBlocked_thenDetectVirtualThreadPinned() throws Exception { + Path file = Path.of("no-pinning.jfr"); + + try (Recording recording = new Recording()) { + recording.enable("jdk.VirtualThreadPinned") + .withThreshold(Duration.ofMillis(1)); + recording.start(); + + Thread th1 = Thread.ofVirtual().start(() -> + cartService.update("test1", 2)); + + Thread th2 = Thread.ofVirtual().start(() -> + cartService.update("test1", 3)); + + th1.join(); + th2.join(); + + recording.stop(); + recording.dump(file); + } + + try (RecordingFile rf = new RecordingFile(file)) { + assertFalse(rf.hasMoreEvents()); + } + + Files.delete(file); + } + + @Test + void givenProductsIsPresent_whenProductIsAdded_thenProductIsUpdated() throws InterruptedException { + String productId = "test4"; + Thread th1 = Thread.ofVirtual().start(() -> + cartService.update(productId, 2)); + + Thread th2 = Thread.ofVirtual().start(() -> + cartService.update(productId, 3)); + + th1.join(); + th2.join(); + + Map products = cartService.getProducts(); + + assertTrue(products.containsKey(productId)); + assertEquals(5, products.get(productId)); + } + + @Test + void givenProductIsNotPresent_whenProductIsAdded_thenProductIsUpdate() throws InterruptedException { + String productId = "test5"; + Thread th = Thread.ofVirtual().start(() -> + cartService.update(productId, 2)); + th.join(); + + Map products = cartService.getProducts(); + + assertTrue(products.containsKey(productId)); + assertEquals(2, products.get(productId)); + } +}