Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
318bb8e
just call deserialized during dequeue
maxdml Oct 29, 2025
0259dd6
decode during recovery (instead of getting it decoded from list work…
maxdml Oct 29, 2025
bf35e78
lift encoding outside of the system db
maxdml Oct 29, 2025
9852279
encode in enqueue, client
maxdml Oct 29, 2025
f945e3b
fix
maxdml Oct 29, 2025
ecad764
decode on the concurrent execution fallback path
maxdml Oct 29, 2025
16f6aea
system db expects pointers to string, correctly set in serialize()
maxdml Oct 29, 2025
f05d0c9
not now
maxdml Oct 30, 2025
a3bd6f7
fix + awaitWorkflowResult returns *string
maxdml Oct 30, 2025
7c8211a
fixes
maxdml Oct 30, 2025
e87d0e6
nit
maxdml Oct 30, 2025
ab20c6a
custom serializer
maxdml Oct 27, 2025
2ae4129
wip
maxdml Oct 28, 2025
687842e
comment
maxdml Oct 28, 2025
0646fcf
update test
maxdml Oct 28, 2025
9e5fb44
test recovery path. Add JSON recast in wrapped functions and in step …
maxdml Oct 28, 2025
0700203
queue decode + test
maxdml Oct 28, 2025
7b96377
wip interfaces
maxdml Oct 29, 2025
550cd76
wip
maxdml Oct 29, 2025
8c08963
remove Gob encoder
maxdml Oct 30, 2025
e5844c9
simplify tests and extend coverage
maxdml Oct 30, 2025
7950464
test concrete signatures, test more send/recv set/get messages
maxdml Oct 30, 2025
c48821f
update tests
maxdml Oct 30, 2025
8bd98fc
fix
maxdml Oct 30, 2025
ae0f005
comments
maxdml Oct 30, 2025
0d9df4e
comment
maxdml Oct 30, 2025
03cb9a4
cleanup
maxdml Oct 30, 2025
9e7be97
fix
maxdml Oct 30, 2025
ad5fe3c
comments
maxdml Oct 30, 2025
38c4ff7
Merge remote-tracking branch 'origin/lift-serialization-to-typed-laye…
maxdml Oct 30, 2025
a2b0872
cleanup
maxdml Oct 31, 2025
4d7f09b
remove unwanted files
maxdml Oct 31, 2025
a9e2b85
remove unwanted files
maxdml Oct 31, 2025
69869d2
fix merge weirdness
maxdml Oct 31, 2025
7cc6ffb
cleanup
maxdml Oct 31, 2025
bcd86f3
cleanup
maxdml Oct 31, 2025
dd8d725
fix
maxdml Nov 1, 2025
033e794
fix for mocking
maxdml Nov 1, 2025
8576172
simplify
maxdml Nov 1, 2025
344149c
simplify
maxdml Nov 1, 2025
39568ee
simplify
maxdml Nov 1, 2025
d749433
typo
maxdml Nov 1, 2025
ae8af6a
WIP: test user provided gob encoder
maxdml Nov 3, 2025
ff6c8c6
cleanup
maxdml Nov 3, 2025
888e700
cleanup
maxdml Nov 3, 2025
fa59aa8
remove json serializer, no custom serializer
maxdml Nov 3, 2025
fb8c685
fix corner case with pointers and ListWorkflowSteps/GetWorkflowSteps
maxdml Nov 3, 2025
00ac18b
add empty string test
maxdml Nov 3, 2025
189b201
nit
maxdml Nov 3, 2025
2aff44e
cleanup
maxdml Nov 3, 2025
dec1a76
cleanup
maxdml Nov 3, 2025
2679e55
private
maxdml Nov 3, 2025
cf12908
more private
maxdml Nov 3, 2025
e81c208
simplify
maxdml Nov 3, 2025
8d74304
cleanup
maxdml Nov 3, 2025
800c0ed
more complex struct
maxdml Nov 4, 2025
9676054
shouldn't require user gob registration
maxdml Nov 4, 2025
14f73be
no interface signature
maxdml Nov 4, 2025
31ebaee
cleanup
maxdml Nov 4, 2025
ccb51f6
cleanup
maxdml Nov 4, 2025
a05aead
make the serializer generic to tighten the code, add a test with manu…
maxdml Nov 4, 2025
439a92c
cleanup + add pointer fields to test struct
maxdml Nov 4, 2025
61f727d
consolidate tests and always exercise recovery path
maxdml Nov 4, 2025
1a6a950
cannot store child 'steps' as nil in the db to run as step
maxdml Nov 4, 2025
370d33e
test some more signatures
maxdml Nov 4, 2025
de6488a
fix
maxdml Nov 4, 2025
5443e4f
nits
maxdml Nov 4, 2025
219a647
internal stepInfo with *string step output
maxdml Nov 5, 2025
eb4371a
detect nested pointers during wf registration and running steps
maxdml Nov 5, 2025
85b37cf
fix flaky test that is more than flaky
maxdml Nov 5, 2025
e163953
fix another race in test
maxdml Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions dbos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
if params.priority > uint(math.MaxInt) {
return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt)
}

