Skip to content

Commit 64676c2

Browse files
committed
3258 - Fix OutOfMemoryError in the workflow editor when iterating with loop over large number of items
1 parent eafb3ab commit 64676c2

File tree

33 files changed

+456
-172
lines changed

33 files changed

+456
-172
lines changed

server/ee/apps/runtime-job-app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ dependencies {
3636
implementation(project(":server:libs:core:evaluator:evaluator-impl"))
3737
implementation(project(":server:libs:core:file-storage:file-storage-base64-service"))
3838
implementation(project(":server:libs:core:file-storage:file-storage-filesystem-service"))
39-
implementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
39+
implementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
4040
implementation(project(":server:libs:core:message:message-event:message-event-impl"))
4141
implementation(project(":server:libs:platform:platform-component:platform-component-api"))
4242
implementation(project(":server:libs:platform:platform-component:platform-component-context:platform-component-context-service"))

server/ee/apps/runtime-job-app/src/main/java/com/bytechef/runtime/job/platform/connection/ConnectionContext.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
package com.bytechef.runtime.job.platform.connection;
99

10-
import java.util.HashMap;
1110
import java.util.Map;
11+
import java.util.concurrent.ConcurrentHashMap;
1212
import java.util.concurrent.atomic.AtomicLong;
1313

1414
/**
@@ -19,22 +19,16 @@
1919
public class ConnectionContext {
2020

2121
private static final AtomicLong ATOMIC_CONNECTION_ID = new AtomicLong(1);
22-
23-
private static final ThreadLocal<Map<Long, Connection>> CURRENT_CONNECTION_MAP = ThreadLocal.withInitial(
24-
HashMap::new);
22+
private static final Map<Long, Connection> CONNECTION_MAP = new ConcurrentHashMap<>();
2523

2624
public static Map<String, ?> getConnectionParameters(long id) {
27-
Map<Long, Connection> parameterMap = CURRENT_CONNECTION_MAP.get();
28-
29-
return parameterMap.get(id).parameters;
25+
return CONNECTION_MAP.get(id).parameters;
3026
}
3127

3228
public static long putConnectionParameters(String name, Map<String, ?> parameters) {
33-
Map<Long, Connection> parameterMap = CURRENT_CONNECTION_MAP.get();
34-
3529
long connectionId = -1;
3630

37-
for (Map.Entry<Long, Connection> entry : parameterMap.entrySet()) {
31+
for (Map.Entry<Long, Connection> entry : CONNECTION_MAP.entrySet()) {
3832
Connection connection = entry.getValue();
3933

4034
if (connection.name.equals(name)) {
@@ -47,9 +41,7 @@ public static long putConnectionParameters(String name, Map<String, ?> parameter
4741
if (connectionId == -1) {
4842
connectionId = ATOMIC_CONNECTION_ID.getAndIncrement();
4943

50-
parameterMap.putIfAbsent(connectionId, new Connection(name, parameters));
51-
52-
CURRENT_CONNECTION_MAP.set(parameterMap);
44+
CONNECTION_MAP.putIfAbsent(connectionId, new Connection(name, parameters));
5345
}
5446

5547
return connectionId;

server/ee/libs/embedded/embedded-configuration/embedded-configuration-service/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
testImplementation(project(":server:libs:core:evaluator:evaluator-impl"))
2525
testImplementation(project(":server:libs:platform:platform-category:platform-category-service"))
2626
testImplementation(project(":server:libs:platform:platform-configuration:platform-configuration-service"))
27+
testImplementation(project(":server:libs:platform:platform-security:platform-security-service"))
2728
testImplementation(project(":server:libs:platform:platform-tag:platform-tag-service"))
2829
testImplementation(project(":server:libs:test:test-int-support"))
2930
}

server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ dependencies {
2020
testImplementation(project(":server:libs:atlas:atlas-execution:atlas-execution-service"))
2121
testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl"))
2222
testImplementation(project(":server:libs:atlas:atlas-worker:atlas-worker-impl"))
23-
testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
23+
testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
2424
testImplementation(project(":server:libs:core:commons:commons-data"))
2525
testImplementation(project(":server:libs:core:evaluator:evaluator-impl"))
2626
testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service"))

server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/test/java/com/bytechef/atlas/coordinator/TaskCoordinatorIntTest.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.bytechef.file.storage.base64.service.Base64FileStorageService;
5353
import com.bytechef.jackson.config.JacksonConfiguration;
5454
import com.bytechef.liquibase.config.LiquibaseConfiguration;
55+
import com.bytechef.message.broker.memory.SyncMessageBroker;
5556
import com.bytechef.platform.coordinator.job.JobSyncExecutor;
5657
import com.bytechef.test.config.jdbc.AbstractIntTestJdbcConfiguration;
5758
import com.bytechef.test.config.testcontainers.PostgreSQLContainerConfiguration;
@@ -62,7 +63,6 @@
6263
import java.util.HashMap;
6364
import java.util.List;
6465
import java.util.Map;
65-
import java.util.Objects;
6666
import org.junit.jupiter.api.Assertions;
6767
import org.junit.jupiter.api.Test;
6868
import org.springframework.beans.factory.annotation.Autowired;
@@ -75,7 +75,6 @@
7575
import org.springframework.core.env.Environment;
7676
import org.springframework.core.io.support.ResourcePatternResolver;
7777
import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories;
78-
import org.springframework.lang.Nullable;
7978

8079
/**
8180
* @author Arik Cohen
@@ -95,23 +94,18 @@ public class TaskCoordinatorIntTest {
9594
private final Evaluator evaluator = SpelEvaluator.create();
9695

9796
@Autowired
98-
@Nullable
9997
private Environment environment;
10098

10199
@Autowired
102-
@Nullable
103100
private ContextService contextService;
104101

105102
@Autowired
106-
@Nullable
107103
private JobService jobService;
108104

109105
@Autowired
110-
@Nullable
111106
private TaskExecutionService taskExecutionService;
112107

113108
@Autowired
114-
@Nullable
115109
private WorkflowService workflowService;
116110

117111
@Test
@@ -134,9 +128,8 @@ private Job executeWorkflow(String workflowId) {
134128
taskHandlerMap.put("randomHelper/v1/randomInt", taskExecution -> null);
135129

136130
JobSyncExecutor jobSyncExecutor = new JobSyncExecutor(
137-
Objects.requireNonNull(contextService), environment, evaluator, Objects.requireNonNull(jobService),
138-
List.of(), Objects.requireNonNull(taskExecutionService), taskHandlerMap::get, TASK_FILE_STORAGE,
139-
Objects.requireNonNull(workflowService));
131+
contextService, environment, evaluator, jobService, -1, SyncMessageBroker::new, List.of(),
132+
taskExecutionService, taskHandlerMap::get, TASK_FILE_STORAGE, -1, workflowService);
140133

141134
return jobSyncExecutor.execute(new JobParametersDTO(workflowId, Collections.singletonMap("yourName", "me")));
142135
}

server/libs/atlas/atlas-execution/atlas-execution-repository/atlas-execution-repository-memory/src/main/java/com/bytechef/atlas/execution/repository/memory/InMemoryJobRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ public int countRunningJobs() {
7171

7272
@Override
7373
public void deleteById(Long id) {
74-
throw new UnsupportedOperationException();
74+
Cache cache = Objects.requireNonNull(cacheManager.getCache(CACHE));
75+
76+
cache.evict(TenantCacheKeyUtils.getKey(id));
7577
}
7678

7779
@Override

server/libs/atlas/atlas-worker/atlas-worker-impl/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ dependencies {
1717
testImplementation(project(":server:libs:atlas:atlas-file-storage:atlas-file-storage-impl"))
1818
testImplementation(project(":server:libs:core:evaluator:evaluator-impl"))
1919
testImplementation(project(":server:libs:core:file-storage:file-storage-base64-service"))
20-
testImplementation(project(":server:libs:core:message:message-broker:message-broker-sync"))
20+
testImplementation(project(":server:libs:core:message:message-broker:message-broker-memory"))
2121
testImplementation(project(":server:libs:test:test-support"))
2222
}

server/libs/core/commons/commons-util/src/main/java/com/bytechef/commons/util/CollectionUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ public static <T, R> List<R> map(List<T> list, Function<? super T, R> mapper) {
214214
Validate.notNull(list, "'list' must not be null");
215215
Validate.notNull(mapper, "'mapper' must not be null");
216216

217-
return list.stream()
217+
// Snapshot to avoid ConcurrentModificationException when the source list is modified concurrently
218+
return new ArrayList<>(list)
219+
.stream()
218220
.map(mapper)
219221
.toList();
220222
}
@@ -223,7 +225,9 @@ public static <R, T> List<R> map(Set<T> set, Function<? super T, R> mapper) {
223225
Validate.notNull(set, "'set' must not be null");
224226
Validate.notNull(mapper, "'mapper' must not be null");
225227

226-
return set.stream()
228+
// Snapshot to avoid ConcurrentModificationException when the source set is modified concurrently
229+
return new ArrayList<>(set)
230+
.stream()
227231
.map(mapper)
228232
.toList();
229233
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2025 ByteChef
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytechef.message.broker.memory;
18+
19+
import com.bytechef.message.route.MessageRoute;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
/**
26+
* Abstract base implementation of the {@link MessageBroker} interface. This class provides a framework for managing
27+
* receivers and routing messages to them based on specified routes. Concrete subclasses need to implement the
28+
* {@code send} method to define the specific behavior for message delivery.
29+
*
30+
* @author Ivica Cardic
31+
*/
32+
public abstract class AbstractMessageBroker implements MemoryMessageBroker {
33+
34+
protected final Map<MessageRoute, List<SyncMessageBroker.Receiver>> receiverMap = new HashMap<>();
35+
36+
@Override
37+
public void receive(MessageRoute messageRoute, SyncMessageBroker.Receiver receiver) {
38+
List<SyncMessageBroker.Receiver> receivers = receiverMap.computeIfAbsent(messageRoute, k -> new ArrayList<>());
39+
40+
receivers.add(receiver);
41+
}
42+
}

0 commit comments

Comments
 (0)