diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 296b73ab300..61158143c1c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,16 +1,16 @@ [versions] checkstyle = "11.1.0" com-google-auto-service = "1.1.1" -graalvm = "25.0.0" +graalvm = "25.0.1" jackson = "2.19.2" jacoco = "0.8.13" java = "25" org-mapstruct = "1.6.3" org-mapstruct-extensions-spring = "1.1.3" -org-springdoc = "2.8.13" +org-springdoc = "2.8.14" pmd = "7.17.0" -spotbugs = "4.9.7" -spring-ai = "1.1.0-RC1" +spotbugs = "4.9.8" +spring-ai = "1.1.0" spring-boot = "3.5.7" spring-cloud-aws = "3.4.0" spring-cloud-dependencies = "2025.0.0" @@ -18,29 +18,29 @@ spring-shell = "3.4.1" [libraries] com-github-miachm-sods-sods = "com.github.miachm.sods:SODS:1.7.0" -com-github-mizosoft-methanol = "com.github.mizosoft.methanol:methanol:1.8.3" +com-github-mizosoft-methanol = "com.github.mizosoft.methanol:methanol:1.8.4" com-github-spotbugs-spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", version.ref = "spotbugs" } com-google-auto-service-auto-service = { module = "com.google.auto.service:auto-service", version.ref = "com-google-auto-service" } com-google-auto-service-auto-service-annotations = { module = "com.google.auto.service:auto-service-annotations", version.ref = "com-google-auto-service" } com-squareup-javapoet = "com.squareup:javapoet:1.13.0" commons-validator = "commons-validator:commons-validator:1.10.0" -io-swagger-core-v3-swagger-annotations = "io.swagger.core.v3:swagger-annotations:2.2.38" -io-swagger-parser-v3-swagger-parser = "io.swagger.parser.v3:swagger-parser:2.1.34" -loki-logback-appender = "com.github.loki4j:loki-logback-appender:2.0.0" -org-apache-activemq-artemis-jakarta-server = "org.apache.activemq:artemis-jakarta-server:2.42.0" +io-swagger-core-v3-swagger-annotations = "io.swagger.core.v3:swagger-annotations:2.2.40" +io-swagger-parser-v3-swagger-parser = "io.swagger.parser.v3:swagger-parser:2.1.35" +loki-logback-appender = "com.github.loki4j:loki-logback-appender:1.6.0" +org-apache-activemq-artemis-jakarta-server = "org.apache.activemq:artemis-jakarta-server:2.44.0" org-apache-poi-poi-ooxml = "org.apache.poi:poi-ooxml:5.4.1" -org-eclipse-jgit-org-eclipse-jgit = "org.eclipse.jgit:org.eclipse.jgit:7.3.0.202506031305-r" +org-eclipse-jgit-org-eclipse-jgit = "org.eclipse.jgit:org.eclipse.jgit:7.4.0.202509020913-r" org-graalvm-polyglot-java = { module = "org.graalvm.polyglot:java", version.ref = "graalvm" } org-graalvm-polyglot-js = { module = "org.graalvm.polyglot:js", version.ref = "graalvm" } org-graalvm-polyglot-polyglot = { module = "org.graalvm.polyglot:polyglot", version.ref = "graalvm" } org-graalvm-polyglot-python = { module = "org.graalvm.polyglot:python", version.ref = "graalvm" } -org-graalvm-polyglot-ruby = { module = "org.graalvm.polyglot:ruby", version.ref = "graalvm" } +org-graalvm-polyglot-ruby = "org.graalvm.polyglot:ruby:25.0.0" org-json = "org.json:json:20250517" org-mapstruct = { module = "org.mapstruct:mapstruct", version.ref = "org-mapstruct" } org-mapstruct-extensions-spring-mapstruct-spring-annotations = { module = "org.mapstruct.extensions.spring:mapstruct-spring-annotations", version.ref = "org-mapstruct-extensions-spring" } org-mapstruct-extensions-spring-mapstruct-spring-extensions = { module = "org.mapstruct.extensions.spring:mapstruct-spring-extensions", version.ref = "org-mapstruct-extensions-spring" } org-mapstruct-mapstruct-processor = { module = "org.mapstruct:mapstruct-processor", version.ref = "org-mapstruct" } -org-openapitools-jackson-databind-nullable = "org.openapitools:jackson-databind-nullable:0.2.7" +org-openapitools-jackson-databind-nullable = "org.openapitools:jackson-databind-nullable:0.2.8" org-springdoc-springdoc-openapi-starter-common = { module = "org.springdoc:springdoc-openapi-starter-common", version.ref = "org-springdoc" } org-springdoc-springdoc-openapi-starter-webmvc-ui = { module = "org.springdoc:springdoc-openapi-starter-webmvc-ui", version.ref = "org-springdoc" } org-wiremock-wiremock = "org.wiremock:wiremock-standalone:3.13.1" @@ -51,8 +51,8 @@ com-github-ben-manes-versions = "com.github.ben-manes.versions:0.53.0" gradle-git-properties = "com.gorylenko.gradle-git-properties:2.5.2" jib = "com.google.cloud.tools.jib:3.4.5" nl-littlerobots-version-catalog-update = "nl.littlerobots.version-catalog-update:1.0.1" -org-graalvm-buildtools-native = "org.graalvm.buildtools.native:0.11.1" -org-openapi-generator = "org.openapi.generator:7.16.0" -org-sonarqube = "org.sonarqube:7.0.0.6105" -spotbugs = "com.github.spotbugs:6.4.3" +org-graalvm-buildtools-native = "org.graalvm.buildtools.native:0.11.3" +org-openapi-generator = "org.openapi.generator:7.17.0" +org-sonarqube = "org.sonarqube:7.0.1.6134" +spotbugs = "com.github.spotbugs:6.4.5" spotless = "com.diffplug.gradle.spotless:8.0.0" diff --git a/server/ee/apps/runtime-job-app/build.gradle.kts b/server/ee/apps/runtime-job-app/build.gradle.kts index be15490ba64..6ae59de67a7 100644 --- a/server/ee/apps/runtime-job-app/build.gradle.kts +++ b/server/ee/apps/runtime-job-app/build.gradle.kts @@ -36,7 +36,7 @@ dependencies { implementation(project(":server:libs:core:evaluator:evaluator-impl")) implementation(project(":server:libs:core:file-storage:file-storage-base64-service")) implementation(project(":server:libs:core:file-storage:file-storage-filesystem-service")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:core:message:message-event:message-event-impl")) implementation(project(":server:libs:platform:platform-component:platform-component-api")) implementation(project(":server:libs:platform:platform-component:platform-component-context:platform-component-context-service")) diff --git a/server/ee/apps/runtime-job-app/src/main/java/com/bytechef/runtime/job/platform/connection/ConnectionContext.java b/server/ee/apps/runtime-job-app/src/main/java/com/bytechef/runtime/job/platform/connection/ConnectionContext.java index c7bc33febb1..e06a419eca0 100644 --- a/server/ee/apps/runtime-job-app/src/main/java/com/bytechef/runtime/job/platform/connection/ConnectionContext.java +++ b/server/ee/apps/runtime-job-app/src/main/java/com/bytechef/runtime/job/platform/connection/ConnectionContext.java @@ -7,8 +7,8 @@ package com.bytechef.runtime.job.platform.connection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** @@ -19,22 +19,16 @@ public class ConnectionContext { private static final AtomicLong ATOMIC_CONNECTION_ID = new AtomicLong(1); - - private static final ThreadLocal> CURRENT_CONNECTION_MAP = ThreadLocal.withInitial( - HashMap::new); + private static final Map CONNECTION_MAP = new ConcurrentHashMap<>(); public static Map getConnectionParameters(long id) { - Map parameterMap = CURRENT_CONNECTION_MAP.get(); - - return parameterMap.get(id).parameters; + return CONNECTION_MAP.get(id).parameters; } public static long putConnectionParameters(String name, Map parameters) { - Map parameterMap = CURRENT_CONNECTION_MAP.get(); - long connectionId = -1; - for (Map.Entry entry : parameterMap.entrySet()) { + for (Map.Entry entry : CONNECTION_MAP.entrySet()) { Connection connection = entry.getValue(); if (connection.name.equals(name)) { @@ -47,9 +41,7 @@ public static long putConnectionParameters(String name, Map parameter if (connectionId == -1) { connectionId = ATOMIC_CONNECTION_ID.getAndIncrement(); - parameterMap.putIfAbsent(connectionId, new Connection(name, parameters)); - - CURRENT_CONNECTION_MAP.set(parameterMap); + CONNECTION_MAP.putIfAbsent(connectionId, new Connection(name, parameters)); } return connectionId; diff --git a/server/ee/apps/runtime-job-app/src/main/resources/config/application.yml b/server/ee/apps/runtime-job-app/src/main/resources/config/application.yml index f48c27e08fa..a1e0b3028be 100644 --- a/server/ee/apps/runtime-job-app/src/main/resources/config/application.yml +++ b/server/ee/apps/runtime-job-app/src/main/resources/config/application.yml @@ -89,8 +89,8 @@ bytechef: host: port: 25 message-broker: - # Messaging provider between Coordinator and Workers (local) default: local - provider: local + # Messaging provider between Coordinator and Workers (memory) default: memory + provider: memory # When the worker is enabled, subscribe to the default "default" queue with 10 concurrent consumers. # You may also route workflow tasks to other arbitrarily named task queues by specifying the "node" # property on any given task. diff --git a/server/ee/libs/ai/ai-copilot/ai-copilot-service/src/main/java/com/bytechef/ee/ai/copilot/config/AiCopilotConfiguration.java b/server/ee/libs/ai/ai-copilot/ai-copilot-service/src/main/java/com/bytechef/ee/ai/copilot/config/AiCopilotConfiguration.java index b4a8583c54a..f28136c48db 100644 --- a/server/ee/libs/ai/ai-copilot/ai-copilot-service/src/main/java/com/bytechef/ee/ai/copilot/config/AiCopilotConfiguration.java +++ b/server/ee/libs/ai/ai-copilot/ai-copilot-service/src/main/java/com/bytechef/ee/ai/copilot/config/AiCopilotConfiguration.java @@ -56,8 +56,10 @@ public AiCopilotConfiguration(ApplicationProperties applicationProperties) { this.model = options.getModel(); this.openAiApiKey = openAi.getApiKey(); - this.pgVector = copilot.getVectorstore() - .getPgVector(); + + ApplicationProperties.Ai.Copilot.Vectorstore vectorstore = copilot.getVectorstore(); + + this.pgVector = vectorstore.getPgVector(); this.temperature = options.getTemperature(); } diff --git a/server/ee/libs/atlas/atlas-execution/atlas-execution-remote-client/src/main/java/com/bytechef/ee/atlas/execution/remote/client/service/RemoteJobServiceClient.java b/server/ee/libs/atlas/atlas-execution/atlas-execution-remote-client/src/main/java/com/bytechef/ee/atlas/execution/remote/client/service/RemoteJobServiceClient.java index 5a5332a2385..dc431a3f873 100644 --- a/server/ee/libs/atlas/atlas-execution/atlas-execution-remote-client/src/main/java/com/bytechef/ee/atlas/execution/remote/client/service/RemoteJobServiceClient.java +++ b/server/ee/libs/atlas/atlas-execution/atlas-execution-remote-client/src/main/java/com/bytechef/ee/atlas/execution/remote/client/service/RemoteJobServiceClient.java @@ -50,6 +50,11 @@ public void deleteJob(long id) { throw new UnsupportedOperationException(); } + @Override + public Optional fetchJob(Long id) { + throw new UnsupportedOperationException(); + } + @Override public Optional fetchLastJob() { throw new UnsupportedOperationException(); diff --git a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/build.gradle.kts b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/build.gradle.kts index 7ed8693e2dd..419cbb78093 100644 --- a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/build.gradle.kts +++ b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/build.gradle.kts @@ -20,7 +20,7 @@ dependencies { testImplementation(project(":server:libs:atlas:atlas-execution:atlas-execution-service")) testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl")) testImplementation(project(":server:libs:atlas:atlas-worker:atlas-worker-impl")) - testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory")) testImplementation(project(":server:libs:core:commons:commons-data")) testImplementation(project(":server:libs:core:evaluator:evaluator-impl")) testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service")) diff --git a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/test/java/com/bytechef/atlas/coordinator/TaskCoordinatorIntTest.java b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/test/java/com/bytechef/atlas/coordinator/TaskCoordinatorIntTest.java index 0d3c8004099..d3b147d2637 100644 --- a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/test/java/com/bytechef/atlas/coordinator/TaskCoordinatorIntTest.java +++ b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/test/java/com/bytechef/atlas/coordinator/TaskCoordinatorIntTest.java @@ -52,6 +52,7 @@ import com.bytechef.file.storage.base64.service.Base64FileStorageService; import com.bytechef.jackson.config.JacksonConfiguration; import com.bytechef.liquibase.config.LiquibaseConfiguration; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.platform.coordinator.job.JobSyncExecutor; import com.bytechef.test.config.jdbc.AbstractIntTestJdbcConfiguration; import com.bytechef.test.config.testcontainers.PostgreSQLContainerConfiguration; @@ -62,7 +63,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -75,7 +75,6 @@ import org.springframework.core.env.Environment; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories; -import org.springframework.lang.Nullable; /** * @author Arik Cohen @@ -96,23 +95,18 @@ public class TaskCoordinatorIntTest { private final Evaluator evaluator = SpelEvaluator.create(); @Autowired - @Nullable private Environment environment; @Autowired - @Nullable private ContextService contextService; @Autowired - @Nullable private JobService jobService; @Autowired - @Nullable private TaskExecutionService taskExecutionService; @Autowired - @Nullable private WorkflowService workflowService; @Test @@ -135,9 +129,8 @@ private Job executeWorkflow(String workflowId) { taskHandlerMap.put("randomHelper/v1/randomInt", taskExecution -> null); JobSyncExecutor jobSyncExecutor = new JobSyncExecutor( - Objects.requireNonNull(contextService), environment, evaluator, Objects.requireNonNull(jobService), - List.of(), Objects.requireNonNull(taskExecutionService), taskHandlerMap::get, TASK_FILE_STORAGE, - Objects.requireNonNull(workflowService)); + contextService, environment, evaluator, jobService, -1, SyncMessageBroker::new, List.of(), + taskExecutionService, taskHandlerMap::get, TASK_FILE_STORAGE, -1, workflowService); return jobSyncExecutor.execute(new JobParametersDTO(workflowId, Collections.singletonMap("yourName", "me"))); } diff --git a/server/libs/atlas/atlas-execution/atlas-execution-api/src/main/java/com/bytechef/atlas/execution/service/JobService.java b/server/libs/atlas/atlas-execution/atlas-execution-api/src/main/java/com/bytechef/atlas/execution/service/JobService.java index c1320fb1eeb..3a06621a0c5 100644 --- a/server/libs/atlas/atlas-execution/atlas-execution-api/src/main/java/com/bytechef/atlas/execution/service/JobService.java +++ b/server/libs/atlas/atlas-execution/atlas-execution-api/src/main/java/com/bytechef/atlas/execution/service/JobService.java @@ -32,6 +32,8 @@ public interface JobService { void deleteJob(long id); + Optional fetchJob(Long id); + Optional fetchLastJob(); Optional fetchLastWorkflowJob(String workflowId); diff --git a/server/libs/atlas/atlas-execution/atlas-execution-repository/atlas-execution-repository-memory/src/main/java/com/bytechef/atlas/execution/repository/memory/InMemoryJobRepository.java b/server/libs/atlas/atlas-execution/atlas-execution-repository/atlas-execution-repository-memory/src/main/java/com/bytechef/atlas/execution/repository/memory/InMemoryJobRepository.java index 8477c4fb5f7..a63459e8bee 100644 --- a/server/libs/atlas/atlas-execution/atlas-execution-repository/atlas-execution-repository-memory/src/main/java/com/bytechef/atlas/execution/repository/memory/InMemoryJobRepository.java +++ b/server/libs/atlas/atlas-execution/atlas-execution-repository/atlas-execution-repository-memory/src/main/java/com/bytechef/atlas/execution/repository/memory/InMemoryJobRepository.java @@ -71,7 +71,9 @@ public int countRunningJobs() { @Override public void deleteById(Long id) { - throw new UnsupportedOperationException(); + Cache cache = Objects.requireNonNull(cacheManager.getCache(CACHE)); + + cache.evict(TenantCacheKeyUtils.getKey(id)); } @Override diff --git a/server/libs/atlas/atlas-execution/atlas-execution-service/src/main/java/com/bytechef/atlas/execution/service/JobServiceImpl.java b/server/libs/atlas/atlas-execution/atlas-execution-service/src/main/java/com/bytechef/atlas/execution/service/JobServiceImpl.java index e8ef6fbb798..360070c48bb 100644 --- a/server/libs/atlas/atlas-execution/atlas-execution-service/src/main/java/com/bytechef/atlas/execution/service/JobServiceImpl.java +++ b/server/libs/atlas/atlas-execution/atlas-execution-service/src/main/java/com/bytechef/atlas/execution/service/JobServiceImpl.java @@ -68,6 +68,11 @@ public void deleteJob(long id) { jobRepository.deleteById(id); } + @Override + public Optional fetchJob(Long id) { + return jobRepository.findById(id); + } + @Override @Transactional(readOnly = true) public Optional fetchLastJob() { diff --git a/server/libs/atlas/atlas-worker/atlas-worker-impl/build.gradle.kts b/server/libs/atlas/atlas-worker/atlas-worker-impl/build.gradle.kts index 0da5cc21216..edec77bece7 100644 --- a/server/libs/atlas/atlas-worker/atlas-worker-impl/build.gradle.kts +++ b/server/libs/atlas/atlas-worker/atlas-worker-impl/build.gradle.kts @@ -17,6 +17,6 @@ dependencies { testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl")) testImplementation(project(":server:libs:core:evaluator:evaluator-impl")) testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service")) - testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory")) testImplementation(project(":server:libs:test:test-support")) } diff --git a/server/libs/atlas/atlas-worker/atlas-worker-impl/src/test/java/com/bytechef/atlas/worker/TaskWorkerTest.java b/server/libs/atlas/atlas-worker/atlas-worker-impl/src/test/java/com/bytechef/atlas/worker/TaskWorkerTest.java index 895b6ccb858..67a89033c0b 100644 --- a/server/libs/atlas/atlas-worker/atlas-worker-impl/src/test/java/com/bytechef/atlas/worker/TaskWorkerTest.java +++ b/server/libs/atlas/atlas-worker/atlas-worker-impl/src/test/java/com/bytechef/atlas/worker/TaskWorkerTest.java @@ -40,7 +40,7 @@ import com.bytechef.evaluator.Evaluator; import com.bytechef.evaluator.SpelEvaluator; import com.bytechef.file.storage.base64.service.Base64FileStorageService; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.test.extension.ObjectMapperSetupExtension; import java.io.File; diff --git a/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/service/SharedTemplateServiceImpl.java b/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/service/SharedTemplateServiceImpl.java index 3a372978dd0..584fde0c201 100644 --- a/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/service/SharedTemplateServiceImpl.java +++ b/server/libs/automation/automation-configuration/automation-configuration-service/src/main/java/com/bytechef/automation/configuration/service/SharedTemplateServiceImpl.java @@ -43,14 +43,15 @@ public SharedTemplateServiceImpl(SharedTemplateRepository sharedTemplateReposito @Override @Transactional(readOnly = true) public Optional fetchSharedTemplate(UUID uuid) { - return TenantUtils.callWithTenantId(TenantContext.DEFAULT_TENANT_ID, - () -> sharedTemplateRepository.findByUuid(uuid)); + return TenantUtils.callWithTenantId( + TenantContext.DEFAULT_TENANT_ID, () -> sharedTemplateRepository.findByUuid(uuid)); } @Override @Transactional(readOnly = true) public SharedTemplate getSharedTemplate(UUID uuid) { - return TenantUtils.callWithTenantId(TenantContext.DEFAULT_TENANT_ID, + return TenantUtils.callWithTenantId( + TenantContext.DEFAULT_TENANT_ID, () -> sharedTemplateRepository.findByUuid(uuid) .orElseThrow(() -> new IllegalArgumentException("Shared template not found for uuid: " + uuid))); } diff --git a/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java b/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java index 76f65b2677a..40836174455 100644 --- a/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java +++ b/server/libs/config/app-config/src/main/java/com/bytechef/config/ApplicationProperties.java @@ -1756,7 +1756,7 @@ public void setRequired(boolean required) { public static class MessageBroker { public enum Provider { - AMQP, AWS, JMS, KAFKA, LOCAL, REDIS + AMQP, AWS, JMS, KAFKA, MEMORY, REDIS } private Provider provider = Provider.JMS; diff --git a/server/libs/core/commons/commons-util/src/main/java/com/bytechef/commons/util/CollectionUtils.java b/server/libs/core/commons/commons-util/src/main/java/com/bytechef/commons/util/CollectionUtils.java index 8d0f1c5e96b..0780518defa 100644 --- a/server/libs/core/commons/commons-util/src/main/java/com/bytechef/commons/util/CollectionUtils.java +++ b/server/libs/core/commons/commons-util/src/main/java/com/bytechef/commons/util/CollectionUtils.java @@ -214,7 +214,8 @@ public static List map(List list, Function mapper) { Validate.notNull(list, "'list' must not be null"); Validate.notNull(mapper, "'mapper' must not be null"); - return list.stream() + return new ArrayList<>(list) + .stream() .map(mapper) .toList(); } @@ -223,7 +224,8 @@ public static List map(Set set, Function mapper) { Validate.notNull(set, "'set' must not be null"); Validate.notNull(mapper, "'mapper' must not be null"); - return set.stream() + return new ArrayList<>(set) + .stream() .map(mapper) .toList(); } diff --git a/server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerLocal.java b/server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerMemory.java similarity index 93% rename from server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerLocal.java rename to server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerMemory.java index c6cbd20ebd4..c1ba042ead2 100644 --- a/server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerLocal.java +++ b/server/libs/core/message/message-broker/message-broker-api/src/main/java/com/bytechef/message/broker/annotation/ConditionalOnMessageBrokerMemory.java @@ -34,6 +34,6 @@ @Target({ ElementType.TYPE, ElementType.METHOD }) -@ConditionalOnProperty(prefix = "bytechef", name = "message-broker.provider", havingValue = "local") -public @interface ConditionalOnMessageBrokerLocal { +@ConditionalOnProperty(prefix = "bytechef", name = "message-broker.provider", havingValue = "memory") +public @interface ConditionalOnMessageBrokerMemory { } diff --git a/server/libs/core/message/message-broker/message-broker-sync/build.gradle.kts b/server/libs/core/message/message-broker/message-broker-memory/build.gradle.kts similarity index 100% rename from server/libs/core/message/message-broker/message-broker-sync/build.gradle.kts rename to server/libs/core/message/message-broker/message-broker-memory/build.gradle.kts diff --git a/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AbstractMessageBroker.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AbstractMessageBroker.java new file mode 100644 index 00000000000..e5148d7117e --- /dev/null +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AbstractMessageBroker.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.message.broker.memory; + +import com.bytechef.message.route.MessageRoute; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Abstract base implementation of the {@link MessageBroker} interface. This class provides a framework for managing + * receivers and routing messages to them based on specified routes. Concrete subclasses need to implement the + * {@code send} method to define the specific behavior for message delivery. + * + * @author Ivica Cardic + */ +public abstract class AbstractMessageBroker implements MemoryMessageBroker { + + protected final Map> receiverMap = new HashMap<>(); + + @Override + public void receive(MessageRoute messageRoute, Receiver receiver) { + List receivers = receiverMap.computeIfAbsent(messageRoute, k -> new ArrayList<>()); + + receivers.add(receiver); + } +} diff --git a/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java new file mode 100644 index 00000000000..bacbea37569 --- /dev/null +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java @@ -0,0 +1,83 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.message.broker.memory; + +import com.bytechef.commons.util.ConvertUtils; +import com.bytechef.commons.util.JsonUtils; +import com.bytechef.message.Retryable; +import com.bytechef.message.route.MessageRoute; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.thread.Threading; +import org.springframework.core.env.Environment; +import org.springframework.util.Assert; + +/** + * An asynchronous implementation of a message broker for routing messages to subscribed listeners. This class extends + * {@code AbstractMessageBroker}, providing a non-blocking approach to message delivery using an {@code Executor} for + * task execution. The {@code AsyncMessageBroker} is designed to decouple the process of message sending and delivery by + * executing receiver logic in separate threads managed by the provided executor. + * + * @author Ivica Cardic + */ +public class AsyncMessageBroker extends AbstractMessageBroker { + + private static final Logger logger = LoggerFactory.getLogger(AsyncMessageBroker.class); + + private final Executor executor; + + public AsyncMessageBroker(Environment environment) { + if (Threading.VIRTUAL.isActive(environment)) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } else { + executor = Executors.newCachedThreadPool(); + } + } + + @Override + public void send(MessageRoute messageRoute, Object message) { + Assert.notNull(messageRoute, "'messageRoute' must not be null"); + + if (message instanceof Retryable retryable) { + delay(retryable.getRetryDelayMillis()); + } + + List receivers = receiverMap.get(messageRoute); + + Assert.isTrue(receivers != null && !receivers.isEmpty(), "no listeners subscribed for: " + messageRoute); + + for (Receiver receiver : Validate.notNull(receivers, "receivers")) { + executor.execute(() -> receiver.receive( + ConvertUtils.convertValue(JsonUtils.read(JsonUtils.write(message)), message.getClass()))); + } + } + + private void delay(long value) { + try { + TimeUnit.MILLISECONDS.sleep(value); + } catch (InterruptedException e) { + if (logger.isTraceEnabled()) { + logger.trace(e.getMessage(), e); + } + } + } +} diff --git a/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/MemoryMessageBroker.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/MemoryMessageBroker.java new file mode 100644 index 00000000000..552dbca3142 --- /dev/null +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/MemoryMessageBroker.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.message.broker.memory; + +import com.bytechef.message.broker.MessageBroker; +import com.bytechef.message.route.MessageRoute; + +/** + * @author Ivica Cardic + */ +public interface MemoryMessageBroker extends MessageBroker { + + void receive(MessageRoute messageRoute, Receiver receiver); + + interface Receiver { + void receive(Object message); + } +} diff --git a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/SyncMessageBroker.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/SyncMessageBroker.java similarity index 68% rename from server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/SyncMessageBroker.java rename to server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/SyncMessageBroker.java index c6147120ad2..8e38d58850d 100644 --- a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/SyncMessageBroker.java +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/SyncMessageBroker.java @@ -16,28 +16,23 @@ * Modifications copyright (C) 2025 ByteChef */ -package com.bytechef.message.broker.sync; +package com.bytechef.message.broker.memory; import com.bytechef.commons.util.ConvertUtils; import com.bytechef.commons.util.JsonUtils; -import com.bytechef.message.broker.MessageBroker; import com.bytechef.message.route.MessageRoute; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.lang3.Validate; import org.springframework.util.Assert; /** - * a simple, non-thread-safe implementation of the {@link MessageBroker} interface. Useful for testing. + * Simple, non-thread-safe implementation of the {@link MessageBroker} interface. Useful for testing. * * @author Arik Cohen + * @author Ivica Cardic * @since Jul 10, 2016 */ -public class SyncMessageBroker implements MessageBroker { - - private final Map> receiverMap = new HashMap<>(); +public class SyncMessageBroker extends AbstractMessageBroker { @Override public void send(MessageRoute messageRoute, Object message) { @@ -52,14 +47,4 @@ public void send(MessageRoute messageRoute, Object message) { ConvertUtils.convertValue(JsonUtils.read(JsonUtils.write(message)), message.getClass())); } } - - public void receive(MessageRoute messageRoute, Receiver receiver) { - List receivers = receiverMap.computeIfAbsent(messageRoute, k -> new ArrayList<>()); - - receivers.add(receiver); - } - - public interface Receiver { - void receive(Object message); - } } diff --git a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerConfiguration.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerConfiguration.java similarity index 65% rename from server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerConfiguration.java rename to server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerConfiguration.java index ac0c8e9b47d..8f01282a7f1 100644 --- a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerConfiguration.java +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerConfiguration.java @@ -14,32 +14,33 @@ * limitations under the License. */ -package com.bytechef.message.broker.sync.config; +package com.bytechef.message.broker.memory.config; -import com.bytechef.message.broker.annotation.ConditionalOnMessageBrokerLocal; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.annotation.ConditionalOnMessageBrokerMemory; +import com.bytechef.message.broker.memory.AsyncMessageBroker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; /** * @author Ivica Cardic */ @Configuration -@ConditionalOnMessageBrokerLocal -public class LocalMessageBrokerConfiguration { +@ConditionalOnMessageBrokerMemory +public class MemoryMessageBrokerConfiguration { - private static final Logger logger = LoggerFactory.getLogger(LocalMessageBrokerConfiguration.class); + private static final Logger logger = LoggerFactory.getLogger(MemoryMessageBrokerConfiguration.class); - public LocalMessageBrokerConfiguration() { + public MemoryMessageBrokerConfiguration() { if (logger.isInfoEnabled()) { - logger.info("Message broker provider type enabled: local"); + logger.info("Message broker provider type enabled: memory"); } } @Bean - SyncMessageBroker syncMessageBroker() { - return new SyncMessageBroker(); + AsyncMessageBroker asyncMessageBroker(Environment environment) { + return new AsyncMessageBroker(environment); } } diff --git a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerListenerRegistrarConfiguration.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerListenerRegistrarConfiguration.java similarity index 63% rename from server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerListenerRegistrarConfiguration.java rename to server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerListenerRegistrarConfiguration.java index 5aacfa98a4f..a50201bf880 100644 --- a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/config/LocalMessageBrokerListenerRegistrarConfiguration.java +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/config/MemoryMessageBrokerListenerRegistrarConfiguration.java @@ -16,13 +16,13 @@ * Modifications copyright (C) 2025 ByteChef */ -package com.bytechef.message.broker.sync.config; +package com.bytechef.message.broker.memory.config; -import com.bytechef.message.broker.annotation.ConditionalOnMessageBrokerLocal; +import com.bytechef.message.broker.annotation.ConditionalOnMessageBrokerMemory; import com.bytechef.message.broker.config.MessageBrokerConfigurer; import com.bytechef.message.broker.config.MessageBrokerListenerRegistrar; -import com.bytechef.message.broker.sync.SyncMessageBroker; -import com.bytechef.message.broker.sync.listener.LocalListenerEndpointRegistrar; +import com.bytechef.message.broker.memory.MemoryMessageBroker; +import com.bytechef.message.broker.memory.listener.MemoryListenerEndpointRegistrar; import com.bytechef.message.route.MessageRoute; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; @@ -37,34 +37,34 @@ * @author Ivica Cardic */ @Configuration -@ConditionalOnMessageBrokerLocal -public class LocalMessageBrokerListenerRegistrarConfiguration +@ConditionalOnMessageBrokerMemory +public class MemoryMessageBrokerListenerRegistrarConfiguration implements - MessageBrokerListenerRegistrar, + MessageBrokerListenerRegistrar, SmartInitializingSingleton { private static final Logger logger = LoggerFactory.getLogger( - LocalMessageBrokerListenerRegistrarConfiguration.class); + MemoryMessageBrokerListenerRegistrarConfiguration.class); - private final List> messageBrokerConfigurers; - private final SyncMessageBroker syncMessageBroker; + private final List> messageBrokerConfigurers; + private final MemoryMessageBroker memoryMessageBroker; @SuppressFBWarnings("EI") - public LocalMessageBrokerListenerRegistrarConfiguration( + public MemoryMessageBrokerListenerRegistrarConfiguration( @Autowired( - required = false) List> messageBrokerConfigurers, - SyncMessageBroker syncMessageBroker) { + required = false) List> messageBrokerConfigurers, + MemoryMessageBroker memoryMessageBroker) { - this.syncMessageBroker = syncMessageBroker; + this.memoryMessageBroker = memoryMessageBroker; this.messageBrokerConfigurers = messageBrokerConfigurers == null ? List.of() : messageBrokerConfigurers; } @Override public void afterSingletonsInstantiated() { - LocalListenerEndpointRegistrar listenerEndpointRegistrar = new LocalListenerEndpointRegistrar( - syncMessageBroker); + MemoryListenerEndpointRegistrar listenerEndpointRegistrar = new MemoryListenerEndpointRegistrar( + memoryMessageBroker); - for (MessageBrokerConfigurer messageBrokerConfigurer : messageBrokerConfigurers) { + for (MessageBrokerConfigurer messageBrokerConfigurer : messageBrokerConfigurers) { messageBrokerConfigurer.configure(listenerEndpointRegistrar, this); } @@ -72,7 +72,7 @@ public void afterSingletonsInstantiated() { @Override public void registerListenerEndpoint( - LocalListenerEndpointRegistrar listenerEndpointRegistrar, MessageRoute messageRoute, int concurrency, + MemoryListenerEndpointRegistrar listenerEndpointRegistrar, MessageRoute messageRoute, int concurrency, Object delegate, String methodName) { Class delegateClass = delegate.getClass(); diff --git a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/listener/LocalListenerEndpointRegistrar.java b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/listener/MemoryListenerEndpointRegistrar.java similarity index 90% rename from server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/listener/LocalListenerEndpointRegistrar.java rename to server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/listener/MemoryListenerEndpointRegistrar.java index d5d27eff8a8..1e213b76637 100644 --- a/server/libs/core/message/message-broker/message-broker-sync/src/main/java/com/bytechef/message/broker/sync/listener/LocalListenerEndpointRegistrar.java +++ b/server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/listener/MemoryListenerEndpointRegistrar.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package com.bytechef.message.broker.sync.listener; +package com.bytechef.message.broker.memory.listener; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.memory.MemoryMessageBroker; import com.bytechef.message.route.MessageRoute; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -30,16 +30,16 @@ /** * @author Ivica Cardic */ -public class LocalListenerEndpointRegistrar { +public class MemoryListenerEndpointRegistrar { - private final SyncMessageBroker syncMessageBroker; + private final MemoryMessageBroker memoryMessageBroker; - public LocalListenerEndpointRegistrar(SyncMessageBroker syncMessageBroker) { - this.syncMessageBroker = syncMessageBroker; + public MemoryListenerEndpointRegistrar(MemoryMessageBroker memoryMessageBroker) { + this.memoryMessageBroker = memoryMessageBroker; } public void registerListenerEndpoint(MessageRoute messageRoute, Object delegate, String methodName) { - syncMessageBroker.receive( + memoryMessageBroker.receive( messageRoute, message -> { try { diff --git a/server/libs/modules/components/data-stream/src/main/java/com/bytechef/component/datastream/action/definition/DataStreamStreamActionDefinition.java b/server/libs/modules/components/data-stream/src/main/java/com/bytechef/component/datastream/action/definition/DataStreamStreamActionDefinition.java index c133bf9f919..7ac16677c8a 100644 --- a/server/libs/modules/components/data-stream/src/main/java/com/bytechef/component/datastream/action/definition/DataStreamStreamActionDefinition.java +++ b/server/libs/modules/components/data-stream/src/main/java/com/bytechef/component/datastream/action/definition/DataStreamStreamActionDefinition.java @@ -47,6 +47,8 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; + +import com.bytechef.tenant.util.TenantUtils; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameter; @@ -143,7 +145,8 @@ protected Object perform( } }); - JobExecution jobExecution = jobLauncher.run(job, jobParameters); + JobExecution jobExecution = TenantUtils.callWithTenantId( + TenantContext.DEFAULT_TENANT_ID, () -> jobLauncher.run(job, jobParameters)); List failureExceptions = jobExecution.getAllFailureExceptions(); diff --git a/server/libs/modules/components/map/build.gradle.kts b/server/libs/modules/components/map/build.gradle.kts index 3246eaab69c..b1254504ebc 100644 --- a/server/libs/modules/components/map/build.gradle.kts +++ b/server/libs/modules/components/map/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { implementation(project(":server:libs:atlas:atlas-worker:atlas-worker-impl")) implementation(project(":server:libs:core:commons:commons-util")) implementation(project(":server:libs:core:file-storage:file-storage-base64-service")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:modules:task-dispatchers:map")) implementation(project(":server:libs:platform:platform-api")) implementation(project(":server:libs:platform:platform-file-storage:platform-file-storage-impl")) diff --git a/server/libs/modules/components/map/src/main/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandler.java b/server/libs/modules/components/map/src/main/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandler.java index 4e13a534667..f5ea4b27c0b 100644 --- a/server/libs/modules/components/map/src/main/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandler.java +++ b/server/libs/modules/components/map/src/main/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandler.java @@ -41,7 +41,7 @@ import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.file.storage.base64.service.Base64FileStorageService; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.task.dispatcher.map.MapTaskDispatcher; import java.util.ArrayList; diff --git a/server/libs/modules/components/map/src/test/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandlerTest.java b/server/libs/modules/components/map/src/test/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandlerTest.java index e0c15b9e577..5610b7461d1 100644 --- a/server/libs/modules/components/map/src/test/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandlerTest.java +++ b/server/libs/modules/components/map/src/test/java/com/bytechef/component/map/MapTaskDispatcherAdapterTaskHandlerTest.java @@ -39,7 +39,7 @@ import com.bytechef.evaluator.Evaluator; import com.bytechef.evaluator.SpelEvaluator; import com.bytechef.file.storage.base64.service.Base64FileStorageService; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.test.extension.ObjectMapperSetupExtension; import java.util.Arrays; diff --git a/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml b/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml index deeadd1e23a..5dc06c493bf 100644 --- a/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml +++ b/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml @@ -20,7 +20,8 @@ tasks: value2: 5 combineOperation: "ANY" caseTrue: - - type: "loopBreak/v1" + - name: "loopBreak" + type: "loopBreak/v1" caseFalse: - name: "sumVar2" type: "var/v1/set" diff --git a/server/libs/platform/platform-component/platform-component-test-int-support/build.gradle.kts b/server/libs/platform/platform-component/platform-component-test-int-support/build.gradle.kts index cc7148a9acd..28498e458f0 100644 --- a/server/libs/platform/platform-component/platform-component-test-int-support/build.gradle.kts +++ b/server/libs/platform/platform-component/platform-component-test-int-support/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { implementation(project(":server:libs:core:commons:commons-util")) implementation(project(":server:libs:core:evaluator:evaluator-impl")) implementation(project(":server:libs:core:encryption:encryption-api")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:platform:platform-component:platform-component-api")) implementation(project(":server:libs:platform:platform-component:platform-component-context:platform-component-context-service")) implementation(project(":server:libs:platform:platform-component:platform-component-service")) diff --git a/server/libs/platform/platform-component/platform-component-test-int-support/src/main/java/com/bytechef/platform/component/test/ComponentJobTestExecutor.java b/server/libs/platform/platform-component/platform-component-test-int-support/src/main/java/com/bytechef/platform/component/test/ComponentJobTestExecutor.java index 3de1e94be61..9d0aa44db2d 100644 --- a/server/libs/platform/platform-component/platform-component-test-int-support/src/main/java/com/bytechef/platform/component/test/ComponentJobTestExecutor.java +++ b/server/libs/platform/platform-component/platform-component-test-int-support/src/main/java/com/bytechef/platform/component/test/ComponentJobTestExecutor.java @@ -29,6 +29,7 @@ import com.bytechef.commons.util.MapUtils; import com.bytechef.evaluator.Evaluator; import com.bytechef.file.storage.base64.service.Base64FileStorageService; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.platform.component.constant.MetadataConstants; import com.bytechef.platform.constant.ModeType; import com.bytechef.platform.coordinator.job.JobSyncExecutor; @@ -73,9 +74,9 @@ public Job execute(String workflowId, Map inputs) { public Job execute(String workflowId, Map inputs, Map> taskHandlerMap) { JobSyncExecutor jobSyncExecutor = new JobSyncExecutor( - contextService, environment, evaluator, jobService, getTaskDispatcherPreSendProcessors(), - taskExecutionService, MapUtils.concat(this.taskHandlerMap, taskHandlerMap)::get, taskFileStorage, - workflowService); + contextService, environment, evaluator, jobService, -1, SyncMessageBroker::new, + getTaskDispatcherPreSendProcessors(), taskExecutionService, + MapUtils.concat(this.taskHandlerMap, taskHandlerMap)::get, taskFileStorage, -1, workflowService); return jobSyncExecutor.execute(new JobParametersDTO(workflowId, inputs)); } diff --git a/server/libs/platform/platform-coordinator/build.gradle.kts b/server/libs/platform/platform-coordinator/build.gradle.kts index 6b81e1f89cf..dad6382f0ce 100644 --- a/server/libs/platform/platform-coordinator/build.gradle.kts +++ b/server/libs/platform/platform-coordinator/build.gradle.kts @@ -13,7 +13,7 @@ dependencies { implementation(project(":server:libs:atlas:atlas-execution:atlas-execution-service")) implementation(project(":server:libs:atlas:atlas-worker:atlas-worker-impl")) implementation(project(":server:libs:core:commons:commons-util")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:core:tenant:tenant-api")) implementation(project(":server:libs:platform:platform-api")) implementation(project(":server:libs:platform:platform-notification:platform-notification-api")) diff --git a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java index 25102b44aa6..f9ebcead8d2 100644 --- a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java +++ b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java @@ -16,14 +16,19 @@ package com.bytechef.platform.coordinator.job; +import static com.bytechef.tenant.constant.TenantConstants.CURRENT_TENANT_ID; + import com.bytechef.atlas.configuration.domain.Workflow; import com.bytechef.atlas.configuration.service.WorkflowService; import com.bytechef.atlas.coordinator.TaskCoordinator; import com.bytechef.atlas.coordinator.event.ApplicationEvent; +import com.bytechef.atlas.coordinator.event.ErrorEvent; +import com.bytechef.atlas.coordinator.event.JobStatusApplicationEvent; import com.bytechef.atlas.coordinator.event.StartJobEvent; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.event.listener.ApplicationEventListener; +import com.bytechef.atlas.coordinator.event.listener.TaskExecutionErrorEventListener; import com.bytechef.atlas.coordinator.event.listener.TaskStartedApplicationEventListener; import com.bytechef.atlas.coordinator.job.JobExecutor; import com.bytechef.atlas.coordinator.message.route.TaskCoordinatorMessageRoute; @@ -56,8 +61,11 @@ import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.exception.ExecutionException; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.MessageBroker; +import com.bytechef.message.broker.memory.MemoryMessageBroker; +import com.bytechef.message.broker.memory.MemoryMessageBroker.Receiver; import com.bytechef.message.event.MessageEvent; +import com.bytechef.message.route.MessageRoute; import com.bytechef.platform.coordinator.job.exception.TaskExecutionErrorType; import com.bytechef.platform.definition.WorkflowNodeType; import com.bytechef.platform.webhook.executor.constant.WebhookConstants; @@ -66,8 +74,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; @@ -86,37 +100,48 @@ public class JobSyncExecutor { private static final Logger logger = LoggerFactory.getLogger(JobSyncExecutor.class); private static final List WEBHOOK_COMPONENTS = List.of("apiPlatform", "chat", "webhook"); + private static final int NO_TIMEOUT = -1; + private static final int UNLIMITED_TASK_EXECUTIONS = -1; private final ContextService contextService; + private final Map jobCompletionLatches = new ConcurrentHashMap<>(); private final ApplicationEventPublisher eventPublisher; private final JobFacade jobFacade; private final JobService jobService; private final TaskExecutionService taskExecutionService; private final TaskFileStorage taskFileStorage; private final WorkflowService workflowService; + private final long timeout; public JobSyncExecutor( ContextService contextService, Environment environment, Evaluator evaluator, JobService jobService, - List taskDispatcherPreSendProcessors, TaskExecutionService taskExecutionService, - TaskHandlerRegistry taskHandlerRegistry, TaskFileStorage taskFileStorage, WorkflowService workflowService) { + int maxTaskExecutions, Supplier memoryMessageBrokerSupplier, + List taskDispatcherPreSendProcessors, + TaskExecutionService taskExecutionService, TaskHandlerRegistry taskHandlerRegistry, + TaskFileStorage taskFileStorage, long timeout, WorkflowService workflowService) { this( - contextService, environment, evaluator, jobService, new SyncMessageBroker(), List.of(), List.of(), - taskDispatcherPreSendProcessors, List.of(), taskExecutionService, taskHandlerRegistry, taskFileStorage, - workflowService); + contextService, environment, evaluator, jobService, maxTaskExecutions, memoryMessageBrokerSupplier, + List.of(), List.of(), taskDispatcherPreSendProcessors, List.of(), taskExecutionService, taskHandlerRegistry, + taskFileStorage, timeout, workflowService); } @SuppressFBWarnings("EI") public JobSyncExecutor( ContextService contextService, Environment environment, Evaluator evaluator, JobService jobService, - SyncMessageBroker syncMessageBroker, List taskCompletionHandlerFactories, + int maxTaskExecutions, Supplier memoryMessageBrokerSupplier, + List taskCompletionHandlerFactories, List taskDispatcherAdapterFactories, List taskDispatcherPreSendProcessors, List taskDispatcherResolverFactories, TaskExecutionService taskExecutionService, - TaskHandlerRegistry taskHandlerRegistry, TaskFileStorage taskFileStorage, WorkflowService workflowService) { + TaskHandlerRegistry taskHandlerRegistry, TaskFileStorage taskFileStorage, long timeout, + WorkflowService workflowService) { this.contextService = contextService; - this.eventPublisher = createEventPublisher(syncMessageBroker); + + MemoryMessageBroker memoryMessageBroker = memoryMessageBrokerSupplier.get(); + + this.eventPublisher = createEventPublisher(memoryMessageBroker); this.jobFacade = new JobFacadeImpl( eventPublisher, contextService, jobService, taskExecutionService, taskFileStorage, workflowService); @@ -124,23 +149,32 @@ public JobSyncExecutor( this.jobService = jobService; this.taskExecutionService = taskExecutionService; this.taskFileStorage = taskFileStorage; + this.timeout = timeout; this.workflowService = workflowService; - syncMessageBroker.receive( - TaskCoordinatorMessageRoute.ERROR_EVENTS, event -> { - TaskExecution erroredTaskExecution = ((TaskExecutionErrorEvent) event).getTaskExecution(); - if (erroredTaskExecution.getError() != null) { - erroredTaskExecution.setStatus(TaskExecution.Status.FAILED); - } + TaskExecutionErrorEventListener taskExecutionErrorEventListener = new TaskExecutionErrorEventListener( + eventPublisher, jobService, null, taskExecutionService); + + receive( + memoryMessageBroker, TaskCoordinatorMessageRoute.ERROR_EVENTS, + event -> { + ErrorEvent errorEvent = (ErrorEvent) event; + + if (errorEvent instanceof TaskExecutionErrorEvent taskExecutionErrorEvent) { + TaskExecution taskExecution = taskExecutionErrorEvent.getTaskExecution(); - taskExecutionService.update(erroredTaskExecution); + Optional jobOptional = jobService.fetchJob( + Validate.notNull(taskExecution.getJobId(), "jobId")); - ExecutionError error = erroredTaskExecution.getError(); + if (jobOptional.isEmpty()) { + return; + } + } - logger.error(error.getMessage()); + taskExecutionErrorEventListener.onErrorEvent(errorEvent); }); - syncMessageBroker.receive(TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); + receive(memoryMessageBroker, TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); TaskHandlerResolverChain taskHandlerResolverChain = new TaskHandlerResolverChain(); @@ -149,19 +183,47 @@ public JobSyncExecutor( new TaskDispatcherAdapterTaskHandlerResolver(taskDispatcherAdapterFactories, taskHandlerResolverChain), new DefaultTaskHandlerResolver(taskHandlerRegistry))); - TaskWorker worker = new TaskWorker( - evaluator, eventPublisher, new JobSyncAsyncTaskExecutor(environment), taskHandlerResolverChain, - taskFileStorage); + JobSyncAsyncTaskExecutor jobSyncAsyncTaskExecutor = new JobSyncAsyncTaskExecutor( + environment, maxTaskExecutions); - syncMessageBroker.receive( - TaskWorkerMessageRoute.TASK_EXECUTION_EVENTS, e -> worker.onTaskExecutionEvent((TaskExecutionEvent) e)); + TaskWorker taskWorker = new TaskWorker( + evaluator, eventPublisher, jobSyncAsyncTaskExecutor, taskHandlerResolverChain, taskFileStorage); + + MemoryMessageBroker coordinatorMessageBroker = memoryMessageBrokerSupplier.get(); + + receive( + coordinatorMessageBroker, TaskWorkerMessageRoute.TASK_EXECUTION_EVENTS, event -> { + TaskExecutionEvent taskExecutionEvent = (TaskExecutionEvent) event; + + if (maxTaskExecutions != UNLIMITED_TASK_EXECUTIONS) { + jobSyncAsyncTaskExecutor.incrementAndCheck(taskExecutionEvent); + } + + TaskExecution taskExecution = taskExecutionEvent.getTaskExecution(); + + long jobId = Validate.notNull(taskExecution.getJobId(), "jobId"); + + Job job = jobService.getJob(jobId); + + if (job.getStatus() == Job.Status.FAILED) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping task scheduling for FAILED job: {}", jobId); + } + + return; + } + + taskWorker.onTaskExecutionEvent(taskExecutionEvent); + }); TaskDispatcherChain taskDispatcherChain = new TaskDispatcherChain(); taskDispatcherChain.setTaskDispatcherResolvers( CollectionUtils.concat( getTaskDispatcherResolverStream(taskDispatcherResolverFactories, taskDispatcherChain), - Stream.of(new DefaultTaskDispatcher(eventPublisher, taskDispatcherPreSendProcessors)))); + Stream.of( + new DefaultTaskDispatcher( + createEventPublisher(coordinatorMessageBroker), taskDispatcherPreSendProcessors)))); JobExecutor jobExecutor = new JobExecutor( contextService, evaluator, taskDispatcherChain, taskExecutionService, taskFileStorage, workflowService); @@ -183,13 +245,33 @@ evaluator, eventPublisher, new JobSyncAsyncTaskExecutor(environment), taskHandle getApplicationEventListeners(taskExecutionService, jobService), List.of(), eventPublisher, jobExecutor, jobService, taskCompletionHandlerChain, taskDispatcherChain, taskExecutionService); - syncMessageBroker.receive(TaskCoordinatorMessageRoute.APPLICATION_EVENTS, + receive( + memoryMessageBroker, TaskCoordinatorMessageRoute.APPLICATION_EVENTS, event -> taskCoordinator.onApplicationEvent((ApplicationEvent) event)); - syncMessageBroker.receive( - TaskCoordinatorMessageRoute.TASK_EXECUTION_COMPLETE_EVENTS, - e -> taskCoordinator.onTaskExecutionCompleteEvent((TaskExecutionCompleteEvent) e)); - syncMessageBroker.receive(TaskCoordinatorMessageRoute.JOB_START_EVENTS, - e -> taskCoordinator.onStartJobEvent((StartJobEvent) e)); + receive( + memoryMessageBroker, TaskCoordinatorMessageRoute.APPLICATION_EVENTS, event -> { + if (event instanceof JobStatusApplicationEvent jobStatusEvent) { + long jobId = jobStatusEvent.getJobId(); + + Job.Status status = jobStatusEvent.getStatus(); + + if (status == Job.Status.COMPLETED || status == Job.Status.FAILED || status == Job.Status.STOPPED) { + CountDownLatch latch = jobCompletionLatches.get(getKey(jobId)); + + if (latch != null) { + latch.countDown(); + } + + jobSyncAsyncTaskExecutor.clearCounter(jobId); + } + } + }); + receive( + memoryMessageBroker, TaskCoordinatorMessageRoute.TASK_EXECUTION_COMPLETE_EVENTS, + event -> taskCoordinator.onTaskExecutionCompleteEvent((TaskExecutionCompleteEvent) event)); + receive( + memoryMessageBroker, TaskCoordinatorMessageRoute.JOB_START_EVENTS, + event -> taskCoordinator.onStartJobEvent((StartJobEvent) event)); } public Job execute(JobParametersDTO jobParametersDTO) { @@ -204,13 +286,25 @@ eventPublisher, contextService, new JobServiceWrapper(jobFactoryFunction), taskE return execute(jobParametersDTO, jobFacade); } - private static ApplicationEventPublisher createEventPublisher(SyncMessageBroker syncMessageBroker) { - return event -> syncMessageBroker.send(((MessageEvent) event).getRoute(), event); + private static ApplicationEventPublisher createEventPublisher(MessageBroker messageBroker) { + return event -> { + MessageEvent messageEvent = (MessageEvent) event; + + messageEvent.putMetadata(CURRENT_TENANT_ID, TenantContext.getCurrentTenantId()); + + messageBroker.send(messageEvent.getRoute(), messageEvent); + }; } private Job execute(JobParametersDTO jobParametersDTO, JobFacade jobFacade) { Job job = jobService.getJob(jobFacade.createJob(jobParametersDTO)); + long jobId = Validate.notNull(job.getId(), "id"); + + waitForJobCompletion(jobId); + + job = jobService.getJob(jobId); + checkForError(job); return checkForWebhookResponse(job); @@ -222,6 +316,10 @@ private List getApplicationEventListeners( return List.of(new TaskStartedApplicationEventListener(taskExecutionService, task -> {}, jobService)); } + private static String getKey(long jobId) { + return TenantContext.getCurrentTenantId() + "_" + jobId; + } + private static Stream getTaskDispatcherResolverStream( List taskDispatcherResolverFactories, TaskDispatcherChain taskDispatcherChain) { @@ -229,6 +327,14 @@ private static Stream getTaskDispatcherResolverStream( .map(taskDispatcherFactory -> taskDispatcherFactory.createTaskDispatcherResolver(taskDispatcherChain)); } + public void receive(MemoryMessageBroker messageBroker, MessageRoute messageRoute, Receiver receiver) { + messageBroker.receive(messageRoute, message -> { + TenantContext.setCurrentTenantId((String) ((MessageEvent) message).getMetadata(CURRENT_TENANT_ID)); + + receiver.receive(message); + }); + } + @FunctionalInterface public interface JobFactoryFunction { @@ -268,6 +374,56 @@ private Job checkForWebhookResponse(Job job) { .orElse(job); } + private void waitForJobCompletion(long jobId) { + Job job = jobService.getJob(jobId); + + if (job.getStatus() == Job.Status.COMPLETED || job.getStatus() == Job.Status.FAILED) { + return; + } + + CountDownLatch latch = jobCompletionLatches.computeIfAbsent(getKey(jobId), id -> new CountDownLatch(1)); + + try { + Optional last = taskExecutionService.fetchLastJobTaskExecution(jobId); + + if (last.isPresent()) { + TaskExecution taskExecution = last.get(); + + TaskExecution.Status status = taskExecution.getStatus(); + + if (status.isTerminated()) { + jobCompletionLatches.remove(getKey(jobId)); + + return; + } + } + } catch (Exception e) { + if (logger.isTraceEnabled()) { + logger.trace(e.getMessage(), e); + } + } + + try { + if (timeout == NO_TIMEOUT) { + latch.await(); + } else { + if (!latch.await(timeout, TimeUnit.SECONDS)) { + throw new TimeoutException("Timeout waiting for job completion: " + jobId); + } + } + } catch (InterruptedException | TimeoutException exception) { + if (logger.isTraceEnabled()) { + logger.trace(exception.getMessage()); + } + + job.setStatus(Job.Status.FAILED); + + jobService.update(job); + } finally { + jobCompletionLatches.remove(getKey(jobId)); + } + } + private record JobServiceWrapper(JobFactoryFunction jobFactoryFunction) implements JobService { @Override @@ -295,6 +451,11 @@ public void deleteJob(long id) { throw new UnsupportedOperationException(); } + @Override + public Optional fetchJob(Long id) { + throw new UnsupportedOperationException(); + } + @Override public Optional fetchLastJob() { throw new UnsupportedOperationException(); @@ -331,11 +492,14 @@ public Job update(Job job) { } } - private static class JobSyncAsyncTaskExecutor implements AsyncTaskExecutor { + private class JobSyncAsyncTaskExecutor implements AsyncTaskExecutor { private final Executor executor; + private final int maxTaskExecutions; + private final Map taskExecutionCounters = new ConcurrentHashMap<>(); - private JobSyncAsyncTaskExecutor(Environment environment) { + private JobSyncAsyncTaskExecutor(Environment environment, int maxTaskExecutions) { + this.maxTaskExecutions = maxTaskExecutions; if (Threading.VIRTUAL.isActive(environment)) { executor = Executors.newVirtualThreadPerTaskExecutor(); } else { @@ -347,18 +511,40 @@ private JobSyncAsyncTaskExecutor(Environment environment) { public void execute(Runnable task) { String tenantId = TenantContext.getCurrentTenantId(); - executor.execute( - () -> { - String currentTenantId = TenantContext.getCurrentTenantId(); + executor.execute(() -> { + String currentTenantId = TenantContext.getCurrentTenantId(); - try { - TenantContext.setCurrentTenantId(tenantId); + try { + TenantContext.setCurrentTenantId(tenantId); - task.run(); - } finally { - TenantContext.setCurrentTenantId(currentTenantId); - } - }); + task.run(); + } finally { + TenantContext.setCurrentTenantId(currentTenantId); + } + }); + } + + private void incrementAndCheck(TaskExecutionEvent taskExecutionEvent) { + TaskExecution taskExecution = taskExecutionEvent.getTaskExecution(); + + AtomicInteger taskExecutionCounter = taskExecutionCounters.computeIfAbsent( + getKey(Validate.notNull(taskExecution.getJobId(), "jobId")), (key) -> new AtomicInteger(0)); + + if (taskExecutionCounter.incrementAndGet() > maxTaskExecutions) { + taskExecution.setError( + new ExecutionError( + String.format( + "Maximum number of task executions (%d) exceeded in the workflow builder", + maxTaskExecutions), + List.of())); + taskExecution.setStatus(TaskExecution.Status.FAILED); + + eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + } + } + + private void clearCounter(long jobId) { + taskExecutionCounters.remove(getKey(jobId)); } } } diff --git a/server/libs/platform/platform-webhook/platform-webhook-impl/build.gradle.kts b/server/libs/platform/platform-webhook/platform-webhook-impl/build.gradle.kts index 3cc037983da..1bb8a8102fe 100644 --- a/server/libs/platform/platform-webhook/platform-webhook-impl/build.gradle.kts +++ b/server/libs/platform/platform-webhook/platform-webhook-impl/build.gradle.kts @@ -5,7 +5,8 @@ dependencies { implementation(project(":server:libs:atlas:atlas-coordinator:atlas-coordinator-api")) implementation(project(":server:libs:atlas:atlas-worker:atlas-worker-api")) implementation(project(":server:libs:core:commons:commons-util")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) + implementation(project(":server:libs:core:tenant:tenant-api")) implementation(project(":server:libs:platform:platform-workflow:platform-workflow-coordinator:platform-workflow-coordinator-api")) implementation(project(":server:libs:platform:platform-workflow:platform-workflow-execution:platform-workflow-execution-api")) implementation(project(":server:libs:platform:platform-file-storage:platform-file-storage-api")) diff --git a/server/libs/platform/platform-webhook/platform-webhook-impl/src/main/java/com/bytechef/platform/webhook/executor/config/WebhookConfiguration.java b/server/libs/platform/platform-webhook/platform-webhook-impl/src/main/java/com/bytechef/platform/webhook/executor/config/WebhookConfiguration.java index 08366650130..e5e73dd8d98 100644 --- a/server/libs/platform/platform-webhook/platform-webhook-impl/src/main/java/com/bytechef/platform/webhook/executor/config/WebhookConfiguration.java +++ b/server/libs/platform/platform-webhook/platform-webhook-impl/src/main/java/com/bytechef/platform/webhook/executor/config/WebhookConfiguration.java @@ -16,6 +16,8 @@ package com.bytechef.platform.webhook.executor.config; +import static com.bytechef.tenant.constant.TenantConstants.CURRENT_TENANT_ID; + import com.bytechef.atlas.configuration.service.WorkflowService; import com.bytechef.atlas.coordinator.task.completion.TaskCompletionHandlerFactory; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherPreSendProcessor; @@ -32,7 +34,8 @@ import com.bytechef.component.map.MapTaskDispatcherAdapterTaskHandler; import com.bytechef.component.map.constant.MapConstants; import com.bytechef.evaluator.Evaluator; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.MessageBroker; +import com.bytechef.message.broker.memory.AsyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.platform.configuration.accessor.JobPrincipalAccessorRegistry; import com.bytechef.platform.coordinator.job.JobSyncExecutor; @@ -56,6 +59,7 @@ import com.bytechef.task.dispatcher.map.completion.MapTaskCompletionHandler; import com.bytechef.task.dispatcher.parallel.ParallelTaskDispatcher; import com.bytechef.task.dispatcher.parallel.completion.ParallelTaskCompletionHandler; +import com.bytechef.tenant.TenantContext; import java.util.List; import org.springframework.cache.CacheManager; import org.springframework.context.ApplicationEventPublisher; @@ -79,25 +83,31 @@ WebhookWorkflowExecutor webhookExecutor( WebhookWorkflowSyncExecutor triggerSyncExecutor, TaskFileStorage taskFileStorage, WorkflowService workflowService) { - SyncMessageBroker syncMessageBroker = new SyncMessageBroker(); + AsyncMessageBroker asyncMessageBroker = new AsyncMessageBroker(environment); return new WebhookWorkflowExecutorImpl( eventPublisher, jobPrincipalAccessorRegistry, principalJobFacade, new JobSyncExecutor( - contextService, environment, evaluator, jobService, syncMessageBroker, + contextService, environment, evaluator, jobService, -1, () -> asyncMessageBroker, getTaskCompletionHandlerFactories( contextService, counterService, evaluator, taskExecutionService, taskFileStorage), getTaskDispatcherAdapterFactories(cacheManager, evaluator), taskDispatcherPreSendProcessors, getTaskDispatcherResolverFactories( - contextService, counterService, evaluator, jobService, syncMessageBroker, taskExecutionService, + contextService, counterService, evaluator, jobService, asyncMessageBroker, taskExecutionService, taskFileStorage), - taskExecutionService, taskHandlerRegistry, taskFileStorage, workflowService), + taskExecutionService, taskHandlerRegistry, taskFileStorage, 300, workflowService), triggerSyncExecutor, taskFileStorage); } - private static ApplicationEventPublisher getEventPublisher(SyncMessageBroker syncMessageBroker) { - return event -> syncMessageBroker.send(((MessageEvent) event).getRoute(), event); + private static ApplicationEventPublisher createEventPublisher(MessageBroker messageBroker) { + return event -> { + MessageEvent messageEvent = (MessageEvent) event; + + messageEvent.putMetadata(CURRENT_TENANT_ID, TenantContext.getCurrentTenantId()); + + messageBroker.send(((MessageEvent) event).getRoute(), event); + }; } private List getTaskCompletionHandlerFactories( @@ -146,10 +156,10 @@ public String getName() { private List getTaskDispatcherResolverFactories( ContextService contextService, CounterService counterService, Evaluator evaluator, JobService jobService, - SyncMessageBroker syncMessageBroker, TaskExecutionService taskExecutionService, + AsyncMessageBroker asyncMessageBroker, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { - ApplicationEventPublisher eventPublisher = getEventPublisher(syncMessageBroker); + ApplicationEventPublisher eventPublisher = createEventPublisher(asyncMessageBroker); return List.of( (taskDispatcher) -> new WaitForApprovalTaskDispatcher(eventPublisher, jobService, taskExecutionService), diff --git a/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/build.gradle.kts b/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/build.gradle.kts index 71374870f3f..2859e4adae0 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/build.gradle.kts +++ b/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/build.gradle.kts @@ -14,7 +14,7 @@ dependencies { implementation(project(":server:libs:core:commons:commons-util")) implementation(project(":server:libs:core:evaluator:evaluator-impl")) implementation(project(":server:libs:core:file-storage:file-storage-base64-service")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:platform:platform-file-storage:platform-file-storage-impl")) implementation(project(":server:libs:platform:platform-coordinator")) implementation(project(":server:libs:test:test-int-support")) diff --git a/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/src/main/java/com/bytechef/platform/workflow/task/dispatcher/test/workflow/TaskDispatcherJobTestExecutor.java b/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/src/main/java/com/bytechef/platform/workflow/task/dispatcher/test/workflow/TaskDispatcherJobTestExecutor.java index 53e0dec35d3..005d2db0ed6 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/src/main/java/com/bytechef/platform/workflow/task/dispatcher/test/workflow/TaskDispatcherJobTestExecutor.java +++ b/server/libs/platform/platform-workflow/platform-workflow-task-dispatcher/platform-workflow-task-dispatcher-test-int-support/src/main/java/com/bytechef/platform/workflow/task/dispatcher/test/workflow/TaskDispatcherJobTestExecutor.java @@ -30,7 +30,7 @@ import com.bytechef.atlas.worker.task.handler.TaskHandler; import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.SpelEvaluator; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.memory.SyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.platform.coordinator.job.JobSyncExecutor; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -84,13 +84,13 @@ public Job execute( SyncMessageBroker syncMessageBroker = new SyncMessageBroker(); JobSyncExecutor jobSyncExecutor = new JobSyncExecutor( - contextService, environment, SpelEvaluator.create(), jobService, syncMessageBroker, - taskCompletionHandlerFactoriesFunction.apply(counterService, taskExecutionService), - List.of(), List.of(), + contextService, environment, SpelEvaluator.create(), jobService, -1, () -> syncMessageBroker, + taskCompletionHandlerFactoriesFunction.apply(counterService, taskExecutionService), List.of(), List.of(), taskDispatcherResolverFactoriesFunction.apply( event -> syncMessageBroker.send(((MessageEvent) event).getRoute(), event), contextService, counterService, taskExecutionService), - taskExecutionService, taskHandlerMapSupplier.get()::get, taskFileStorage, workflowService); + taskExecutionService, + taskHandlerMapSupplier.get()::get, taskFileStorage, -1, workflowService); return jobSyncExecutor.execute(new JobParametersDTO(workflowId, inputs)); } diff --git a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts index d3cb26c0c78..18c87e4b00c 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts +++ b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts @@ -9,8 +9,9 @@ dependencies { implementation(project(":server:libs:atlas:atlas-worker:atlas-worker-api")) implementation(project(":server:libs:core:commons:commons-util")) implementation(project(":server:libs:core:commons:commons-data")) - implementation(project(":server:libs:core:message:message-broker:message-broker-sync")) + implementation(project(":server:libs:core:message:message-broker:message-broker-memory")) implementation(project(":server:libs:core:file-storage:file-storage-base64-service")) + implementation(project(":server:libs:core:tenant:tenant-api")) implementation(project(":server:libs:platform:platform-configuration:platform-configuration-api")) implementation(project(":server:libs:platform:platform-file-storage:platform-file-storage-api")) implementation(project(":server:libs:platform:platform-coordinator")) diff --git a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java index a9c39454beb..63d48883449 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java +++ b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java @@ -16,6 +16,8 @@ package com.bytechef.platform.workflow.test.config; +import static com.bytechef.tenant.constant.TenantConstants.CURRENT_TENANT_ID; + import com.bytechef.atlas.configuration.service.WorkflowService; import com.bytechef.atlas.coordinator.task.completion.TaskCompletionHandlerFactory; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolverFactory; @@ -41,7 +43,8 @@ import com.bytechef.component.map.constant.MapConstants; import com.bytechef.evaluator.Evaluator; import com.bytechef.file.storage.base64.service.Base64FileStorageService; -import com.bytechef.message.broker.sync.SyncMessageBroker; +import com.bytechef.message.broker.MessageBroker; +import com.bytechef.message.broker.memory.AsyncMessageBroker; import com.bytechef.message.event.MessageEvent; import com.bytechef.platform.component.service.ComponentDefinitionService; import com.bytechef.platform.coordinator.job.JobSyncExecutor; @@ -64,6 +67,7 @@ import com.bytechef.task.dispatcher.map.completion.MapTaskCompletionHandler; import com.bytechef.task.dispatcher.parallel.ParallelTaskDispatcher; import com.bytechef.task.dispatcher.parallel.completion.ParallelTaskCompletionHandler; +import com.bytechef.tenant.TenantContext; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import org.springframework.cache.CacheManager; @@ -86,7 +90,7 @@ JobTestExecutor jobTestExecutor( ContextService contextService = new ContextServiceImpl(new InMemoryContextRepository(cacheManager)); CounterService counterService = new CounterServiceImpl(new InMemoryCounterRepository(cacheManager)); - SyncMessageBroker syncMessageBroker = new SyncMessageBroker(); + AsyncMessageBroker asyncMessageBroker = new AsyncMessageBroker(environment); InMemoryTaskExecutionRepository taskExecutionRepository = new InMemoryTaskExecutionRepository(cacheManager); @@ -97,22 +101,29 @@ JobTestExecutor jobTestExecutor( TaskFileStorage taskFileStorage = new TaskFileStorageImpl(new Base64FileStorageService()); return new JobTestExecutor( - componentDefinitionService, contextService, evaluator, + componentDefinitionService, contextService, evaluator, jobService, new JobSyncExecutor( - contextService, environment, evaluator, jobService, syncMessageBroker, + contextService, environment, evaluator, jobService, 1000, () -> asyncMessageBroker, getTaskCompletionHandlerFactories( contextService, counterService, evaluator, taskExecutionService, taskFileStorage), - getTaskDispatcherAdapterFactories(cacheManager, evaluator), + getTaskDispatcherAdapterFactories( + cacheManager, evaluator), List.of(new TestTaskDispatcherPreSendProcessor(jobService)), getTaskDispatcherResolverFactories( - contextService, counterService, evaluator, jobService, syncMessageBroker, - taskExecutionService, taskFileStorage), - taskExecutionService, taskHandlerRegistry, taskFileStorage, workflowService), + contextService, counterService, evaluator, jobService, asyncMessageBroker, taskExecutionService, + taskFileStorage), + taskExecutionService, taskHandlerRegistry, taskFileStorage, 300, workflowService), taskDispatcherDefinitionService, taskExecutionService, taskFileStorage); } - private static ApplicationEventPublisher getEventPublisher(SyncMessageBroker syncMessageBroker) { - return event -> syncMessageBroker.send(((MessageEvent) event).getRoute(), event); + private static ApplicationEventPublisher createEventPublisher(MessageBroker messageBroker) { + return event -> { + MessageEvent messageEvent = (MessageEvent) event; + + messageEvent.putMetadata(CURRENT_TENANT_ID, TenantContext.getCurrentTenantId()); + + messageBroker.send(((MessageEvent) event).getRoute(), event); + }; } private List getTaskCompletionHandlerFactories( @@ -161,10 +172,10 @@ public String getName() { private List getTaskDispatcherResolverFactories( ContextService contextService, CounterService counterService, Evaluator evaluator, JobService jobService, - SyncMessageBroker syncMessageBroker, TaskExecutionService taskExecutionService, + AsyncMessageBroker asyncMessageBroker, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { - ApplicationEventPublisher eventPublisher = getEventPublisher(syncMessageBroker); + ApplicationEventPublisher eventPublisher = createEventPublisher(asyncMessageBroker); return List.of( (taskDispatcher) -> new WaitForApprovalTaskDispatcher(eventPublisher, jobService, taskExecutionService), diff --git a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/executor/JobTestExecutor.java b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/executor/JobTestExecutor.java index 123d7cc0b7a..1bb8a6b32e6 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/executor/JobTestExecutor.java +++ b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/executor/JobTestExecutor.java @@ -22,6 +22,7 @@ import com.bytechef.atlas.execution.domain.TaskExecution; import com.bytechef.atlas.execution.dto.JobParametersDTO; import com.bytechef.atlas.execution.service.ContextService; +import com.bytechef.atlas.execution.service.JobService; import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.CollectionUtils; @@ -50,6 +51,7 @@ public class JobTestExecutor { private final ComponentDefinitionService componentDefinitionService; private final ContextService contextService; private final Evaluator evaluator; + private final JobService jobService; private final JobSyncExecutor jobSyncExecutor; private final TaskDispatcherDefinitionService taskDispatcherDefinitionService; private final TaskExecutionService taskExecutionService; @@ -58,12 +60,14 @@ public class JobTestExecutor { @SuppressFBWarnings("EI") public JobTestExecutor( ComponentDefinitionService componentDefinitionService, ContextService contextService, Evaluator evaluator, - JobSyncExecutor jobSyncExecutor, TaskDispatcherDefinitionService taskDispatcherDefinitionService, - TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + JobService jobService, JobSyncExecutor jobSyncExecutor, + TaskDispatcherDefinitionService taskDispatcherDefinitionService, TaskExecutionService taskExecutionService, + TaskFileStorage taskFileStorage) { this.componentDefinitionService = componentDefinitionService; this.contextService = contextService; this.evaluator = evaluator; + this.jobService = jobService; this.jobSyncExecutor = jobSyncExecutor; this.taskDispatcherDefinitionService = taskDispatcherDefinitionService; this.taskExecutionService = taskExecutionService; @@ -73,26 +77,30 @@ public JobTestExecutor( public JobDTO execute(JobParametersDTO jobParametersDTO) { Job job = jobSyncExecutor.execute(jobParametersDTO); - return new JobDTO( - job, getOutputs(job), - CollectionUtils.map( - taskExecutionService.getJobTaskExecutions(Validate.notNull(job.getId(), "id")), - taskExecution -> { - Map context = taskFileStorage.readContextValue( - contextService.peek( - Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION)); - - WorkflowTask workflowTask = taskExecution.getWorkflowTask(); - DefinitionResult definitionResult = getDefinition(taskExecution); - - Object output = taskExecution.getOutput() == null - ? null - : taskFileStorage.readTaskExecutionOutput(taskExecution.getOutput()); - - return new TaskExecutionDTO( - taskExecution, definitionResult.title(), definitionResult.icon(), - workflowTask.evaluateParameters(context, evaluator), output); - })); + try { + return new JobDTO( + job, getOutputs(job), + CollectionUtils.map( + taskExecutionService.getJobTaskExecutions(Validate.notNull(job.getId(), "id")), + taskExecution -> { + Map context = taskFileStorage.readContextValue( + contextService.peek( + Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION)); + + WorkflowTask workflowTask = taskExecution.getWorkflowTask(); + DefinitionResult definitionResult = getDefinition(taskExecution); + + Object output = taskExecution.getOutput() == null + ? null + : taskFileStorage.readTaskExecutionOutput(taskExecution.getOutput()); + + return new TaskExecutionDTO( + taskExecution, definitionResult.title(), definitionResult.icon(), + workflowTask.evaluateParameters(context, evaluator), output); + })); + } finally { + jobService.deleteJob(Validate.notNull(job.getId(), "id")); + } } private DefinitionResult getDefinition(TaskExecution taskExecution) { diff --git a/settings.gradle.kts b/settings.gradle.kts index 52d5f6511cf..a80604ea058 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -110,7 +110,7 @@ include("server:libs:core:message:message-broker:message-broker-api") include("server:libs:core:message:message-broker:message-broker-jms") include("server:libs:core:message:message-broker:message-broker-kafka") include("server:libs:core:message:message-broker:message-broker-redis") -include("server:libs:core:message:message-broker:message-broker-sync") +include("server:libs:core:message:message-broker:message-broker-memory") include("server:libs:core:message:message-event:message-event-api") include("server:libs:core:message:message-event:message-event-impl") include("server:libs:core:rest:rest-api")