status := WorkflowStatus{
Name: params.workflowName,
ApplicationVersion: params.applicationVersion,
Expand All @@ -155,7 +156,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
CreatedAt: time.Now(),
Deadline: deadline,
Timeout: params.workflowTimeout,
Input: params.workflowInput,
Input: input,
QueueName: queueName,
DeduplicationID: params.deduplicationID,
Priority: int(params.priority),
Expand Down Expand Up @@ -240,20 +241,15 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
return nil, errors.New("client cannot be nil")
}

// Register the input and outputs for gob encoding
var logger *slog.Logger
if cl, ok := c.(*client); ok {
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
logger = ctx.logger
}
// Serialize input
serializer := newGobSerializer[P]()
encodedInput, err := serializer.Encode(input)
if err != nil {
return nil, fmt.Errorf("failed to serialize workflow input: %w", err)
}
var typedInput P
safeGobRegister(typedInput, logger)
var typedOutput R
safeGobRegister(typedOutput, logger)

// Call the interface method with the same signature
handle, err := c.Enqueue(queueName, workflowName, input, opts...)
handle, err := c.Enqueue(queueName, workflowName, &encodedInput, opts...)
if err != nil {
return nil, err
}
Expand Down
9 changes: 1 addition & 8 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (c *dbosContext) Value(key any) any {
return c.ctx.Value(key)
}


// WithValue returns a copy of the DBOS context with the given key-value pair.
// This is similar to context.WithValue but maintains DBOS context capabilities.
// No-op if the provided context is not a concrete dbos.dbosContext.
Expand Down Expand Up @@ -354,14 +355,6 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
initExecutor.logger = config.Logger
initExecutor.logger.Info("Initializing DBOS context", "app_name", config.AppName, "dbos_version", getDBOSVersion())

// Register types we serialize with gob
var t time.Time
safeGobRegister(t, initExecutor.logger)
var ws []WorkflowStatus
safeGobRegister(ws, initExecutor.logger)
var si []StepInfo
safeGobRegister(si, initExecutor.logger)

// Initialize global variables from processed config (already handles env vars and defaults)
initExecutor.applicationVersion = config.ApplicationVersion
initExecutor.executorID = config.ExecutorID
Expand Down
22 changes: 2 additions & 20 deletions dbos/queue.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package dbos

import (
"bytes"
"context"
"encoding/base64"
"encoding/gob"
"log/slog"
"math"
"math/rand"
Expand Down Expand Up @@ -227,23 +224,8 @@ func (qr *queueRunner) run(ctx *dbosContext) {
continue
}

// Deserialize input
var input any
if len(workflow.input) > 0 {
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
if err != nil {
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
buf := bytes.NewBuffer(inputBytes)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&input); err != nil {
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
continue
}
}

_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
_, err = registeredWorkflow.wrappedFunction(ctx, workflow.input, WithWorkflowID(workflow.id))
if err != nil {
qr.logger.Error("Error running queued workflow", "error", err)
}
Expand Down
12 changes: 1 addition & 11 deletions dbos/recovery.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package dbos

import (
"strings"
)

func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]WorkflowHandle[any], error) {
workflowHandles := make([]WorkflowHandle[any], 0)
// List pending workflows for the executors
Expand All @@ -18,13 +14,6 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
}

