From 35ac37196c0025d09cd573fe8d93ed864054c9a9 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:33:30 +0100 Subject: [PATCH 01/17] 515 - add initial implementation of error handling dispatcher --- .../on/error/OnErrorTaskDispatcher.java | 126 +++++++++++++++ ...nErrorTaskDispatcherDefinitionFactory.java | 55 +++++++ .../OnErrorTaskCompletionHandler.java | 150 ++++++++++++++++++ .../OnErrorTaskDispatcherConstants.java | 27 ++++ 4 files changed, 358 insertions(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactory.java create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/constants/OnErrorTaskDispatcherConstants.java diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java new file mode 100644 index 0000000000..3a5a9bc6da --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java @@ -0,0 +1,126 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error; + +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.MAIN_BRANCH; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR_BRANCH; + +import com.bytechef.atlas.configuration.domain.Task; +import com.bytechef.atlas.configuration.domain.WorkflowTask; +import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; +import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; +import com.bytechef.atlas.execution.domain.Context; +import com.bytechef.atlas.execution.domain.TaskExecution; +import com.bytechef.atlas.execution.service.ContextService; +import com.bytechef.atlas.execution.service.TaskExecutionService; +import com.bytechef.atlas.file.storage.TaskFileStorage; +import com.bytechef.commons.util.MapUtils; +import com.bytechef.evaluator.Evaluator; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.commons.lang3.Validate; +import org.springframework.context.ApplicationEventPublisher; + +/** + * @author Matija Petanjek + */ +public class OnErrorTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { + + private final ContextService contextService; + private final Evaluator evaluator; + private final ApplicationEventPublisher eventPublisher; + private final TaskDispatcher taskDispatcher; + private final TaskExecutionService taskExecutionService; + private final TaskFileStorage taskFileStorage; + + public OnErrorTaskDispatcher( + ContextService contextService, Evaluator evaluator, ApplicationEventPublisher eventPublisher, + TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + + this.contextService = contextService; + this.evaluator = evaluator; + this.eventPublisher = eventPublisher; + this.taskDispatcher = taskDispatcher; + this.taskExecutionService = taskExecutionService; + this.taskFileStorage = taskFileStorage; + } + + @Override + public void dispatch(TaskExecution taskExecution) { + taskExecution.setStartDate(Instant.now()); + taskExecution.setStatus(TaskExecution.Status.STARTED); + + taskExecution = taskExecutionService.update(taskExecution); + + List subWorkflowTasks; + + if (taskExecution.getError() == null) { + subWorkflowTasks = MapUtils.getList( + taskExecution.getParameters(), MAIN_BRANCH, WorkflowTask.class, Collections.emptyList()); + } else { + subWorkflowTasks = MapUtils.getList( + taskExecution.getParameters(), ON_ERROR_BRANCH, WorkflowTask.class, Collections.emptyList()); + } + + if (!subWorkflowTasks.isEmpty()) { + WorkflowTask subWorkflowTask = subWorkflowTasks.get(0); + + TaskExecution subTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(1) + .workflowTask(subWorkflowTask) + .build(); + + Map context = taskFileStorage.readContextValue( + contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION)); + + subTaskExecution.evaluate(context, evaluator); + + subTaskExecution = taskExecutionService.create(subTaskExecution); + + contextService.push( + Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( + Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, context)); + + taskDispatcher.dispatch(subTaskExecution); + } else { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); + + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } + + } + + @Override + public TaskDispatcher resolve(Task task) { + if (Objects.equals(task.getType(), ON_ERROR + "/v1")) { + return this; + } + + return null; + } +} diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactory.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactory.java new file mode 100644 index 0000000000..e31582c752 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactory.java @@ -0,0 +1,55 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error; + +import static com.bytechef.platform.workflow.task.dispatcher.definition.TaskDispatcherDsl.array; +import static com.bytechef.platform.workflow.task.dispatcher.definition.TaskDispatcherDsl.task; +import static com.bytechef.platform.workflow.task.dispatcher.definition.TaskDispatcherDsl.taskDispatcher; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.MAIN_BRANCH; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR_BRANCH; + +import com.bytechef.platform.workflow.task.dispatcher.TaskDispatcherDefinitionFactory; +import com.bytechef.platform.workflow.task.dispatcher.definition.TaskDispatcherDefinition; +import org.springframework.stereotype.Component; + +/** + * @author Matija Petanjek + */ +@Component +public class OnErrorTaskDispatcherDefinitionFactory implements TaskDispatcherDefinitionFactory { + + private static final TaskDispatcherDefinition ON_ERROR_TASK_DISPATCHER_DEFINITION = + taskDispatcher(ON_ERROR) + .title("Error Handler") + .description("Triggers an error branch with an error object if an exception occurs in the main branch.") + .icon("path:assets/onError.svg") + .taskProperties( + array(MAIN_BRANCH) + .description( + "The list of tasks to execute that will trigger on error branch in case of exception.") + .items(task()), + array(ON_ERROR_BRANCH) + .description( + "The list of tasks to execute when exception occurs in the main branch.") + .items(task())); + + @Override + public TaskDispatcherDefinition getDefinition() { + return ON_ERROR_TASK_DISPATCHER_DEFINITION; + } +} diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java new file mode 100644 index 0000000000..6c777b2d4e --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java @@ -0,0 +1,150 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error.completition; + +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.MAIN_BRANCH; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR; +import static com.bytechef.task.dispatcher.on.error.constants.OnErrorTaskDispatcherConstants.ON_ERROR_BRANCH; + +import com.bytechef.atlas.configuration.domain.Task; +import com.bytechef.atlas.configuration.domain.WorkflowTask; +import com.bytechef.atlas.coordinator.task.completion.TaskCompletionHandler; +import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; +import com.bytechef.atlas.execution.domain.Context; +import com.bytechef.atlas.execution.domain.TaskExecution; +import com.bytechef.atlas.execution.service.ContextService; +import com.bytechef.atlas.execution.service.TaskExecutionService; +import com.bytechef.atlas.file.storage.TaskFileStorage; +import com.bytechef.commons.util.MapUtils; +import com.bytechef.evaluator.Evaluator; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * @author Matija Petanjek + */ +public class OnErrorTaskCompletionHandler implements TaskCompletionHandler { + + private final ContextService contextService; + private final Evaluator evaluator; + private final TaskCompletionHandler taskCompletionHandler; + private final TaskDispatcher taskDispatcher; + private final TaskExecutionService taskExecutionService; + private final TaskFileStorage taskFileStorage; + + public OnErrorTaskCompletionHandler( + ContextService contextService, Evaluator evaluator, TaskCompletionHandler taskCompletionHandler, + TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, + TaskFileStorage taskFileStorage) { + this.contextService = contextService; + this.evaluator = evaluator; + this.taskCompletionHandler = taskCompletionHandler; + this.taskDispatcher = taskDispatcher; + this.taskExecutionService = taskExecutionService; + this.taskFileStorage = taskFileStorage; + } + + @Override + public void handle(TaskExecution taskExecution) { + taskExecution.setStatus(TaskExecution.Status.COMPLETED); + + taskExecution = taskExecutionService.update(taskExecution); + + TaskExecution onErrorTaskExecution = taskExecutionService.getTaskExecution( + Objects.requireNonNull(taskExecution.getParentId())); + + long id = Objects.requireNonNull(onErrorTaskExecution.getId()); + + if (taskExecution.getName() != null) { + Map newContext = new HashMap<>( + taskFileStorage.readContextValue(contextService.peek(id, Context.Classname.TASK_EXECUTION))); + + if (taskExecution.getOutput() != null) { + newContext.put( + taskExecution.getName(), taskFileStorage.readTaskExecutionOutput(taskExecution.getOutput())); + } else { + newContext.put(taskExecution.getName(), null); + } + + contextService.push( + id, Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(id, Context.Classname.TASK_EXECUTION, newContext)); + } + + List subWorkflowTasks; + + if (onErrorTaskExecution.getError() == null) { + subWorkflowTasks = MapUtils.getList( + onErrorTaskExecution.getParameters(), MAIN_BRANCH, WorkflowTask.class, Collections.emptyList()); + } else { + subWorkflowTasks = MapUtils.getList( + onErrorTaskExecution.getParameters(), ON_ERROR_BRANCH, WorkflowTask.class, Collections.emptyList()); + } + + if (taskExecution.getTaskNumber() < subWorkflowTasks.size()) { + WorkflowTask subWorkflowTask = subWorkflowTasks.get(taskExecution.getTaskNumber()); + + TaskExecution subTaskExecution = TaskExecution.builder() + .jobId(onErrorTaskExecution.getJobId()) + .parentId(onErrorTaskExecution.getId()) + .priority(onErrorTaskExecution.getPriority()) + .taskNumber(taskExecution.getTaskNumber() + 1) + .workflowTask(subWorkflowTask) + .build(); + + Map context = taskFileStorage.readContextValue( + contextService.peek(id, Context.Classname.TASK_EXECUTION)); + + subTaskExecution.evaluate(context, evaluator); + + subTaskExecution = taskExecutionService.create(subTaskExecution); + + long subTaskExecutionId = Objects.requireNonNull(subTaskExecution.getId()); + + contextService.push( + subTaskExecutionId, Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(subTaskExecutionId, Context.Classname.TASK_EXECUTION, context)); + + taskDispatcher.dispatch(subTaskExecution); + } + // no more tasks to execute -- complete the condition + else { + onErrorTaskExecution.setEndDate(Instant.now()); + + taskCompletionHandler.handle(onErrorTaskExecution); + } + } + + @Override + public boolean canHandle(TaskExecution taskExecution) { + Long parentId = taskExecution.getParentId(); + + if (parentId != null) { + TaskExecution parentTaskExecution = taskExecutionService.getTaskExecution(parentId); + + String type = parentTaskExecution.getType(); + + return type.equals(ON_ERROR + "/v1"); + } + + return false; + } +} diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/constants/OnErrorTaskDispatcherConstants.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/constants/OnErrorTaskDispatcherConstants.java new file mode 100644 index 0000000000..78709bb207 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/constants/OnErrorTaskDispatcherConstants.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error.constants; + +/** + * @author Matija Petanjek + */ +public class OnErrorTaskDispatcherConstants { + + public static final String MAIN_BRANCH = "main-branch"; + public static final String ON_ERROR = "on-error"; + public static final String ON_ERROR_BRANCH = "on-error-branch"; +} From f8301ade00c034a93181d4cbe89bfe2253df0d46 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:43:27 +0100 Subject: [PATCH 02/17] 515 - add error handling navigation. When error occurs inside onError dispatcher (and no previous error exists, meaning we are in the main-branch), navigate back to onError dispatcher to start executing error-branch --- .../listener/TaskExecutionErrorEventListener.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/main/java/com/bytechef/atlas/coordinator/event/listener/TaskExecutionErrorEventListener.java b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/main/java/com/bytechef/atlas/coordinator/event/listener/TaskExecutionErrorEventListener.java index bf6c6f544c..43a0919b11 100644 --- a/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/main/java/com/bytechef/atlas/coordinator/event/listener/TaskExecutionErrorEventListener.java +++ b/server/libs/atlas/atlas-coordinator/atlas-coordinator-impl/src/main/java/com/bytechef/atlas/coordinator/event/listener/TaskExecutionErrorEventListener.java @@ -96,6 +96,20 @@ public void onErrorEvent(ErrorEvent errorEvent) { while (taskExecution.getParentId() != null) { // mark parent tasks as FAILED as well taskExecution = taskExecutionService.getTaskExecution(taskExecution.getParentId()); + // if it's an on-error dispatcher and its error is not set, navigate back to the on-error + // dispatcher to start executing the error branch + if (taskExecution.getType() + .startsWith("on-error/") && taskExecution.getError() == null) { + taskExecution.setError(error); + taskExecution.setStatus(TaskExecution.Status.CREATED); + + taskExecution = taskExecutionService.update(taskExecution); + + taskDispatcher.dispatch(taskExecution); + + return; + } + taskExecution.setEndDate(Instant.now()); taskExecution.setStatus(TaskExecution.Status.FAILED); From b5683c3b61882468b44592bf0b997f501d18c3a3 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:44:01 +0100 Subject: [PATCH 03/17] 515 - initialize onError dispatcher --- .../OnErrorTaskDispatcherConfiguration.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/config/OnErrorTaskDispatcherConfiguration.java diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/config/OnErrorTaskDispatcherConfiguration.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/config/OnErrorTaskDispatcherConfiguration.java new file mode 100644 index 0000000000..5d630ed6fc --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/config/OnErrorTaskDispatcherConfiguration.java @@ -0,0 +1,64 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error.config; + +import com.bytechef.atlas.coordinator.task.completion.TaskCompletionHandlerFactory; +import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolverFactory; +import com.bytechef.atlas.execution.service.ContextService; +import com.bytechef.atlas.execution.service.TaskExecutionService; +import com.bytechef.atlas.file.storage.TaskFileStorage; +import com.bytechef.evaluator.Evaluator; +import com.bytechef.task.dispatcher.on.error.OnErrorTaskDispatcher; +import com.bytechef.task.dispatcher.on.error.completition.OnErrorTaskCompletionHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Matija Petanjek + */ +@Configuration +public class OnErrorTaskDispatcherConfiguration { + + @Autowired + private Evaluator evaluator; + + @Autowired + private ApplicationEventPublisher eventPublisher; + + @Autowired + private ContextService contextService; + + @Autowired + private TaskExecutionService taskExecutionService; + + @Autowired + private TaskFileStorage taskFileStorage; + + @Bean("onErrorTaskCompletionHandlerFactory_v1") + TaskCompletionHandlerFactory onErrorTaskCompletionHandlerFactory() { + return (taskCompletionHandler, taskDispatcher) -> new OnErrorTaskCompletionHandler( + contextService, evaluator, taskCompletionHandler, taskDispatcher, taskExecutionService, taskFileStorage); + } + + @Bean("onErrorTaskDispatcherResolverFactory_v1") + TaskDispatcherResolverFactory onErrorTaskDispatcherResolverFactory() { + return (taskDispatcher) -> new OnErrorTaskDispatcher( + contextService, evaluator, eventPublisher, taskDispatcher, taskExecutionService, taskFileStorage); + } +} From 38408f5eeeda48ab8bbe2397a06345ba8d413d38 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:44:20 +0100 Subject: [PATCH 04/17] 515 - gradle --- server/apps/server-app/build.gradle.kts | 1 + server/libs/modules/task-dispatchers/on-error/build.gradle.kts | 1 + settings.gradle.kts | 1 + 3 files changed, 3 insertions(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/build.gradle.kts diff --git a/server/apps/server-app/build.gradle.kts b/server/apps/server-app/build.gradle.kts index 6b5514e2c2..f11894763b 100644 --- a/server/apps/server-app/build.gradle.kts +++ b/server/apps/server-app/build.gradle.kts @@ -338,6 +338,7 @@ dependencies { implementation(project(":server:libs:modules:task-dispatchers:fork-join")) implementation(project(":server:libs:modules:task-dispatchers:loop")) implementation(project(":server:libs:modules:task-dispatchers:map")) + implementation(project(":server:libs:modules:task-dispatchers:on-error")) implementation(project(":server:libs:modules:task-dispatchers:parallel")) implementation(project(":server:libs:modules:task-dispatchers:subflow")) diff --git a/server/libs/modules/task-dispatchers/on-error/build.gradle.kts b/server/libs/modules/task-dispatchers/on-error/build.gradle.kts new file mode 100644 index 0000000000..190b578b89 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/build.gradle.kts @@ -0,0 +1 @@ +version="1.0" diff --git a/settings.gradle.kts b/settings.gradle.kts index a80604ea05..5ae0387709 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -409,6 +409,7 @@ include("server:libs:modules:task-dispatchers:each") include("server:libs:modules:task-dispatchers:fork-join") include("server:libs:modules:task-dispatchers:loop") include("server:libs:modules:task-dispatchers:map") +include("server:libs:modules:task-dispatchers:on-error") include("server:libs:modules:task-dispatchers:parallel") include("server:libs:modules:task-dispatchers:subflow") From dc635f64e10fe37c072a4c63557742fafec088c8 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:44:35 +0100 Subject: [PATCH 05/17] 515 - svg icon --- .../on-error/src/main/resources/assets/onError.svg | 1 + 1 file changed, 1 insertion(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/src/main/resources/assets/onError.svg diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/resources/assets/onError.svg b/server/libs/modules/task-dispatchers/on-error/src/main/resources/assets/onError.svg new file mode 100644 index 0000000000..096e2bf9f9 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/main/resources/assets/onError.svg @@ -0,0 +1 @@ + \ No newline at end of file From 62c7ad430a38d8d690d3af8c38b1e424124a7f56 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 10 Nov 2025 10:45:52 +0100 Subject: [PATCH 06/17] 515 - definition factory unit test --- ...orTaskDispatcherDefinitionFactoryTest.java | 32 +++++++ .../resources/definition/on-error_v1.json | 83 +++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactoryTest.java create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/definition/on-error_v1.json diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactoryTest.java b/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactoryTest.java new file mode 100644 index 0000000000..df45d9905b --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherDefinitionFactoryTest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error; + +import com.bytechef.test.jsonasssert.JsonFileAssert; +import org.junit.jupiter.api.Test; + +/** + * @author Matija Petanjek + */ +public class OnErrorTaskDispatcherDefinitionFactoryTest { + + @Test + public void testGetTaskDispatcherDefinition() { + JsonFileAssert.assertEquals("definition/on-error_v1.json", + new OnErrorTaskDispatcherDefinitionFactory().getDefinition()); + } +} diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/definition/on-error_v1.json b/server/libs/modules/task-dispatchers/on-error/src/test/resources/definition/on-error_v1.json new file mode 100644 index 0000000000..762ae5d4d9 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/definition/on-error_v1.json @@ -0,0 +1,83 @@ +{ + "description": "Triggers an error branch with an error object if an exception occurs in the main branch.", + "help": null, + "icon": "path:assets/onError.svg", + "name": "on-error", + "outputDefinition": null, + "properties": null, + "resources": null, + "taskProperties": [ { + "advancedOption": null, + "controlType": "ARRAY_BUILDER", + "defaultValue": null, + "description": "The list of tasks to execute that will trigger on error branch in case of exception.", + "displayCondition": null, + "exampleValue": null, + "expressionEnabled": null, + "hidden": null, + "items": [ { + "advancedOption": null, + "controlType": null, + "defaultValue": null, + "description": null, + "displayCondition": null, + "exampleValue": null, + "expressionEnabled": null, + "hidden": null, + "label": null, + "metadata": { }, + "name": null, + "placeholder": null, + "required": null, + "type": "TASK" + } ], + "label": null, + "maxItems": null, + "metadata": { }, + "minItems": null, + "multipleValues": null, + "name": "main-branch", + "options": null, + "placeholder": null, + "required": null, + "type": "ARRAY" + }, { + "advancedOption": null, + "controlType": "ARRAY_BUILDER", + "defaultValue": null, + "description": "The list of tasks to execute when exception occurs in the main branch.", + "displayCondition": null, + "exampleValue": null, + "expressionEnabled": null, + "hidden": null, + "items": [ { + "advancedOption": null, + "controlType": null, + "defaultValue": null, + "description": null, + "displayCondition": null, + "exampleValue": null, + "expressionEnabled": null, + "hidden": null, + "label": null, + "metadata": { }, + "name": null, + "placeholder": null, + "required": null, + "type": "TASK" + } ], + "label": null, + "maxItems": null, + "metadata": { }, + "minItems": null, + "multipleValues": null, + "name": "on-error-branch", + "options": null, + "placeholder": null, + "required": null, + "type": "ARRAY" + } ], + "title": "Error Handler", + "variableProperties": null, + "version": 1 +} \ No newline at end of file From 1c9842111dd0cf0e8d00cb8aca21fd0cd272f636 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:00:11 +0100 Subject: [PATCH 07/17] 515 - rename (no bean qualifier for taskExecutionErrorHandler) --- .../atlas/coordinator/config/TaskCoordinatorConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/libs/atlas/atlas-coordinator/atlas-coordinator-config/src/main/java/com/bytechef/atlas/coordinator/config/TaskCoordinatorConfiguration.java b/server/libs/atlas/atlas-coordinator/atlas-coordinator-config/src/main/java/com/bytechef/atlas/coordinator/config/TaskCoordinatorConfiguration.java index 0875afb7e3..7f32e993e6 100644 --- a/server/libs/atlas/atlas-coordinator/atlas-coordinator-config/src/main/java/com/bytechef/atlas/coordinator/config/TaskCoordinatorConfiguration.java +++ b/server/libs/atlas/atlas-coordinator/atlas-coordinator-config/src/main/java/com/bytechef/atlas/coordinator/config/TaskCoordinatorConfiguration.java @@ -121,7 +121,7 @@ LogTaskApplicationEventListener logTaskApplicationEventListener() { } @Bean - TaskExecutionErrorEventListener taskExecutionErrorHandler() { + TaskExecutionErrorEventListener taskExecutionErrorEventListener() { return new TaskExecutionErrorEventListener(eventPublisher, jobService, taskDispatcher(), taskExecutionService); } From 72ab49d5c8195ab5139e916cd6c12be712c80c8e Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:55:39 +0100 Subject: [PATCH 08/17] 515 - update error handling in "editor" executions and tests, let actual ErrorEventListener handle error instead of just logging them. --- .../coordinator/job/JobSyncExecutor.java | 36 +++++-------------- 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java index d3fdb5fe0f..b9880c4286 100644 --- a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java +++ b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java @@ -24,9 +24,9 @@ import com.bytechef.atlas.coordinator.event.ApplicationEvent; import com.bytechef.atlas.coordinator.event.ErrorEvent; import com.bytechef.atlas.coordinator.event.JobStatusApplicationEvent; +import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.event.StartJobEvent; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.event.listener.ApplicationEventListener; import com.bytechef.atlas.coordinator.event.listener.TaskExecutionErrorEventListener; import com.bytechef.atlas.coordinator.event.listener.TaskStartedApplicationEventListener; @@ -84,8 +84,6 @@ import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.lang3.Validate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.env.Environment; @@ -97,8 +95,6 @@ */ public class JobSyncExecutor { - private static final Logger logger = LoggerFactory.getLogger(JobSyncExecutor.class); - private static final List WEBHOOK_COMPONENTS = List.of("apiPlatform", "chat", "webhook"); private static final int NO_TIMEOUT = -1; private static final int UNLIMITED_TASK_EXECUTIONS = -1; @@ -152,29 +148,7 @@ public JobSyncExecutor( this.timeout = timeout; this.workflowService = workflowService; - TaskExecutionErrorEventListener taskExecutionErrorEventListener = new TaskExecutionErrorEventListener( - eventPublisher, jobService, null, taskExecutionService); - - receive( - memoryMessageBroker, TaskCoordinatorMessageRoute.ERROR_EVENTS, - event -> { - ErrorEvent errorEvent = (ErrorEvent) event; - - if (errorEvent instanceof TaskExecutionErrorEvent taskExecutionErrorEvent) { - TaskExecution taskExecution = taskExecutionErrorEvent.getTaskExecution(); - - Optional jobOptional = jobService.fetchJob( - Validate.notNull(taskExecution.getJobId(), "jobId")); - - if (jobOptional.isEmpty()) { - return; - } - } - - taskExecutionErrorEventListener.onErrorEvent(errorEvent); - }); - - receive(memoryMessageBroker, TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); + syncMessageBroker.receive(TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); TaskHandlerResolverChain taskHandlerResolverChain = new TaskHandlerResolverChain(); @@ -225,6 +199,12 @@ public JobSyncExecutor( new DefaultTaskDispatcher( createEventPublisher(coordinatorMessageBroker), taskDispatcherPreSendProcessors)))); + TaskExecutionErrorEventListener taskExecutionErrorEventListener = new TaskExecutionErrorEventListener( + eventPublisher, jobService, taskDispatcherChain, taskExecutionService); + + syncMessageBroker.receive(TaskCoordinatorMessageRoute.ERROR_EVENTS, + event -> taskExecutionErrorEventListener.onErrorEvent((ErrorEvent) event)); + JobExecutor jobExecutor = new JobExecutor( contextService, evaluator, taskDispatcherChain, taskExecutionService, taskFileStorage, workflowService); From 1dcf01ce3c7060ce2e2566fcd95d71e845845081 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:08:27 +0100 Subject: [PATCH 09/17] 515 - add new dispatcher in test configuration --- .../workflow/test/config/TestExecutorConfiguration.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java index 63d4888344..3057f8f900 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java +++ b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/src/main/java/com/bytechef/platform/workflow/test/config/TestExecutorConfiguration.java @@ -65,6 +65,8 @@ import com.bytechef.task.dispatcher.loop.completion.LoopTaskCompletionHandler; import com.bytechef.task.dispatcher.map.MapTaskDispatcher; import com.bytechef.task.dispatcher.map.completion.MapTaskCompletionHandler; +import com.bytechef.task.dispatcher.on.error.OnErrorTaskDispatcher; +import com.bytechef.task.dispatcher.on.error.completition.OnErrorTaskCompletionHandler; import com.bytechef.task.dispatcher.parallel.ParallelTaskDispatcher; import com.bytechef.task.dispatcher.parallel.completion.ParallelTaskCompletionHandler; import com.bytechef.tenant.TenantContext; @@ -148,6 +150,9 @@ private List getTaskCompletionHandlerFactories( (taskCompletionHandler, taskDispatcher) -> new MapTaskCompletionHandler( contextService, counterService, evaluator, taskDispatcher, taskCompletionHandler, taskExecutionService, taskFileStorage), + (taskCompletionHandler, taskDispatcher) -> new OnErrorTaskCompletionHandler( + contextService, evaluator, taskCompletionHandler, taskDispatcher, taskExecutionService, + taskFileStorage), (taskCompletionHandler, taskDispatcher) -> new ParallelTaskCompletionHandler( counterService, taskCompletionHandler, taskExecutionService)); } @@ -195,6 +200,8 @@ private List getTaskDispatcherResolverFactories( (taskDispatcher) -> new MapTaskDispatcher( contextService, counterService, evaluator, eventPublisher, taskDispatcher, taskExecutionService, taskFileStorage), + (taskDispatcher) -> new OnErrorTaskDispatcher( + contextService, evaluator, eventPublisher, taskDispatcher, taskExecutionService, taskFileStorage), (taskDispatcher) -> new ParallelTaskDispatcher( contextService, counterService, eventPublisher, taskDispatcher, taskExecutionService, taskFileStorage)); From 5e592b913350515b1d4ede85de0841670cbc7b47 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:08:42 +0100 Subject: [PATCH 10/17] 515 - test gradle files --- .../libs/modules/task-dispatchers/on-error/build.gradle.kts | 4 ++++ .../platform-workflow-test-service/build.gradle.kts | 1 + 2 files changed, 5 insertions(+) diff --git a/server/libs/modules/task-dispatchers/on-error/build.gradle.kts b/server/libs/modules/task-dispatchers/on-error/build.gradle.kts index 190b578b89..b2af24dbe6 100644 --- a/server/libs/modules/task-dispatchers/on-error/build.gradle.kts +++ b/server/libs/modules/task-dispatchers/on-error/build.gradle.kts @@ -1 +1,5 @@ version="1.0" + +dependencies { + testImplementation(project(":server:libs:modules:task-dispatchers:loop")) +} diff --git a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts index 18c87e4b00..fc026d7615 100644 --- a/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts +++ b/server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-service/build.gradle.kts @@ -25,6 +25,7 @@ dependencies { implementation(project(":server:libs:modules:task-dispatchers:fork-join")) implementation(project(":server:libs:modules:task-dispatchers:loop")) implementation(project(":server:libs:modules:task-dispatchers:map")) + implementation(project(":server:libs:modules:task-dispatchers:on-error")) implementation(project(":server:libs:modules:task-dispatchers:parallel")) implementation(project(":server:libs:modules:task-dispatchers:subflow")) } From a743f3740a60a3aec6c83042a37c064c4ad52ec6 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:09:59 +0100 Subject: [PATCH 11/17] 515 - integration tests for on-error dispatcher --- .../error/OnErrorTaskDispatcherIntTest.java | 196 ++++++++++++++++++ ..._v1-exception-error-branch-in-loop_v1.yaml | 60 ++++++ .../on-error_v1-exception-error-branch.yaml | 48 +++++ ...r_v1-exception-main-branch-in-loop_v1.yaml | 44 ++++ .../on-error_v1-exception-main-branch.yaml | 36 ++++ .../workflows/on-error_v1-no-exception.yaml | 20 ++ 6 files changed, 404 insertions(+) create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherIntTest.java create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch-in-loop_v1.yaml create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch.yaml create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch-in-loop_v1.yaml create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch.yaml create mode 100644 server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-no-exception.yaml diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherIntTest.java b/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherIntTest.java new file mode 100644 index 0000000000..6ec6a87c3e --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcherIntTest.java @@ -0,0 +1,196 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.task.dispatcher.on.error; + +import com.bytechef.atlas.coordinator.task.completion.TaskCompletionHandlerFactory; +import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolverFactory; +import com.bytechef.atlas.execution.service.ContextService; +import com.bytechef.atlas.execution.service.CounterService; +import com.bytechef.atlas.execution.service.TaskExecutionService; +import com.bytechef.atlas.file.storage.TaskFileStorage; +import com.bytechef.atlas.worker.exception.TaskExecutionException; +import com.bytechef.atlas.worker.task.handler.TaskHandler; +import com.bytechef.commons.util.EncodingUtils; +import com.bytechef.evaluator.SpelEvaluator; +import com.bytechef.exception.ExecutionException; +import com.bytechef.platform.workflow.task.dispatcher.test.annotation.TaskDispatcherIntTest; +import com.bytechef.platform.workflow.task.dispatcher.test.task.handler.TestVarTaskHandler; +import com.bytechef.platform.workflow.task.dispatcher.test.workflow.TaskDispatcherJobTestExecutor; +import com.bytechef.task.dispatcher.loop.LoopTaskDispatcher; +import com.bytechef.task.dispatcher.loop.completion.LoopTaskCompletionHandler; +import com.bytechef.task.dispatcher.on.error.completition.OnErrorTaskCompletionHandler; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; + +/** + * @author Matija Petanjek + */ +@TaskDispatcherIntTest +public class OnErrorTaskDispatcherIntTest { + + private TestVarTaskHandler testVarTaskHandler; + + @Autowired + protected ContextService contextService; + + @Autowired + protected TaskExecutionService taskExecutionService; + + @Autowired + private TaskDispatcherJobTestExecutor taskDispatcherJobTestExecutor; + + @Autowired + private TaskFileStorage taskFileStorage; + + @BeforeEach + void beforeEach() { + testVarTaskHandler = new TestVarTaskHandler<>(Map::put); + } + + @Test + public void testOnErrorTaskDispatcherWhenNoException() { + taskDispatcherJobTestExecutor.execute( + EncodingUtils.base64EncodeToString("on-error_v1-no-exception".getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), + this::getTaskCompletionHandlerFactories, + this::getTaskDispatcherResolverFactories, + this::getTaskHandlerMap); + + Assertions.assertEquals("main branch", testVarTaskHandler.get("mainBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("errorBranchVar")); + Assertions.assertEquals("end", testVarTaskHandler.get("endVar")); + } + + @Test + public void testOnErrorTaskDispatcherWhenExceptionInMainBranch() { + taskDispatcherJobTestExecutor.execute( + EncodingUtils.base64EncodeToString("on-error_v1-exception-main-branch".getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), + this::getTaskCompletionHandlerFactories, + this::getTaskDispatcherResolverFactories, + this::getTaskHandlerMap); + + Assertions.assertEquals( + "main branch before exception", testVarTaskHandler.get("beforeExceptionMainBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("afterExceptionMainBranchVar")); + Assertions.assertEquals("error branch", testVarTaskHandler.get("errorBranchVar")); + Assertions.assertEquals("end", testVarTaskHandler.get("endVar")); + } + + @Test + public void testOnErrorTaskDispatcherWhenExceptionInOnErrorBranch() { + ExecutionException executionException = Assertions.assertThrows( + ExecutionException.class, () -> taskDispatcherJobTestExecutor.execute( + EncodingUtils + .base64EncodeToString("on-error_v1-exception-error-branch".getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), + this::getTaskCompletionHandlerFactories, + this::getTaskDispatcherResolverFactories, + this::getTaskHandlerMap)); + + Assertions.assertEquals("test exception", executionException.getMessage()); + + Assertions.assertEquals("main branch", testVarTaskHandler.get("mainBranchVar")); + Assertions.assertEquals( + "before exception in error branch", testVarTaskHandler.get("beforeExceptionErrorBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("afterExceptionErrorBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("endVar")); + } + + @Test + public void testOnErrorTaskDispatcherInLoopTaskDispatcherWhenExceptionInMainBranch() { + taskDispatcherJobTestExecutor.execute( + EncodingUtils + .base64EncodeToString("on-error_v1-exception-main-branch-in-loop_v1".getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), + this::getTaskCompletionHandlerFactories, + this::getTaskDispatcherResolverFactories, + this::getTaskHandlerMap); + + Assertions.assertEquals( + "main branch before exception, iteration number 2", + testVarTaskHandler.get("beforeExceptionMainBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("afterExceptionMainBranchVar")); + Assertions.assertEquals("error branch iteration number 2", testVarTaskHandler.get("errorBranchVar")); + Assertions.assertEquals("end", testVarTaskHandler.get("endVar")); + } + + @Test + public void testOnErrorTaskDispatcherInLoopTaskDispatcherWhenExceptionInErrorBranch() { + ExecutionException executionException = Assertions.assertThrows( + ExecutionException.class, () -> taskDispatcherJobTestExecutor.execute( + EncodingUtils.base64EncodeToString( + "on-error_v1-exception-error-branch-in-loop_v1".getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap(), + this::getTaskCompletionHandlerFactories, + this::getTaskDispatcherResolverFactories, + this::getTaskHandlerMap)); + + Assertions.assertEquals("test exception", executionException.getMessage()); + + Assertions.assertEquals( + "main branch before exception, iteration number 1", + testVarTaskHandler.get("beforeExceptionMainBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("afterExceptionMainBranchVar")); + Assertions.assertEquals( + "error branch before exception, iteration number 1", + testVarTaskHandler.get("beforeExceptionErrorBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("afterExceptionErrorBranchVar")); + Assertions.assertNull(testVarTaskHandler.get("endVar")); + } + + @SuppressWarnings("PMD") + private List getTaskCompletionHandlerFactories( + CounterService counterService, TaskExecutionService taskExecutionService) { + + return List.of( + (taskCompletionHandler, taskDispatcher) -> new LoopTaskCompletionHandler( + contextService, SpelEvaluator.create(), taskCompletionHandler, taskDispatcher, taskExecutionService, + taskFileStorage), + (taskCompletionHandler, taskDispatcher) -> new OnErrorTaskCompletionHandler( + contextService, SpelEvaluator.create(), taskCompletionHandler, taskDispatcher, taskExecutionService, + taskFileStorage)); + } + + @SuppressWarnings("PMD") + private List getTaskDispatcherResolverFactories( + ApplicationEventPublisher eventPublisher, ContextService contextService, + CounterService counterService, TaskExecutionService taskExecutionService) { + + return List.of((taskDispatcher) -> new LoopTaskDispatcher( + contextService, SpelEvaluator.create(), eventPublisher, taskDispatcher, taskExecutionService, + taskFileStorage), + (taskDispatcher) -> new OnErrorTaskDispatcher( + contextService, SpelEvaluator.create(), eventPublisher, taskDispatcher, taskExecutionService, + taskFileStorage)); + } + + private Map> getTaskHandlerMap() { + return Map.of( + "var/v1/set", testVarTaskHandler, + "httpClient/v1/get", taskExecution -> { + throw new TaskExecutionException("test exception"); + }); + } +} diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch-in-loop_v1.yaml b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch-in-loop_v1.yaml new file mode 100644 index 0000000000..e47c71d3ea --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch-in-loop_v1.yaml @@ -0,0 +1,60 @@ +--- +label: "On Error Task Dispatcher" +tasks: +- name: "loop_1" + type: "loop/v1" + parameters: + loopForever: false + items: + - 1 + - 2 + iteratee: + - name: "on_error_1" + type: "on-error/v1" + parameters: + main-branch: + - name: "beforeExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch before exception, iteration number ${loop_1.item}" + - name: "httpClient_error" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + - name: "afterExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch after exception, iteration number ${loop_1.item}" + on-error-branch: + - name: "beforeExceptionErrorBranchVar" + type: "var/v1/set" + parameters: + value: "error branch before exception, iteration number ${loop_1.item}" + - name: "httpClient_error" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + - name: "afterExceptionErrorBranchVar" + type: "var/v1/set" + parameters: + value: "error branch after exception, iteration number ${loop_1.item}" +- name: "endVar" + type: "var/v1/set" + parameters: + value: "end" diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch.yaml b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch.yaml new file mode 100644 index 0000000000..b9d1ac0d4f --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-error-branch.yaml @@ -0,0 +1,48 @@ +--- +label: "On Error Task Dispatcher" +tasks: +- name: "on_error_1" + type: "on-error/v1" + parameters: + main-branch: + - name: "mainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch" + - name: "httpClient_error_1" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + on-error-branch: + - name: "beforeExceptionErrorBranchVar" + type: "var/v1/set" + parameters: + value: "before exception in error branch" + - name: "httpClient_error_2" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + - name: "afterExceptionErrorBranchVar" + type: "var/v1/set" + parameters: + value: "you should not see this" +- name: "endVar" + type: "var/v1/set" + parameters: + value: "end" diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch-in-loop_v1.yaml b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch-in-loop_v1.yaml new file mode 100644 index 0000000000..d3a335f216 --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch-in-loop_v1.yaml @@ -0,0 +1,44 @@ +--- +label: "On Error Task Dispatcher" +tasks: +- name: "loop_1" + type: "loop/v1" + parameters: + loopForever: false + items: + - 1 + - 2 + iteratee: + - name: "on_error_1" + type: "on-error/v1" + parameters: + main-branch: + - name: "beforeExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch before exception, iteration number ${loop_1.item}" + - name: "httpClient_error" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + - name: "afterExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch after exception, iteration number ${loop_1.item}" + on-error-branch: + - name: "errorBranchVar" + type: "var/v1/set" + parameters: + value: "error branch iteration number ${loop_1.item}" +- name: "endVar" + type: "var/v1/set" + parameters: + value: "end" diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch.yaml b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch.yaml new file mode 100644 index 0000000000..47e622ad1a --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-exception-main-branch.yaml @@ -0,0 +1,36 @@ +--- +label: "On Error Task Dispatcher" +tasks: +- name: "on_error_1" + type: "on-error/v1" + parameters: + main-branch: + - name: "beforeExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch before exception" + - name: "httpClient_error" + type: "httpClient/v1/get" + parameters: + allowUnauthorizedCerts: false + responseType: "JSON" + responseContentType: "application/octet-stream" + fullResponse: false + followAllRedirects: false + followRedirect: false + ignoreResponseCode: false + timeout: 1000 + uri: "https://localhost:8080" + - name: "afterExceptionMainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch after exception" + on-error-branch: + - name: "errorBranchVar" + type: "var/v1/set" + parameters: + value: "error branch" +- name: "endVar" + type: "var/v1/set" + parameters: + value: "end" diff --git a/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-no-exception.yaml b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-no-exception.yaml new file mode 100644 index 0000000000..e53e632f9e --- /dev/null +++ b/server/libs/modules/task-dispatchers/on-error/src/test/resources/workflows/on-error_v1-no-exception.yaml @@ -0,0 +1,20 @@ +--- +label: "On Error Task Dispatcher" +tasks: +- name: "on_error_1" + type: "on-error/v1" + parameters: + main-branch: + - name: "mainBranchVar" + type: "var/v1/set" + parameters: + value: "main branch" + error-branch: + - name: "errorBranchVar" + type: "var/v1/set" + parameters: + value: "error branch" +- name: "endVar" + type: "var/v1/set" + parameters: + value: "end" From 88bf8ea7d92118cc96af7f3ab995f719699518a9 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Tue, 11 Nov 2025 16:46:23 +0100 Subject: [PATCH 12/17] 515 - use job status, now task executions can have failed status but the job is still completed --- .../WorkflowExecutionSheetAccordion.tsx | 2 +- .../workflow-editor/components/WorkflowExecutionsTestOutput.tsx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/pages/automation/workflow-executions/components/workflow-execution-sheet/WorkflowExecutionSheetAccordion.tsx b/client/src/pages/automation/workflow-executions/components/workflow-execution-sheet/WorkflowExecutionSheetAccordion.tsx index a8dc617d20..b49abc777c 100644 --- a/client/src/pages/automation/workflow-executions/components/workflow-execution-sheet/WorkflowExecutionSheetAccordion.tsx +++ b/client/src/pages/automation/workflow-executions/components/workflow-execution-sheet/WorkflowExecutionSheetAccordion.tsx @@ -10,7 +10,7 @@ const WorkflowExecutionSheetAccordion = ({job, triggerExecution}: {job: Job; tri const startTime = job?.startDate?.getTime(); const endTime = job?.endDate?.getTime(); - const taskExecutionsCompleted = job?.taskExecutions?.every((taskExecution) => taskExecution.status === 'COMPLETED'); + const taskExecutionsCompleted = job?.status === 'COMPLETED'; const triggerExecutionCompleted = !triggerExecution || triggerExecution?.status === 'COMPLETED'; let duration; diff --git a/client/src/pages/platform/workflow-editor/components/WorkflowExecutionsTestOutput.tsx b/client/src/pages/platform/workflow-editor/components/WorkflowExecutionsTestOutput.tsx index ebae1b9efc..ab0ae01521 100644 --- a/client/src/pages/platform/workflow-editor/components/WorkflowExecutionsTestOutput.tsx +++ b/client/src/pages/platform/workflow-editor/components/WorkflowExecutionsTestOutput.tsx @@ -19,7 +19,7 @@ const WorkflowExecutionsTestOutputHeader = ({ const startTime = job?.startDate?.getTime(); const endTime = job?.endDate?.getTime(); - const taskExecutionsCompleted = job?.taskExecutions?.every((taskExecution) => taskExecution.status === 'COMPLETED'); + const taskExecutionsCompleted = job?.status === 'COMPLETED'; const triggerExecutionCompleted = !triggerExecution || triggerExecution?.status === 'COMPLETED'; let duration = 0; From 041aff1cdfcf73019ad3a98fdd501e1617ecab18 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Fri, 14 Nov 2025 15:06:33 +0100 Subject: [PATCH 13/17] 515 - spotBugs & spotlessJavaCheck --- .../error-api/src/main/java/com/bytechef/error/Errorable.java | 3 +++ .../task/dispatcher/on/error/OnErrorTaskDispatcher.java | 2 ++ .../on/error/completition/OnErrorTaskCompletionHandler.java | 2 ++ 3 files changed, 7 insertions(+) diff --git a/server/libs/core/error/error-api/src/main/java/com/bytechef/error/Errorable.java b/server/libs/core/error/error-api/src/main/java/com/bytechef/error/Errorable.java index 7555a7dc78..91ea5a186b 100644 --- a/server/libs/core/error/error-api/src/main/java/com/bytechef/error/Errorable.java +++ b/server/libs/core/error/error-api/src/main/java/com/bytechef/error/Errorable.java @@ -18,6 +18,8 @@ package com.bytechef.error; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * An interface which marks an object as being able to provide {@link ExecutionError} status about itself. * @@ -27,5 +29,6 @@ public interface Errorable { /** Returns the error associated with the object. */ + @Nullable ExecutionError getError(); } diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java index 3a5a9bc6da..44ec277c04 100644 --- a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/OnErrorTaskDispatcher.java @@ -32,6 +32,7 @@ import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; import com.bytechef.evaluator.Evaluator; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; import java.util.Collections; import java.util.List; @@ -52,6 +53,7 @@ public class OnErrorTaskDispatcher implements TaskDispatcher, Tas private final TaskExecutionService taskExecutionService; private final TaskFileStorage taskFileStorage; + @SuppressFBWarnings("EI") public OnErrorTaskDispatcher( ContextService contextService, Evaluator evaluator, ApplicationEventPublisher eventPublisher, TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { diff --git a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java index 6c777b2d4e..c40adc4a7f 100644 --- a/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java +++ b/server/libs/modules/task-dispatchers/on-error/src/main/java/com/bytechef/task/dispatcher/on/error/completition/OnErrorTaskCompletionHandler.java @@ -31,6 +31,7 @@ import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; import com.bytechef.evaluator.Evaluator; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; import java.util.Collections; import java.util.HashMap; @@ -50,6 +51,7 @@ public class OnErrorTaskCompletionHandler implements TaskCompletionHandler { private final TaskExecutionService taskExecutionService; private final TaskFileStorage taskFileStorage; + @SuppressFBWarnings("EI") public OnErrorTaskCompletionHandler( ContextService contextService, Evaluator evaluator, TaskCompletionHandler taskCompletionHandler, TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, From 2d12d97ebdbaa829fbd13cacc354369e558ac7dd Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Fri, 14 Nov 2025 12:14:04 +0100 Subject: [PATCH 14/17] 515 - missing name parameter causing endless loop in the test workflow execution --- .../loop/src/test/resources/workflows/loop_v1_6.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml b/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml index 5dc06c493b..2c44684e6e 100644 --- a/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml +++ b/server/libs/modules/task-dispatchers/loop/src/test/resources/workflows/loop_v1_6.yaml @@ -20,7 +20,7 @@ tasks: value2: 5 combineOperation: "ANY" caseTrue: - - name: "loopBreak" + - name: "loopBreak_1" type: "loopBreak/v1" caseFalse: - name: "sumVar2" From 64c75229dcbfa20a11043927f5ea02ab5ce1ed29 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 17 Nov 2025 12:34:17 +0100 Subject: [PATCH 15/17] 515 - after conflict resolution --- .../platform/coordinator/job/JobSyncExecutor.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java index b9880c4286..f002b45aa3 100644 --- a/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java +++ b/server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java @@ -24,9 +24,9 @@ import com.bytechef.atlas.coordinator.event.ApplicationEvent; import com.bytechef.atlas.coordinator.event.ErrorEvent; import com.bytechef.atlas.coordinator.event.JobStatusApplicationEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.event.StartJobEvent; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; +import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.event.listener.ApplicationEventListener; import com.bytechef.atlas.coordinator.event.listener.TaskExecutionErrorEventListener; import com.bytechef.atlas.coordinator.event.listener.TaskStartedApplicationEventListener; @@ -84,6 +84,8 @@ import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.env.Environment; @@ -95,6 +97,8 @@ */ public class JobSyncExecutor { + private static final Logger logger = LoggerFactory.getLogger(JobSyncExecutor.class); + private static final List WEBHOOK_COMPONENTS = List.of("apiPlatform", "chat", "webhook"); private static final int NO_TIMEOUT = -1; private static final int UNLIMITED_TASK_EXECUTIONS = -1; @@ -148,7 +152,7 @@ public JobSyncExecutor( this.timeout = timeout; this.workflowService = workflowService; - syncMessageBroker.receive(TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); + receive(memoryMessageBroker, TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, event -> {}); TaskHandlerResolverChain taskHandlerResolverChain = new TaskHandlerResolverChain(); @@ -202,7 +206,7 @@ public JobSyncExecutor( TaskExecutionErrorEventListener taskExecutionErrorEventListener = new TaskExecutionErrorEventListener( eventPublisher, jobService, taskDispatcherChain, taskExecutionService); - syncMessageBroker.receive(TaskCoordinatorMessageRoute.ERROR_EVENTS, + receive(memoryMessageBroker, TaskCoordinatorMessageRoute.ERROR_EVENTS, event -> taskExecutionErrorEventListener.onErrorEvent((ErrorEvent) event)); JobExecutor jobExecutor = new JobExecutor( From 874029f7443edc5b815a9ac191aae341134e98ae Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Mon, 17 Nov 2025 15:58:34 +0100 Subject: [PATCH 16/17] 2465 - extract duplicated code to template method (+ required field validations should be part of try block) --- .../ErrorHandlingTaskDispatcher.java | 49 ++++++++++ .../branch/BranchTaskDispatcher.java | 94 +++++++++--------- .../condition/ConditionTaskDispatcher.java | 96 +++++++++--------- .../dispatcher/each/EachTaskDispatcher.java | 76 +++++++------- .../fork/join/ForkJoinTaskDispatcher.java | 91 ++++++++--------- .../dispatcher/loop/LoopTaskDispatcher.java | 78 +++++++-------- .../dispatcher/map/MapTaskDispatcher.java | 98 +++++++++---------- .../parallel/ParallelTaskDispatcher.java | 52 +++++----- 8 files changed, 319 insertions(+), 315 deletions(-) create mode 100644 server/libs/atlas/atlas-coordinator/atlas-coordinator-api/src/main/java/com/bytechef/atlas/coordinator/task/dispatcher/ErrorHandlingTaskDispatcher.java diff --git a/server/libs/atlas/atlas-coordinator/atlas-coordinator-api/src/main/java/com/bytechef/atlas/coordinator/task/dispatcher/ErrorHandlingTaskDispatcher.java b/server/libs/atlas/atlas-coordinator/atlas-coordinator-api/src/main/java/com/bytechef/atlas/coordinator/task/dispatcher/ErrorHandlingTaskDispatcher.java new file mode 100644 index 0000000000..85fa47a597 --- /dev/null +++ b/server/libs/atlas/atlas-coordinator/atlas-coordinator-api/src/main/java/com/bytechef/atlas/coordinator/task/dispatcher/ErrorHandlingTaskDispatcher.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 ByteChef + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytechef.atlas.coordinator.task.dispatcher; + +import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.execution.domain.TaskExecution; +import com.bytechef.error.ExecutionError; +import java.util.Arrays; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.context.ApplicationEventPublisher; + +/** + * @author Matija Petanjek + */ +public abstract class ErrorHandlingTaskDispatcher implements TaskDispatcher { + + private final ApplicationEventPublisher eventPublisher; + + public ErrorHandlingTaskDispatcher(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + public void dispatch(TaskExecution taskExecution) { + try { + doDispatch(taskExecution); + } catch (Exception exception) { + taskExecution.setError( + new ExecutionError(exception.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(exception)))); + + eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + } + } + + public abstract void doDispatch(TaskExecution taskExecution); +} diff --git a/server/libs/modules/task-dispatchers/branch/src/main/java/com/bytechef/task/dispatcher/branch/BranchTaskDispatcher.java b/server/libs/modules/task-dispatchers/branch/src/main/java/com/bytechef/task/dispatcher/branch/BranchTaskDispatcher.java index 9008367612..c5536c5149 100644 --- a/server/libs/modules/task-dispatchers/branch/src/main/java/com/bytechef/task/dispatcher/branch/BranchTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/branch/src/main/java/com/bytechef/task/dispatcher/branch/BranchTaskDispatcher.java @@ -28,7 +28,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context.Classname; @@ -37,18 +37,15 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.fasterxml.jackson.core.type.TypeReference; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** @@ -57,7 +54,7 @@ * @since Jun 3, 2017 * @see BranchTaskCompletionHandler */ -public class BranchTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class BranchTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final Evaluator evaluator; @@ -72,6 +69,8 @@ public BranchTaskDispatcher( TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.evaluator = evaluator; this.contextService = contextService; this.eventPublisher = eventPublisher; @@ -81,7 +80,7 @@ public BranchTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { taskExecution.setStartDate(Instant.now()); taskExecution.setStatus(TaskExecution.Status.STARTED); @@ -89,61 +88,56 @@ public void dispatch(TaskExecution taskExecution) { Map selectedCase = resolveCase(taskExecution); - try { - if (selectedCase.containsKey(TASKS)) { - List subWorkflowTasks = MapUtils.getList( - selectedCase, TASKS, WorkflowTask.class, Collections.emptyList()); + if (selectedCase.containsKey(TASKS)) { + List subWorkflowTasks = MapUtils.getList( + selectedCase, TASKS, WorkflowTask.class, Collections.emptyList()); - if (subWorkflowTasks.isEmpty()) { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); + if (subWorkflowTasks.isEmpty()) { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } else { - WorkflowTask subWorkflowTask = subWorkflowTasks.get(0); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } else { + WorkflowTask subWorkflowTask = subWorkflowTasks.get(0); - TaskExecution subTaskExecution = TaskExecution.builder() - .jobId(taskExecution.getJobId()) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(1) - .workflowTask(subWorkflowTask) - .build(); + TaskExecution subTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(1) + .workflowTask(subWorkflowTask) + .build(); - Map context = taskFileStorage.readContextValue( - contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION)); + Map context = taskFileStorage.readContextValue( + contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION)); - subTaskExecution.evaluate(context, evaluator); + subTaskExecution.evaluate(context, evaluator); - subTaskExecution = taskExecutionService.create(subTaskExecution); + subTaskExecution = taskExecutionService.create(subTaskExecution); - contextService.push( - Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue( - Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context)); + contextService.push( + Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( + Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context)); - taskDispatcher.dispatch(subTaskExecution); - } - } else { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); - // TODO check, it seems wrong - - if (selectedCase.get("value") != null) { - taskExecution.setOutput( - taskFileStorage.storeTaskExecutionOutput( - Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value"))); - } - - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + taskDispatcher.dispatch(subTaskExecution); + } + } else { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); + // TODO check, it seems wrong + + if (selectedCase.get("value") != null) { + taskExecution.setOutput( + taskFileStorage.storeTaskExecutionOutput( + Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value"))); } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); } + } @Override diff --git a/server/libs/modules/task-dispatchers/condition/src/main/java/com/bytechef/task/dispatcher/condition/ConditionTaskDispatcher.java b/server/libs/modules/task-dispatchers/condition/src/main/java/com/bytechef/task/dispatcher/condition/ConditionTaskDispatcher.java index 4b945aba49..afc44fcd1b 100644 --- a/server/libs/modules/task-dispatchers/condition/src/main/java/com/bytechef/task/dispatcher/condition/ConditionTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/condition/src/main/java/com/bytechef/task/dispatcher/condition/ConditionTaskDispatcher.java @@ -23,7 +23,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context; @@ -32,25 +32,22 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.task.dispatcher.condition.util.ConditionTaskUtils; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** * @author Ivica Cardic * @author Matija Petanjek */ -public class ConditionTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class ConditionTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final Evaluator evaluator; @@ -65,6 +62,8 @@ public ConditionTaskDispatcher( TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.evaluator = evaluator; this.eventPublisher = eventPublisher; @@ -74,7 +73,7 @@ public ConditionTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { taskExecution.setStartDate(Instant.now()); taskExecution.setStatus(TaskExecution.Status.STARTED); @@ -82,52 +81,47 @@ public void dispatch(TaskExecution taskExecution) { List subWorkflowTasks; - try { - if (ConditionTaskUtils.resolveCase(taskExecution)) { - subWorkflowTasks = MapUtils.getList( - taskExecution.getParameters(), CASE_TRUE, WorkflowTask.class, Collections.emptyList()); - } else { - subWorkflowTasks = MapUtils.getList( - taskExecution.getParameters(), CASE_FALSE, WorkflowTask.class, Collections.emptyList()); - } - - if (!subWorkflowTasks.isEmpty()) { - WorkflowTask subWorkflowTask = subWorkflowTasks.get(0); - - TaskExecution subTaskExecution = TaskExecution.builder() - .jobId(taskExecution.getJobId()) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(1) - .workflowTask(subWorkflowTask) - .build(); - - Map context = taskFileStorage.readContextValue( - contextService.peek(Validate.notNull(taskExecution.getId(), "id"), - Context.Classname.TASK_EXECUTION)); - - subTaskExecution.evaluate(context, evaluator); - - subTaskExecution = taskExecutionService.create(subTaskExecution); - - contextService.push( - Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue( - Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, context)); - - taskDispatcher.dispatch(subTaskExecution); - } else { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); - - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + if (ConditionTaskUtils.resolveCase(taskExecution)) { + subWorkflowTasks = MapUtils.getList( + taskExecution.getParameters(), CASE_TRUE, WorkflowTask.class, Collections.emptyList()); + } else { + subWorkflowTasks = MapUtils.getList( + taskExecution.getParameters(), CASE_FALSE, WorkflowTask.class, Collections.emptyList()); } + + if (!subWorkflowTasks.isEmpty()) { + WorkflowTask subWorkflowTask = subWorkflowTasks.get(0); + + TaskExecution subTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(1) + .workflowTask(subWorkflowTask) + .build(); + + Map context = taskFileStorage.readContextValue( + contextService.peek(Validate.notNull(taskExecution.getId(), "id"), + Context.Classname.TASK_EXECUTION)); + + subTaskExecution.evaluate(context, evaluator); + + subTaskExecution = taskExecutionService.create(subTaskExecution); + + contextService.push( + Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( + Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, context)); + + taskDispatcher.dispatch(subTaskExecution); + } else { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); + + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } + } @Override diff --git a/server/libs/modules/task-dispatchers/each/src/main/java/com/bytechef/task/dispatcher/each/EachTaskDispatcher.java b/server/libs/modules/task-dispatchers/each/src/main/java/com/bytechef/task/dispatcher/each/EachTaskDispatcher.java index 672b5aa4e8..147c5b9eda 100644 --- a/server/libs/modules/task-dispatchers/each/src/main/java/com/bytechef/task/dispatcher/each/EachTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/each/src/main/java/com/bytechef/task/dispatcher/each/EachTaskDispatcher.java @@ -26,7 +26,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context; @@ -36,18 +36,15 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.task.dispatcher.each.constant.EachTaskDispatcherConstants; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** @@ -57,7 +54,7 @@ * @author Arik Cohen * @since Apr 25, 2017 */ -public class EachTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class EachTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final CounterService counterService; @@ -73,6 +70,8 @@ public EachTaskDispatcher( ApplicationEventPublisher eventPublisher, TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.counterService = counterService; this.evaluator = evaluator; @@ -83,7 +82,7 @@ public EachTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { WorkflowTask iteratee = MapUtils.getRequired(taskExecution.getParameters(), ITERATEE, WorkflowTask.class); List items = MapUtils.getRequiredList(taskExecution.getParameters(), ITEMS, Object.class); @@ -92,52 +91,47 @@ public void dispatch(TaskExecution taskExecution) { taskExecution = taskExecutionService.update(taskExecution); - try { - if (items.isEmpty()) { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); + if (items.isEmpty()) { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } else { - counterService.set(Validate.notNull(taskExecution.getId(), "id"), items.size()); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } else { + counterService.set(Validate.notNull(taskExecution.getId(), "id"), items.size()); - for (int i = 0; i < items.size(); i++) { - Object item = items.get(i); - TaskExecution iterateeTaskExecution = TaskExecution.builder() - .jobId(taskExecution.getJobId()) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(i + 1) - .workflowTask(iteratee) - .build(); + for (int i = 0; i < items.size(); i++) { + Object item = items.get(i); + TaskExecution iterateeTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(i + 1) + .workflowTask(iteratee) + .build(); - Map newContext = new HashMap<>( - taskFileStorage.readContextValue( - contextService.peek( - Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION))); + Map newContext = new HashMap<>( + taskFileStorage.readContextValue( + contextService.peek( + Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION))); - WorkflowTask workflowTask = taskExecution.getWorkflowTask(); + WorkflowTask workflowTask = taskExecution.getWorkflowTask(); - newContext.put(workflowTask.getName(), Map.of(ITEM, item, INDEX, i)); + newContext.put(workflowTask.getName(), Map.of(ITEM, item, INDEX, i)); - iterateeTaskExecution = taskExecutionService.create( - iterateeTaskExecution.evaluate(newContext, evaluator)); + iterateeTaskExecution = taskExecutionService.create( + iterateeTaskExecution.evaluate(newContext, evaluator)); - contextService.push( + contextService.push( + Validate.notNull(iterateeTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( Validate.notNull(iterateeTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue( - Validate.notNull(iterateeTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - newContext)); + newContext)); - taskDispatcher.dispatch(iterateeTaskExecution); - } + taskDispatcher.dispatch(iterateeTaskExecution); } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); } + } @Override diff --git a/server/libs/modules/task-dispatchers/fork-join/src/main/java/com/bytechef/task/dispatcher/fork/join/ForkJoinTaskDispatcher.java b/server/libs/modules/task-dispatchers/fork-join/src/main/java/com/bytechef/task/dispatcher/fork/join/ForkJoinTaskDispatcher.java index 83398f4e2a..4740e54cb0 100644 --- a/server/libs/modules/task-dispatchers/fork-join/src/main/java/com/bytechef/task/dispatcher/fork/join/ForkJoinTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/fork-join/src/main/java/com/bytechef/task/dispatcher/fork/join/ForkJoinTaskDispatcher.java @@ -22,7 +22,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context.Classname; @@ -32,18 +32,15 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.task.dispatcher.fork.join.constant.ForkJoinTaskDispatcherConstants; import com.fasterxml.jackson.core.type.TypeReference; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** @@ -84,7 +81,7 @@ * @since May 11, 2017 * @see ForkJoinTaskCompletionHandler */ -public class ForkJoinTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class ForkJoinTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final CounterService counterService; @@ -100,6 +97,8 @@ public ForkJoinTaskDispatcher( ApplicationEventPublisher eventPublisher, TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.counterService = counterService; this.evaluator = evaluator; @@ -110,7 +109,7 @@ public ForkJoinTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { List> branchesWorkflowTasks = MapUtils.getRequiredList( taskExecution.getParameters(), ForkJoinTaskDispatcherConstants.BRANCHES, new TypeReference<>() {}); @@ -119,63 +118,57 @@ public void dispatch(TaskExecution taskExecution) { taskExecution = taskExecutionService.update(taskExecution); - try { - if (branchesWorkflowTasks.isEmpty()) { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); + if (branchesWorkflowTasks.isEmpty()) { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } else { - long taskExecutionId = Validate.notNull(taskExecution.getId(), "id"); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } else { + long taskExecutionId = Validate.notNull(taskExecution.getId(), "id"); - counterService.set(taskExecutionId, branchesWorkflowTasks.size()); + counterService.set(taskExecutionId, branchesWorkflowTasks.size()); - long taskExecutionJobId = Validate.notNull( - taskExecution.getJobId(), "'taskExecution.jobId' must not be null"); + long taskExecutionJobId = Validate.notNull( + taskExecution.getJobId(), "'taskExecution.jobId' must not be null"); - for (int i = 0; i < branchesWorkflowTasks.size(); i++) { - List branchWorkflowTasks = branchesWorkflowTasks.get(i); + for (int i = 0; i < branchesWorkflowTasks.size(); i++) { + List branchWorkflowTasks = branchesWorkflowTasks.get(i); - Validate.isTrue(!branchWorkflowTasks.isEmpty(), "branch " + i + " does not contain any tasks"); + Validate.isTrue(!branchWorkflowTasks.isEmpty(), "branch " + i + " does not contain any tasks"); - WorkflowTask branchWorkflowTask = branchWorkflowTasks.getFirst(); + WorkflowTask branchWorkflowTask = branchWorkflowTasks.getFirst(); - TaskExecution branchTaskExecution = TaskExecution.builder() - .jobId(taskExecutionJobId) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(1) - .workflowTask( - new WorkflowTask( - MapUtils.append( - branchWorkflowTask.toMap(), WorkflowConstants.PARAMETERS, - Map.of(ForkJoinTaskDispatcherConstants.BRANCH, i)))) - .build(); + TaskExecution branchTaskExecution = TaskExecution.builder() + .jobId(taskExecutionJobId) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(1) + .workflowTask( + new WorkflowTask( + MapUtils.append( + branchWorkflowTask.toMap(), WorkflowConstants.PARAMETERS, + Map.of(ForkJoinTaskDispatcherConstants.BRANCH, i)))) + .build(); - Map context = taskFileStorage.readContextValue( - contextService.peek(taskExecutionId, Classname.TASK_EXECUTION)); + Map context = taskFileStorage.readContextValue( + contextService.peek(taskExecutionId, Classname.TASK_EXECUTION)); - branchTaskExecution.evaluate(context, evaluator); + branchTaskExecution.evaluate(context, evaluator); - branchTaskExecution = taskExecutionService.create(branchTaskExecution); + branchTaskExecution = taskExecutionService.create(branchTaskExecution); - long branchTaskExecutionId = Validate.notNull(branchTaskExecution.getId(), "id"); + long branchTaskExecutionId = Validate.notNull(branchTaskExecution.getId(), "id"); - contextService.push( - branchTaskExecutionId, Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue(branchTaskExecutionId, Classname.TASK_EXECUTION, context)); + contextService.push( + branchTaskExecutionId, Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(branchTaskExecutionId, Classname.TASK_EXECUTION, context)); - contextService.push( - taskExecutionId, i, Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue(taskExecutionId, i, Classname.TASK_EXECUTION, context)); - taskDispatcher.dispatch(branchTaskExecution); - } + contextService.push( + taskExecutionId, i, Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(taskExecutionId, i, Classname.TASK_EXECUTION, context)); + taskDispatcher.dispatch(branchTaskExecution); } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); } } diff --git a/server/libs/modules/task-dispatchers/loop/src/main/java/com/bytechef/task/dispatcher/loop/LoopTaskDispatcher.java b/server/libs/modules/task-dispatchers/loop/src/main/java/com/bytechef/task/dispatcher/loop/LoopTaskDispatcher.java index e8dbb32f69..4bb5b544a9 100644 --- a/server/libs/modules/task-dispatchers/loop/src/main/java/com/bytechef/task/dispatcher/loop/LoopTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/loop/src/main/java/com/bytechef/task/dispatcher/loop/LoopTaskDispatcher.java @@ -26,7 +26,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context; @@ -35,18 +35,15 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** @@ -56,7 +53,7 @@ * @author Ivica Cardic * @author Igor Beslic */ -public class LoopTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class LoopTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final Evaluator evaluator; @@ -71,6 +68,8 @@ public LoopTaskDispatcher( TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.evaluator = evaluator; this.eventPublisher = eventPublisher; @@ -80,7 +79,7 @@ public LoopTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { boolean loopForever = MapUtils.getBoolean(taskExecution.getParameters(), LOOP_FOREVER, false); List iterateeWorkflowTasks = MapUtils.getRequiredList( taskExecution.getParameters(), ITERATEE, WorkflowTask.class); @@ -91,54 +90,49 @@ public void dispatch(TaskExecution taskExecution) { taskExecution = taskExecutionService.update(taskExecution); - try { - if (loopForever || !items.isEmpty()) { - TaskExecution subTaskExecution = TaskExecution.builder() - .jobId(taskExecution.getJobId()) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(0) - .workflowTask(iterateeWorkflowTasks.getFirst()) - .build(); + if (loopForever || !items.isEmpty()) { + TaskExecution subTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(0) + .workflowTask(iterateeWorkflowTasks.getFirst()) + .build(); - Map newContext = new HashMap<>( - taskFileStorage.readContextValue( - contextService.peek( - Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION))); + Map newContext = new HashMap<>( + taskFileStorage.readContextValue( + contextService.peek( + Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION))); - Map workflowTaskNameMap = new HashMap<>(); + Map workflowTaskNameMap = new HashMap<>(); - if (!items.isEmpty()) { - workflowTaskNameMap.put(ITEM, items.getFirst()); - } + if (!items.isEmpty()) { + workflowTaskNameMap.put(ITEM, items.getFirst()); + } - workflowTaskNameMap.put(INDEX, 0); + workflowTaskNameMap.put(INDEX, 0); - WorkflowTask loopWorkflowTask = taskExecution.getWorkflowTask(); + WorkflowTask loopWorkflowTask = taskExecution.getWorkflowTask(); - newContext.put(loopWorkflowTask.getName(), workflowTaskNameMap); + newContext.put(loopWorkflowTask.getName(), workflowTaskNameMap); - subTaskExecution = taskExecutionService.create(subTaskExecution.evaluate(newContext, evaluator)); + subTaskExecution = taskExecutionService.create(subTaskExecution.evaluate(newContext, evaluator)); - contextService.push( + contextService.push( + Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue( - Validate.notNull(subTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - newContext)); + newContext)); - taskDispatcher.dispatch(subTaskExecution); - } else { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); + taskDispatcher.dispatch(subTaskExecution); + } else { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); } + } @Override diff --git a/server/libs/modules/task-dispatchers/map/src/main/java/com/bytechef/task/dispatcher/map/MapTaskDispatcher.java b/server/libs/modules/task-dispatchers/map/src/main/java/com/bytechef/task/dispatcher/map/MapTaskDispatcher.java index 0343eaae9a..a36be213d7 100644 --- a/server/libs/modules/task-dispatchers/map/src/main/java/com/bytechef/task/dispatcher/map/MapTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/map/src/main/java/com/bytechef/task/dispatcher/map/MapTaskDispatcher.java @@ -28,7 +28,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context.Classname; @@ -38,25 +38,22 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import com.bytechef.evaluator.Evaluator; import com.bytechef.task.dispatcher.map.constant.MapTaskDispatcherConstants; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** * @author Arik Cohen * @since Jun 4, 2017 */ -public class MapTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class MapTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final CounterService counterService; @@ -72,6 +69,8 @@ public MapTaskDispatcher( ApplicationEventPublisher eventPublisher, TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.counterService = counterService; this.evaluator = evaluator; @@ -82,7 +81,7 @@ public MapTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { List items = MapUtils.getRequiredList(taskExecution.getParameters(), ITEMS, Object.class); List iterateeWorkflowTasks = MapUtils.getRequiredList( taskExecution.getParameters(), ITERATEE, WorkflowTask.class); @@ -91,68 +90,63 @@ public void dispatch(TaskExecution taskExecution) { taskExecution = taskExecutionService.update(taskExecution); - try { - if (items.isEmpty()) { - taskExecution.setStartDate(Instant.now()); - taskExecution.setEndDate(Instant.now()); - taskExecution.setExecutionTime(0); - - eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); - } else { - long taskExecutionId = Validate.notNull(taskExecution.getId(), "id"); + if (items.isEmpty()) { + taskExecution.setStartDate(Instant.now()); + taskExecution.setEndDate(Instant.now()); + taskExecution.setExecutionTime(0); - counterService.set(taskExecutionId, items.size()); + eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); + } else { + long taskExecutionId = Validate.notNull(taskExecution.getId(), "id"); - long taskExecutionJobId = Validate.notNull( - taskExecution.getJobId(), "'taskExecution.jobId' must not be null"); + counterService.set(taskExecutionId, items.size()); - for (int i = 0; i < items.size(); i++) { - Object item = items.get(i); - WorkflowTask iterateeWorkflowTask = iterateeWorkflowTasks.getFirst(); + long taskExecutionJobId = Validate.notNull( + taskExecution.getJobId(), "'taskExecution.jobId' must not be null"); - TaskExecution iterateeTaskExecution = TaskExecution.builder() - .jobId(taskExecutionJobId) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .taskNumber(1) - .workflowTask( - new WorkflowTask( - MapUtils.append( - iterateeWorkflowTask.toMap(), WorkflowConstants.PARAMETERS, - Map.of(MapTaskDispatcherConstants.ITERATION, i)))) - .build(); + for (int i = 0; i < items.size(); i++) { + Object item = items.get(i); + WorkflowTask iterateeWorkflowTask = iterateeWorkflowTasks.getFirst(); - Map context = new HashMap<>( - taskFileStorage - .readContextValue(contextService.peek(taskExecutionId, Classname.TASK_EXECUTION))); + TaskExecution iterateeTaskExecution = TaskExecution.builder() + .jobId(taskExecutionJobId) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .taskNumber(1) + .workflowTask( + new WorkflowTask( + MapUtils.append( + iterateeWorkflowTask.toMap(), WorkflowConstants.PARAMETERS, + Map.of(MapTaskDispatcherConstants.ITERATION, i)))) + .build(); - WorkflowTask workflowTask = taskExecution.getWorkflowTask(); + Map context = new HashMap<>( + taskFileStorage + .readContextValue(contextService.peek(taskExecutionId, Classname.TASK_EXECUTION))); - context.put(workflowTask.getName(), Map.of(ITEM, item, INDEX, i)); + WorkflowTask workflowTask = taskExecution.getWorkflowTask(); - iterateeTaskExecution.evaluate(context, evaluator); + context.put(workflowTask.getName(), Map.of(ITEM, item, INDEX, i)); - iterateeTaskExecution = taskExecutionService.create(iterateeTaskExecution); + iterateeTaskExecution.evaluate(context, evaluator); - long iterateeTaskExecutionId = Validate.notNull(iterateeTaskExecution.getId(), "id"); + iterateeTaskExecution = taskExecutionService.create(iterateeTaskExecution); - contextService.push( - iterateeTaskExecutionId, Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue(iterateeTaskExecutionId, Classname.TASK_EXECUTION, context)); + long iterateeTaskExecutionId = Validate.notNull(iterateeTaskExecution.getId(), "id"); - contextService.push( - taskExecutionId, i, Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue(taskExecutionId, i, Classname.TASK_EXECUTION, context)); - taskDispatcher.dispatch(iterateeTaskExecution); - } + contextService.push( + iterateeTaskExecutionId, Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(iterateeTaskExecutionId, Classname.TASK_EXECUTION, context)); - counterService.set(Validate.notNull(taskExecution.getId(), "id"), items.size()); + contextService.push( + taskExecutionId, i, Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue(taskExecutionId, i, Classname.TASK_EXECUTION, context)); + taskDispatcher.dispatch(iterateeTaskExecution); } - } catch (Exception e) { - taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + counterService.set(Validate.notNull(taskExecution.getId(), "id"), items.size()); } + } @Override diff --git a/server/libs/modules/task-dispatchers/parallel/src/main/java/com/bytechef/task/dispatcher/parallel/ParallelTaskDispatcher.java b/server/libs/modules/task-dispatchers/parallel/src/main/java/com/bytechef/task/dispatcher/parallel/ParallelTaskDispatcher.java index 05e2344c99..f31508545f 100644 --- a/server/libs/modules/task-dispatchers/parallel/src/main/java/com/bytechef/task/dispatcher/parallel/ParallelTaskDispatcher.java +++ b/server/libs/modules/task-dispatchers/parallel/src/main/java/com/bytechef/task/dispatcher/parallel/ParallelTaskDispatcher.java @@ -24,7 +24,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; -import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; +import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver; import com.bytechef.atlas.execution.domain.Context; @@ -34,16 +34,13 @@ import com.bytechef.atlas.execution.service.TaskExecutionService; import com.bytechef.atlas.file.storage.TaskFileStorage; import com.bytechef.commons.util.MapUtils; -import com.bytechef.error.ExecutionError; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.commons.lang3.Validate; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationEventPublisher; /** @@ -54,7 +51,7 @@ * @author Arik Cohen * @since May 12, 2017 */ -public class ParallelTaskDispatcher implements TaskDispatcher, TaskDispatcherResolver { +public class ParallelTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver { private final ContextService contextService; private final CounterService counterService; @@ -69,6 +66,8 @@ public ParallelTaskDispatcher( TaskDispatcher taskDispatcher, TaskExecutionService taskExecutionService, TaskFileStorage taskFileStorage) { + super(eventPublisher); + this.contextService = contextService; this.counterService = counterService; this.eventPublisher = eventPublisher; @@ -78,7 +77,7 @@ public ParallelTaskDispatcher( } @Override - public void dispatch(TaskExecution taskExecution) { + public void doDispatch(TaskExecution taskExecution) { List workflowTasks = Validate.notNull( MapUtils.getList(taskExecution.getParameters(), TASKS, WorkflowTask.class, Collections.emptyList()), "'workflowTasks' property must not be null"); @@ -90,35 +89,28 @@ public void dispatch(TaskExecution taskExecution) { eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution)); } else { - try { - counterService.set(Validate.notNull(taskExecution.getId(), "id"), workflowTasks.size()); + counterService.set(Validate.notNull(taskExecution.getId(), "id"), workflowTasks.size()); - for (WorkflowTask workflowTask : workflowTasks) { - TaskExecution parallelTaskExecution = TaskExecution.builder() - .jobId(taskExecution.getJobId()) - .parentId(taskExecution.getId()) - .priority(taskExecution.getPriority()) - .workflowTask(workflowTask) - .build(); + for (WorkflowTask workflowTask : workflowTasks) { + TaskExecution parallelTaskExecution = TaskExecution.builder() + .jobId(taskExecution.getJobId()) + .parentId(taskExecution.getId()) + .priority(taskExecution.getPriority()) + .workflowTask(workflowTask) + .build(); - parallelTaskExecution = taskExecutionService.create(parallelTaskExecution); + parallelTaskExecution = taskExecutionService.create(parallelTaskExecution); - Map context = taskFileStorage.readContextValue( - contextService.peek( - Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION)); + Map context = taskFileStorage.readContextValue( + contextService.peek( + Validate.notNull(taskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION)); - contextService.push( + contextService.push( + Validate.notNull(parallelTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, + taskFileStorage.storeContextValue( Validate.notNull(parallelTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - taskFileStorage.storeContextValue( - Validate.notNull(parallelTaskExecution.getId(), "id"), Context.Classname.TASK_EXECUTION, - context)); - taskDispatcher.dispatch(parallelTaskExecution); - } - } catch (Exception e) { - taskExecution - .setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e)))); - - eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution)); + context)); + taskDispatcher.dispatch(parallelTaskExecution); } } } From dc2c1921963d3d16970e33186b2039570d2d9920 Mon Sep 17 00:00:00 2001 From: Matija Petanjek Date: Wed, 19 Nov 2025 11:19:28 +0100 Subject: [PATCH 17/17] 2465 - update test, dispatcher don't throw exceptions but they are handled through ExecutionErrorEventListener --- .../each/EachTaskDispatcherTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/libs/modules/task-dispatchers/each/src/test/java/com/bytechef/task/dispatcher/each/EachTaskDispatcherTest.java b/server/libs/modules/task-dispatchers/each/src/test/java/com/bytechef/task/dispatcher/each/EachTaskDispatcherTest.java index e8d5536234..39fdcff126 100644 --- a/server/libs/modules/task-dispatchers/each/src/test/java/com/bytechef/task/dispatcher/each/EachTaskDispatcherTest.java +++ b/server/libs/modules/task-dispatchers/each/src/test/java/com/bytechef/task/dispatcher/each/EachTaskDispatcherTest.java @@ -29,6 +29,7 @@ import com.bytechef.atlas.configuration.domain.Task; import com.bytechef.atlas.configuration.domain.WorkflowTask; import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent; +import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent; import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher; import com.bytechef.atlas.execution.domain.Context; import com.bytechef.atlas.execution.domain.TaskExecution; @@ -46,7 +47,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.boot.jackson.JsonComponentModule; import org.springframework.context.ApplicationEventPublisher; @@ -72,16 +72,16 @@ public class EachTaskDispatcherTest { } @Test - public void testDispatch1() { - Assertions.assertThrows(NullPointerException.class, () -> { - EachTaskDispatcher dispatcher = new EachTaskDispatcher( - contextService, counterService, SpelEvaluator.create(), eventPublisher, taskDispatcher, - taskExecutionService, taskFileStorage); - - dispatcher.dispatch(TaskExecution.builder() - .workflowTask(new WorkflowTask(Map.of(WorkflowConstants.NAME, "name", WorkflowConstants.TYPE, "type"))) - .build()); - }); + public void testEachTaskDispatcherWhenMissingRequiredParameter() { + + EachTaskDispatcher dispatcher = new EachTaskDispatcher( + contextService, counterService, SpelEvaluator.create(), eventPublisher, taskDispatcher, + taskExecutionService, taskFileStorage); + + dispatcher.dispatch(TaskExecution.builder() + .workflowTask(new WorkflowTask(Map.of(WorkflowConstants.NAME, "name", WorkflowConstants.TYPE, "type"))) + .build()); + verify(eventPublisher, times(1)).publishEvent(any(TaskExecutionErrorEvent.class)); } @Test