Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const WorkflowExecutionSheetAccordion = ({job, triggerExecution}: {job: Job; tri
const startTime = job?.startDate?.getTime();
const endTime = job?.endDate?.getTime();

const taskExecutionsCompleted = job?.taskExecutions?.every((taskExecution) => taskExecution.status === 'COMPLETED');
const taskExecutionsCompleted = job?.status === 'COMPLETED';
const triggerExecutionCompleted = !triggerExecution || triggerExecution?.status === 'COMPLETED';

let duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const WorkflowExecutionsTestOutputHeader = ({
const startTime = job?.startDate?.getTime();
const endTime = job?.endDate?.getTime();

const taskExecutionsCompleted = job?.taskExecutions?.every((taskExecution) => taskExecution.status === 'COMPLETED');
const taskExecutionsCompleted = job?.status === 'COMPLETED';
const triggerExecutionCompleted = !triggerExecution || triggerExecution?.status === 'COMPLETED';

let duration = 0;
Expand Down
1 change: 1 addition & 0 deletions server/apps/server-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ dependencies {
implementation(project(":server:libs:modules:task-dispatchers:fork-join"))
implementation(project(":server:libs:modules:task-dispatchers:loop"))
implementation(project(":server:libs:modules:task-dispatchers:map"))
implementation(project(":server:libs:modules:task-dispatchers:on-error"))
implementation(project(":server:libs:modules:task-dispatchers:parallel"))
implementation(project(":server:libs:modules:task-dispatchers:subflow"))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2025 ByteChef
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytechef.atlas.coordinator.task.dispatcher;

import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
import com.bytechef.atlas.execution.domain.TaskExecution;
import com.bytechef.error.ExecutionError;
import java.util.Arrays;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationEventPublisher;

/**
* @author Matija Petanjek
*/
public abstract class ErrorHandlingTaskDispatcher implements TaskDispatcher<TaskExecution> {

private final ApplicationEventPublisher eventPublisher;

public ErrorHandlingTaskDispatcher(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}

public void dispatch(TaskExecution taskExecution) {
try {
doDispatch(taskExecution);
} catch (Exception exception) {
taskExecution.setError(
new ExecutionError(exception.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(exception))));

eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution));
}
}

public abstract void doDispatch(TaskExecution taskExecution);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ LogTaskApplicationEventListener logTaskApplicationEventListener() {
}

@Bean
TaskExecutionErrorEventListener taskExecutionErrorHandler() {
TaskExecutionErrorEventListener taskExecutionErrorEventListener() {
return new TaskExecutionErrorEventListener(eventPublisher, jobService, taskDispatcher(), taskExecutionService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ public void onErrorEvent(ErrorEvent errorEvent) {
while (taskExecution.getParentId() != null) { // mark parent tasks as FAILED as well
taskExecution = taskExecutionService.getTaskExecution(taskExecution.getParentId());

// if it's an on-error dispatcher and its error is not set, navigate back to the on-error
// dispatcher to start executing the error branch
if (taskExecution.getType()
.startsWith("on-error/") && taskExecution.getError() == null) {
taskExecution.setError(error);
taskExecution.setStatus(TaskExecution.Status.CREATED);

taskExecution = taskExecutionService.update(taskExecution);

taskDispatcher.dispatch(taskExecution);

return;
}

taskExecution.setEndDate(Instant.now());
taskExecution.setStatus(TaskExecution.Status.FAILED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package com.bytechef.error;

import edu.umd.cs.findbugs.annotations.Nullable;

/**
* An interface which marks an object as being able to provide {@link ExecutionError} status about itself.
*
Expand All @@ -27,5 +29,6 @@
public interface Errorable {

/** Returns the error associated with the object. */
@Nullable
ExecutionError getError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.bytechef.atlas.configuration.domain.Task;
import com.bytechef.atlas.configuration.domain.WorkflowTask;
import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent;
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
import com.bytechef.atlas.coordinator.task.dispatcher.ErrorHandlingTaskDispatcher;
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcher;
import com.bytechef.atlas.coordinator.task.dispatcher.TaskDispatcherResolver;
import com.bytechef.atlas.execution.domain.Context.Classname;
Expand All @@ -37,18 +37,15 @@
import com.bytechef.atlas.execution.service.TaskExecutionService;
import com.bytechef.atlas.file.storage.TaskFileStorage;
import com.bytechef.commons.util.MapUtils;
import com.bytechef.error.ExecutionError;
import com.bytechef.evaluator.Evaluator;
import com.fasterxml.jackson.core.type.TypeReference;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationEventPublisher;

/**
Expand All @@ -57,7 +54,7 @@
* @since Jun 3, 2017
* @see BranchTaskCompletionHandler
*/
public class BranchTaskDispatcher implements TaskDispatcher<TaskExecution>, TaskDispatcherResolver {
public class BranchTaskDispatcher extends ErrorHandlingTaskDispatcher implements TaskDispatcherResolver {

private final ContextService contextService;
private final Evaluator evaluator;
Expand All @@ -72,6 +69,8 @@ public BranchTaskDispatcher(
TaskDispatcher<? super Task> taskDispatcher, TaskExecutionService taskExecutionService,
TaskFileStorage taskFileStorage) {

super(eventPublisher);

this.evaluator = evaluator;
this.contextService = contextService;
this.eventPublisher = eventPublisher;
Expand All @@ -81,69 +80,64 @@ public BranchTaskDispatcher(
}

@Override
public void dispatch(TaskExecution taskExecution) {
public void doDispatch(TaskExecution taskExecution) {
taskExecution.setStartDate(Instant.now());
taskExecution.setStatus(TaskExecution.Status.STARTED);

taskExecution = taskExecutionService.update(taskExecution);

Map<String, ?> selectedCase = resolveCase(taskExecution);

try {
if (selectedCase.containsKey(TASKS)) {
List<WorkflowTask> subWorkflowTasks = MapUtils.getList(
selectedCase, TASKS, WorkflowTask.class, Collections.emptyList());
if (selectedCase.containsKey(TASKS)) {
List<WorkflowTask> subWorkflowTasks = MapUtils.getList(
selectedCase, TASKS, WorkflowTask.class, Collections.emptyList());

if (subWorkflowTasks.isEmpty()) {
taskExecution.setStartDate(Instant.now());
taskExecution.setEndDate(Instant.now());
taskExecution.setExecutionTime(0);
if (subWorkflowTasks.isEmpty()) {
taskExecution.setStartDate(Instant.now());
taskExecution.setEndDate(Instant.now());
taskExecution.setExecutionTime(0);

eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
} else {
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
} else {
WorkflowTask subWorkflowTask = subWorkflowTasks.get(0);

TaskExecution subTaskExecution = TaskExecution.builder()
.jobId(taskExecution.getJobId())
.parentId(taskExecution.getId())
.priority(taskExecution.getPriority())
.taskNumber(1)
.workflowTask(subWorkflowTask)
.build();
TaskExecution subTaskExecution = TaskExecution.builder()
.jobId(taskExecution.getJobId())
.parentId(taskExecution.getId())
.priority(taskExecution.getPriority())
.taskNumber(1)
.workflowTask(subWorkflowTask)
.build();

Map<String, ?> context = taskFileStorage.readContextValue(
contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION));
Map<String, ?> context = taskFileStorage.readContextValue(
contextService.peek(Validate.notNull(taskExecution.getId(), "id"), Classname.TASK_EXECUTION));

subTaskExecution.evaluate(context, evaluator);
subTaskExecution.evaluate(context, evaluator);

subTaskExecution = taskExecutionService.create(subTaskExecution);
subTaskExecution = taskExecutionService.create(subTaskExecution);

contextService.push(
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION,
taskFileStorage.storeContextValue(
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context));
contextService.push(
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION,
taskFileStorage.storeContextValue(
Validate.notNull(subTaskExecution.getId(), "id"), Classname.TASK_EXECUTION, context));

taskDispatcher.dispatch(subTaskExecution);
}
} else {
taskExecution.setStartDate(Instant.now());
taskExecution.setEndDate(Instant.now());
taskExecution.setExecutionTime(0);
// TODO check, it seems wrong

if (selectedCase.get("value") != null) {
taskExecution.setOutput(
taskFileStorage.storeTaskExecutionOutput(
Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value")));
}

eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
taskDispatcher.dispatch(subTaskExecution);
}
} else {
taskExecution.setStartDate(Instant.now());
taskExecution.setEndDate(Instant.now());
taskExecution.setExecutionTime(0);
// TODO check, it seems wrong

if (selectedCase.get("value") != null) {
taskExecution.setOutput(
taskFileStorage.storeTaskExecutionOutput(
Validate.notNull(taskExecution.getId(), "id"), selectedCase.get("value")));
}
} catch (Exception e) {
taskExecution.setError(new ExecutionError(e.getMessage(), Arrays.asList(ExceptionUtils.getStackFrames(e))));

eventPublisher.publishEvent(new TaskExecutionErrorEvent(taskExecution));
eventPublisher.publishEvent(new TaskExecutionCompleteEvent(taskExecution));
}

}

@Override
Expand Down
Loading
Loading