Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,46 +1,46 @@
[versions]
checkstyle = "11.1.0"
com-google-auto-service = "1.1.1"
graalvm = "25.0.0"
graalvm = "25.0.1"
jackson = "2.19.2"
jacoco = "0.8.13"
java = "25"
org-mapstruct = "1.6.3"
org-mapstruct-extensions-spring = "1.1.3"
org-springdoc = "2.8.13"
org-springdoc = "2.8.14"
pmd = "7.17.0"
spotbugs = "4.9.7"
spring-ai = "1.1.0-RC1"
spotbugs = "4.9.8"
spring-ai = "1.1.0"
spring-boot = "3.5.7"
spring-cloud-aws = "3.4.0"
spring-cloud-dependencies = "2025.0.0"
spring-shell = "3.4.1"

[libraries]
com-github-miachm-sods-sods = "com.github.miachm.sods:SODS:1.7.0"
com-github-mizosoft-methanol = "com.github.mizosoft.methanol:methanol:1.8.3"
com-github-mizosoft-methanol = "com.github.mizosoft.methanol:methanol:1.8.4"
com-github-spotbugs-spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", version.ref = "spotbugs" }
com-google-auto-service-auto-service = { module = "com.google.auto.service:auto-service", version.ref = "com-google-auto-service" }
com-google-auto-service-auto-service-annotations = { module = "com.google.auto.service:auto-service-annotations", version.ref = "com-google-auto-service" }
com-squareup-javapoet = "com.squareup:javapoet:1.13.0"
commons-validator = "commons-validator:commons-validator:1.10.0"
io-swagger-core-v3-swagger-annotations = "io.swagger.core.v3:swagger-annotations:2.2.38"
io-swagger-parser-v3-swagger-parser = "io.swagger.parser.v3:swagger-parser:2.1.34"
loki-logback-appender = "com.github.loki4j:loki-logback-appender:2.0.0"
org-apache-activemq-artemis-jakarta-server = "org.apache.activemq:artemis-jakarta-server:2.42.0"
io-swagger-core-v3-swagger-annotations = "io.swagger.core.v3:swagger-annotations:2.2.40"
io-swagger-parser-v3-swagger-parser = "io.swagger.parser.v3:swagger-parser:2.1.35"
loki-logback-appender = "com.github.loki4j:loki-logback-appender:1.6.0"
org-apache-activemq-artemis-jakarta-server = "org.apache.activemq:artemis-jakarta-server:2.44.0"
org-apache-poi-poi-ooxml = "org.apache.poi:poi-ooxml:5.4.1"
org-eclipse-jgit-org-eclipse-jgit = "org.eclipse.jgit:org.eclipse.jgit:7.3.0.202506031305-r"
org-eclipse-jgit-org-eclipse-jgit = "org.eclipse.jgit:org.eclipse.jgit:7.4.0.202509020913-r"
org-graalvm-polyglot-java = { module = "org.graalvm.polyglot:java", version.ref = "graalvm" }
org-graalvm-polyglot-js = { module = "org.graalvm.polyglot:js", version.ref = "graalvm" }
org-graalvm-polyglot-polyglot = { module = "org.graalvm.polyglot:polyglot", version.ref = "graalvm" }
org-graalvm-polyglot-python = { module = "org.graalvm.polyglot:python", version.ref = "graalvm" }
org-graalvm-polyglot-ruby = { module = "org.graalvm.polyglot:ruby", version.ref = "graalvm" }
org-graalvm-polyglot-ruby = "org.graalvm.polyglot:ruby:25.0.0"
org-json = "org.json:json:20250517"
org-mapstruct = { module = "org.mapstruct:mapstruct", version.ref = "org-mapstruct" }
org-mapstruct-extensions-spring-mapstruct-spring-annotations = { module = "org.mapstruct.extensions.spring:mapstruct-spring-annotations", version.ref = "org-mapstruct-extensions-spring" }
org-mapstruct-extensions-spring-mapstruct-spring-extensions = { module = "org.mapstruct.extensions.spring:mapstruct-spring-extensions", version.ref = "org-mapstruct-extensions-spring" }
org-mapstruct-mapstruct-processor = { module = "org.mapstruct:mapstruct-processor", version.ref = "org-mapstruct" }
org-openapitools-jackson-databind-nullable = "org.openapitools:jackson-databind-nullable:0.2.7"
org-openapitools-jackson-databind-nullable = "org.openapitools:jackson-databind-nullable:0.2.8"
org-springdoc-springdoc-openapi-starter-common = { module = "org.springdoc:springdoc-openapi-starter-common", version.ref = "org-springdoc" }
org-springdoc-springdoc-openapi-starter-webmvc-ui = { module = "org.springdoc:springdoc-openapi-starter-webmvc-ui", version.ref = "org-springdoc" }
org-wiremock-wiremock = "org.wiremock:wiremock-standalone:3.13.1"
Expand All @@ -51,8 +51,8 @@ com-github-ben-manes-versions = "com.github.ben-manes.versions:0.53.0"
gradle-git-properties = "com.gorylenko.gradle-git-properties:2.5.2"
jib = "com.google.cloud.tools.jib:3.4.5"
nl-littlerobots-version-catalog-update = "nl.littlerobots.version-catalog-update:1.0.1"
org-graalvm-buildtools-native = "org.graalvm.buildtools.native:0.11.1"
org-openapi-generator = "org.openapi.generator:7.16.0"
org-sonarqube = "org.sonarqube:7.0.0.6105"
spotbugs = "com.github.spotbugs:6.4.3"
org-graalvm-buildtools-native = "org.graalvm.buildtools.native:0.11.3"
org-openapi-generator = "org.openapi.generator:7.17.0"
org-sonarqube = "org.sonarqube:7.0.1.6134"
spotbugs = "com.github.spotbugs:6.4.5"
spotless = "com.diffplug.gradle.spotless:8.0.0"
2 changes: 1 addition & 1 deletion server/ee/apps/runtime-job-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
implementation(project(":server:libs:core:evaluator:evaluator-impl"))
implementation(project(":server:libs:core:file-storage:file-storage-base64-service"))
implementation(project(":server:libs:core:file-storage:file-storage-filesystem-service"))
implementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
implementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
implementation(project(":server:libs:core:message:message-event:message-event-impl"))
implementation(project(":server:libs:platform:platform-component:platform-component-api"))
implementation(project(":server:libs:platform:platform-component:platform-component-context:platform-component-context-service"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

package com.bytechef.runtime.job.platform.connection;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -19,22 +19,16 @@
public class ConnectionContext {

private static final AtomicLong ATOMIC_CONNECTION_ID = new AtomicLong(1);

private static final ThreadLocal<Map<Long, Connection>> CURRENT_CONNECTION_MAP = ThreadLocal.withInitial(
HashMap::new);
private static final Map<Long, Connection> CONNECTION_MAP = new ConcurrentHashMap<>();

public static Map<String, ?> getConnectionParameters(long id) {
Map<Long, Connection> parameterMap = CURRENT_CONNECTION_MAP.get();

return parameterMap.get(id).parameters;
return CONNECTION_MAP.get(id).parameters;
}

public static long putConnectionParameters(String name, Map<String, ?> parameters) {
Map<Long, Connection> parameterMap = CURRENT_CONNECTION_MAP.get();

long connectionId = -1;

for (Map.Entry<Long, Connection> entry : parameterMap.entrySet()) {
for (Map.Entry<Long, Connection> entry : CONNECTION_MAP.entrySet()) {
Connection connection = entry.getValue();

if (connection.name.equals(name)) {
Expand All @@ -47,9 +41,7 @@ public static long putConnectionParameters(String name, Map<String, ?> parameter
if (connectionId == -1) {
connectionId = ATOMIC_CONNECTION_ID.getAndIncrement();

parameterMap.putIfAbsent(connectionId, new Connection(name, parameters));

CURRENT_CONNECTION_MAP.set(parameterMap);
CONNECTION_MAP.putIfAbsent(connectionId, new Connection(name, parameters));
}

return connectionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ bytechef:
host:
port: 25
message-broker:
# Messaging provider between Coordinator and Workers (local) default: local
provider: local
# Messaging provider between Coordinator and Workers (memory) default: memory
provider: memory
# When the worker is enabled, subscribe to the default "default" queue with 10 concurrent consumers.
# You may also route workflow tasks to other arbitrarily named task queues by specifying the "node"
# property on any given task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public AiCopilotConfiguration(ApplicationProperties applicationProperties) {

this.model = options.getModel();
this.openAiApiKey = openAi.getApiKey();
this.pgVector = copilot.getVectorstore()
.getPgVector();

ApplicationProperties.Ai.Copilot.Vectorstore vectorstore = copilot.getVectorstore();

this.pgVector = vectorstore.getPgVector();
this.temperature = options.getTemperature();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public void deleteJob(long id) {
throw new UnsupportedOperationException();
}

@Override
public Optional<Job> fetchJob(Long id) {
throw new UnsupportedOperationException();
}

@Override
public Optional<Job> fetchLastJob() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
testImplementation(project(":server:libs:atlas:atlas-execution:atlas-execution-service"))
testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl"))
testImplementation(project(":server:libs:atlas:atlas-worker:atlas-worker-impl"))
testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
testImplementation(project(":server:libs:core:commons:commons-data"))
testImplementation(project(":server:libs:core:evaluator:evaluator-impl"))
testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.bytechef.file.storage.base64.service.Base64FileStorageService;
import com.bytechef.jackson.config.JacksonConfiguration;
import com.bytechef.liquibase.config.LiquibaseConfiguration;
import com.bytechef.message.broker.memory.SyncMessageBroker;
import com.bytechef.platform.coordinator.job.JobSyncExecutor;
import com.bytechef.test.config.jdbc.AbstractIntTestJdbcConfiguration;
import com.bytechef.test.config.testcontainers.PostgreSQLContainerConfiguration;
Expand All @@ -62,7 +63,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -75,7 +75,6 @@
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories;
import org.springframework.lang.Nullable;

/**
* @author Arik Cohen
Expand All @@ -96,23 +95,18 @@ public class TaskCoordinatorIntTest {
private final Evaluator evaluator = SpelEvaluator.create();

@Autowired
@Nullable
private Environment environment;

@Autowired
@Nullable
private ContextService contextService;

@Autowired
@Nullable
private JobService jobService;

@Autowired
@Nullable
private TaskExecutionService taskExecutionService;

@Autowired
@Nullable
private WorkflowService workflowService;

@Test
Expand All @@ -135,9 +129,8 @@ private Job executeWorkflow(String workflowId) {
taskHandlerMap.put("randomHelper/v1/randomInt", taskExecution -> null);

JobSyncExecutor jobSyncExecutor = new JobSyncExecutor(
Objects.requireNonNull(contextService), environment, evaluator, Objects.requireNonNull(jobService),
List.of(), Objects.requireNonNull(taskExecutionService), taskHandlerMap::get, TASK_FILE_STORAGE,
Objects.requireNonNull(workflowService));
contextService, environment, evaluator, jobService, -1, SyncMessageBroker::new, List.of(),
taskExecutionService, taskHandlerMap::get, TASK_FILE_STORAGE, -1, workflowService);

return jobSyncExecutor.execute(new JobParametersDTO(workflowId, Collections.singletonMap("yourName", "me")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface JobService {

void deleteJob(long id);

Optional<Job> fetchJob(Long id);

Optional<Job> fetchLastJob();

Optional<Job> fetchLastWorkflowJob(String workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public int countRunningJobs() {

@Override
public void deleteById(Long id) {
throw new UnsupportedOperationException();
Cache cache = Objects.requireNonNull(cacheManager.getCache(CACHE));

cache.evict(TenantCacheKeyUtils.getKey(id));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public void deleteJob(long id) {
jobRepository.deleteById(id);
}

@Override
public Optional<Job> fetchJob(Long id) {
return jobRepository.findById(id);
}

@Override
@Transactional(readOnly = true)
public Optional<Job> fetchLastJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ dependencies {
testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl"))
testImplementation(project(":server:libs:core:evaluator:evaluator-impl"))
testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service"))
testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
testImplementation(project(":server:libs:test:test-support"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.bytechef.evaluator.Evaluator;
import com.bytechef.evaluator.SpelEvaluator;
import com.bytechef.file.storage.base64.service.Base64FileStorageService;
import com.bytechef.message.broker.sync.SyncMessageBroker;
import com.bytechef.message.broker.memory.SyncMessageBroker;
import com.bytechef.message.event.MessageEvent;
import com.bytechef.test.extension.ObjectMapperSetupExtension;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public SharedTemplateServiceImpl(SharedTemplateRepository sharedTemplateReposito
@Override
@Transactional(readOnly = true)
public Optional<SharedTemplate> fetchSharedTemplate(UUID uuid) {
return TenantUtils.callWithTenantId(TenantContext.DEFAULT_TENANT_ID,
() -> sharedTemplateRepository.findByUuid(uuid));
return TenantUtils.callWithTenantId(
TenantContext.DEFAULT_TENANT_ID, () -> sharedTemplateRepository.findByUuid(uuid));
}

@Override
@Transactional(readOnly = true)
public SharedTemplate getSharedTemplate(UUID uuid) {
return TenantUtils.callWithTenantId(TenantContext.DEFAULT_TENANT_ID,
return TenantUtils.callWithTenantId(
TenantContext.DEFAULT_TENANT_ID,
() -> sharedTemplateRepository.findByUuid(uuid)
.orElseThrow(() -> new IllegalArgumentException("Shared template not found for uuid: " + uuid)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ public void setRequired(boolean required) {
public static class MessageBroker {

public enum Provider {
AMQP, AWS, JMS, KAFKA, LOCAL, REDIS
AMQP, AWS, JMS, KAFKA, MEMORY, REDIS
}

private Provider provider = Provider.JMS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public static <T, R> List<R> map(List<T> list, Function<? super T, R> mapper) {
Validate.notNull(list, "'list' must not be null");
Validate.notNull(mapper, "'mapper' must not be null");

return list.stream()
return new ArrayList<>(list)
.stream()
.map(mapper)
.toList();
}
Expand All @@ -223,7 +224,8 @@ public static <R, T> List<R> map(Set<T> set, Function<? super T, R> mapper) {
Validate.notNull(set, "'set' must not be null");
Validate.notNull(mapper, "'mapper' must not be null");

return set.stream()
return new ArrayList<>(set)
.stream()
.map(mapper)
.toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
@Target({
ElementType.TYPE, ElementType.METHOD
})
@ConditionalOnProperty(prefix = "bytechef", name = "message-broker.provider", havingValue = "local")
public @interface ConditionalOnMessageBrokerLocal {
@ConditionalOnProperty(prefix = "bytechef", name = "message-broker.provider", havingValue = "memory")
public @interface ConditionalOnMessageBrokerMemory {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytechef.message.broker.memory;

import com.bytechef.message.route.MessageRoute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Abstract base implementation of the {@link MessageBroker} interface. This class provides a framework for managing
* receivers and routing messages to them based on specified routes. Concrete subclasses need to implement the
* {@code send} method to define the specific behavior for message delivery.
*
* @author Ivica Cardic
*/
public abstract class AbstractMessageBroker implements MemoryMessageBroker {

protected final Map<MessageRoute, List<Receiver>> receiverMap = new HashMap<>();

@Override
public void receive(MessageRoute messageRoute, Receiver receiver) {
List<Receiver> receivers = receiverMap.computeIfAbsent(messageRoute, k -> new ArrayList<>());

receivers.add(receiver);
}
}
Loading
Loading