for _, workflow := range pendingWorkflows {
if inputStr, ok := workflow.Input.(string); ok {
if strings.Contains(inputStr, "Failed to decode") {
ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
continue
}
}
Comment on lines -21 to -26
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused


if workflow.QueueName != "" {
cleared, err := ctx.systemDB.clearQueueAssignment(ctx, workflow.ID)
if err != nil {
Expand Down Expand Up @@ -59,6 +48,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
WithWorkflowID(workflow.ID),
}
// Create a workflow context from the executor context
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
handle, err := registeredWorkflow.wrappedFunction(ctx, workflow.Input, opts...)
if err != nil {
return nil, err
Expand Down
186 changes: 150 additions & 36 deletions dbos/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,33 @@ import (
"encoding/base64"
"encoding/gob"
"fmt"
"log/slog"
"reflect"
"strings"
)

func serialize(data any) (string, error) {
var inputBytes []byte
if data != nil {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(&data); err != nil {
return "", fmt.Errorf("failed to encode data: %w", err)
}
inputBytes = buf.Bytes()
}
return base64.StdEncoding.EncodeToString(inputBytes), nil
type serializer[T any] interface {
Encode(data T) (string, error)
Decode(data *string) (T, error)
}

func deserialize(data *string) (any, error) {
if data == nil || *data == "" {
return nil, nil
}

dataBytes, err := base64.StdEncoding.DecodeString(*data)
if err != nil {
return nil, fmt.Errorf("failed to decode data: %w", err)
}

var result any
buf := bytes.NewBuffer(dataBytes)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode data: %w", err)
}

return result, nil
// gobValue is a wrapper type for gob encoding/decoding of any value
// It prevents encoding nil values directly, and helps us differentiate nil values and empty strings
type gobValue struct {
Value any
}

// safeGobRegister attempts to register a type with gob, recovering only from
// panics caused by duplicate type/name registrations (e.g., registering both T and *T).
// These specific conflicts don't affect encoding/decoding correctness, so they're safe to ignore.
// Other panics (like register `any`) are real errors and will propagate.
func safeGobRegister(value any, logger *slog.Logger) {
// These specific conflicts don't affect encoding/decoding correctness, so they aren't errors.
// Other panics (like registering `any`) are real errors and will propagate.
func safeGobRegister(value any) {
defer func() {
if r := recover(); r != nil {
if errStr, ok := r.(string); ok {
// Check if this is one of the two specific duplicate registration errors we want to ignore
// See https://cs.opensource.google/go/go/+/refs/tags/go1.25.1:src/encoding/gob/type.go;l=832
if strings.Contains(errStr, "gob: registering duplicate types for") ||
strings.Contains(errStr, "gob: registering duplicate names for") {
if logger != nil {
logger.Debug("gob registration conflict", "type", fmt.Sprintf("%T", value), "error", r)
}
return
}
}
Expand All @@ -66,3 +41,142 @@ func safeGobRegister(value any, logger *slog.Logger) {
}()
gob.Register(value)
}

// init registers the gobValue wrapper type with gob for gobSerializer
func init() {
// Register wrapper type - this is required for gob encoding/decoding to work
safeGobRegister(gobValue{})
}

// gobSerializer is the serializer[T] implementation returned by newGobSerializer.
// It carries no state; its Encode and Decode methods convert values of type T
// to and from base64-encoded gob payloads.
type gobSerializer[T any] struct{}

// newGobSerializer returns a gob-backed serializer for values of type T.
// The returned value is a pointer to a freshly allocated, stateless gobSerializer[T].
func newGobSerializer[T any]() serializer[T] {
	return new(gobSerializer[T])
}

// Encode turns data into a base64 (standard alphabet) string wrapping a gob payload.
//
// Values that isNilValue reports as nil are never handed to gob: they are
// represented by the base64 encoding of zero bytes, which is the empty string.
// Every other value has its type registered with gob, is wrapped in a gobValue
// and gob-encoded. An error is returned only when the gob encoder fails.
func (g *gobSerializer[T]) Encode(data T) (string, error) {
	if isNilValue(data) {
		// Zero bytes in, empty string out.
		return base64.StdEncoding.EncodeToString(nil), nil
	}

	// data travels inside gobValue.Value (an interface field), so its type
	// is registered with gob before the encoder sees it.
	safeGobRegister(data)

	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(gobValue{Value: data}); err != nil {
		return "", fmt.Errorf("failed to encode data: %w", err)
	}
	return base64.StdEncoding.EncodeToString(payload.Bytes()), nil
}

// Decode turns a base64-encoded gob payload back into a value of type T.
//
// The zero value of T is returned with a nil error when data is nil, points to
// an empty string, or base64-decodes to zero bytes. Otherwise the bytes are
// gob-decoded into a gobValue wrapper and the wrapped value is converted to T.
//
// When T is a pointer type and the wrapped value has T's element type, a new
// pointer to a copy of that value is returned. When T is a pointer type and the
// wrapped value is nil, the zero value (a nil pointer) is returned.
//
// An error is returned if base64 decoding fails, if gob decoding fails, or if
// the wrapped value cannot be asserted to T.
func (g *gobSerializer[T]) Decode(data *string) (T, error) {
	var zero T

	if data == nil || len(*data) == 0 {
		return zero, nil
	}

	raw, err := base64.StdEncoding.DecodeString(*data)
	if err != nil {
		return zero, fmt.Errorf("failed to decode base64 data: %w", err)
	}
	// An empty decoded payload stands for a nil value.
	if len(raw) == 0 {
		return zero, nil
	}

	// Resolve T through *T: reflect.TypeOf(zero) would be nil when T is an
	// interface type (zero is then a nil interface), whereas the element type
	// of *T is always available.
	target := reflect.TypeOf(&zero).Elem()

	// Register T with gob before decoding. Per the original author's note this
	// matters on the recovery path, where this process may never have encoded
	// (and therefore registered) the type. Interface types are skipped here.
	if target.Kind() != reflect.Interface {
		safeGobRegister(zero)
	}

	var envelope gobValue
	if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&envelope); err != nil {
		return zero, fmt.Errorf("failed to decode gob data: %w", err)
	}
	payload := envelope.Value

	if target.Kind() == reflect.Pointer {
		// A nil wrapped value maps to a nil pointer.
		if payload == nil {
			return zero, nil
		}
		// The wrapped value may come back as the pointed-to type rather than
		// the pointer itself; rebuild the pointer in that case.
		if elem := target.Elem(); reflect.TypeOf(payload) == elem {
			ptr := reflect.New(elem)
			ptr.Elem().Set(reflect.ValueOf(payload))
			return ptr.Interface().(T), nil
		}
	}

	// Everything else, including a payload that already has type T, goes
	// through a plain type assertion.
	if typed, ok := payload.(T); ok {
		return typed, nil
	}
	return zero, fmt.Errorf("cannot convert decoded value of type %T to %T", payload, zero)
}

// isNilValue reports whether v is nil: either an untyped nil interface, or a
// nil pointer, slice, map, channel or function. Values of any other kind are
// never considered nil.
func isNilValue(v any) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	kind := rv.Kind()
	nilable := kind == reflect.Pointer ||
		kind == reflect.Slice ||
		kind == reflect.Map ||
		kind == reflect.Chan ||
		kind == reflect.Func
	// IsNil is only consulted for the kinds listed above.
	return nilable && rv.IsNil()
}

// IsNestedPointer reports whether t is a pointer to a pointer (e.g. **int, ***int).
// It returns false for a nil type, for non-pointer types, and for single-level
// pointers such as *int.
func IsNestedPointer(t reflect.Type) bool {
	if t == nil || t.Kind() != reflect.Pointer {
		return false
	}
	// Two or more levels of indirection means the pointee is itself a pointer.
	return t.Elem().Kind() == reflect.Pointer
}
Loading
Loading