# State Machine Best Practices
This guide covers best practices, patterns, and techniques for building robust state machines with mkunion. Whether you're building simple state machines or complex distributed systems, these practices will help you create maintainable and scalable solutions.
## Best Practices
When building state machines with mkunion, following these practices will help you create maintainable and robust systems:
### File Organization

Organize your state machine code across files for better maintainability:

- `model.go`: State and command definitions
- `machine.go`: Core state machine logic

```go
package order

//go:generate mkunion watch -g .
//go:generate moq -skip-ensure -out machine_mock.go . Dependency

// Dependency interface - moq will generate DependencyMock from this
type Dependency interface {
	TimeNow() *time.Time
	StockService() StockService
	PaymentService() PaymentService
}

// Common errors
var (
	ErrInvalidTransition = errors.New("invalid state transition")
	ErrOrderNotFound     = errors.New("order not found")
)

// Machine constructor
func NewMachine(deps Dependency, state State) *machine.Machine[Dependency, Command, State] {
	if state == nil {
		state = &OrderPending{} // Default initial state
	}
	return machine.NewMachine(deps, Transition, state)
}

// Transition function
func Transition(ctx context.Context, deps Dependency, cmd Command, state State) (State, error) {
	// Implementation
}
```

- `machine_test.go`: Tests and state diagrams
- `machine_database_test.go`: Persistence examples
- Generated files (created by `go generate`):
    - `*_union_gen.go` - Union type definitions from mkunion
    - `*_shape_gen.go` - Shape definitions for introspection
    - `machine_mock.go` - Mock implementation of the Dependency interface from moq
### Naming Conventions

- States: Use descriptive nouns that clearly indicate the state (e.g., `OrderPending`, `PaymentProcessing`)
- Commands: Suffix with `CMD` for clarity (e.g., `CreateOrderCMD`, `CancelOrderCMD`)
- Packages: Keep state machines in dedicated packages named after the domain (e.g., `order`, `payment`)
### State Design

- Keep States Focused: Each state should represent one clear condition
- Immutable Data: States should contain immutable data; create new states instead of modifying
- Minimal State Data: Only store data that's essential for the state's identity
- Use Zero Values: Design states so Go's zero values are meaningful defaults (see the sketch below)
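A minimal sketch of these guidelines (the states and fields are illustrative, not from the example app):

```go
//go:tag mkunion:"State"
type (
	// One clear condition per state, carrying only identity-critical data.
	// The zero value (empty OrderID, no items) still reads meaningfully
	// as "nothing ordered yet".
	OrderPending struct {
		OrderID string
		Items   []OrderItem
	}

	// Completion doesn't mutate OrderPending; a new state is created instead.
	OrderCompleted struct {
		OrderID     string
		CompletedAt time.Time
	}
)
```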
### Command Validation
Centralizing validation in the Transition function provides significant benefits:
- Single source of truth: All business rules and validation logic live in one place
- Atomic validation: Commands are validated together with state checks, preventing invalid transitions
- Testability: Easy to test all validation rules through the state machine tests
- Maintainability: When rules change, you only update one location
#### Basic Validation

```go
func Transition(ctx context.Context, deps Dependencies, cmd Command, state State) (State, error) {
	return MatchCommandR2(cmd,
		func(c *CreateOrderCMD) (State, error) {
			// Validate command first
			if c.CustomerID == "" {
				return nil, fmt.Errorf("customer ID is required")
			}
			if len(c.Items) == 0 {
				return nil, fmt.Errorf("order must contain at least one item")
			}

			// Then check state
			// ...
		},
	)
}
```
#### Advanced Validation with go-validate

For complex validation requirements, combine the Transition function with validation libraries:

```go
import "github.com/go-playground/validator/v10"

//go:tag mkunion:"Command"
type (
	CreateOrderCMD struct {
		CustomerID string      `validate:"required,uuid"`
		Items      []OrderItem `validate:"required,min=1,dive"`
		Email      string      `validate:"required,email"`
		Phone      string      `validate:"omitempty,e164"`
	}

	OrderItem struct {
		SKU      string  `validate:"required,alphanum"`
		Quantity int     `validate:"required,min=1,max=100"`
		Price    float64 `validate:"required,min=0.01"`
	}
)

type Dependencies interface {
	Validator() *validator.Validate
	CustomerService() CustomerService
	TimeNow() *time.Time // needed below when stamping the new state
}

func Transition(ctx context.Context, deps Dependencies, cmd Command, state State) (State, error) {
	return MatchCommandR2(cmd,
		func(c *CreateOrderCMD) (State, error) {
			// 1. Structural validation with go-validate
			if err := deps.Validator().Struct(c); err != nil {
				return nil, fmt.Errorf("validation failed: %w", err)
			}

			// 2. Business rule validation
			totalAmount := 0.0
			for _, item := range c.Items {
				totalAmount += item.Price * float64(item.Quantity)
			}
			if totalAmount > 10000 {
				return nil, fmt.Errorf("order total %.2f exceeds maximum allowed", totalAmount)
			}

			// 3. External validation (e.g., customer exists)
			customer, err := deps.CustomerService().Get(ctx, c.CustomerID)
			if err != nil {
				return nil, fmt.Errorf("customer validation failed: %w", err)
			}
			if !customer.Active {
				return nil, fmt.Errorf("customer %s is not active", c.CustomerID)
			}

			// 4. State-based validation
			switch state.(type) {
			case nil, *OrderPending:
				// Valid initial states
			default:
				return nil, fmt.Errorf("cannot create order in current state %T", state)
			}

			// All validations passed, create new state
			return &OrderPending{
				OrderID:    generateOrderID(),
				CustomerID: c.CustomerID,
				Items:      c.Items,
				CreatedAt:  deps.TimeNow(),
			}, nil
		},
	)
}
```
This approach scales well because:
- Structural validation is declarative (struct tags)
- Business rules are explicit and testable
- External validations are isolated in dependencies
- State validations ensure valid transitions
- All validation happens before any state change
### Dependency Management

- Define Clear Interfaces: Dependencies should be interfaces, not concrete types
- Keep Dependencies Minimal: Only inject what's absolutely necessary
- Generate Mocks with moq: Use `//go:generate moq` to automatically generate mocks

Running `go generate ./...` creates `machine_mock.go` with a `DependencyMock` type (generated from the `Dependency` interface shown in the File Organization section above). This mock can then be used in tests:
```go
func TestStateMachine(t *testing.T) {
	// DependencyMock is generated from the Dependency interface
	dep := &DependencyMock{
		TimeNowFunc: func() *time.Time {
			now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
			return &now
		},
		StockServiceFunc: func() StockService {
			return &StockServiceMock{
				ReserveFunc: func(items []Item) error {
					return nil
				},
			}
		},
	}

	machine := NewMachine(dep, nil)
	_ = machine // Test the machine...
}
```
Benefits of using moq:
- Reduces boilerplate: No need to manually write mock implementations
- Type safety: Generated mocks always match the interface
- Easy maintenance: Mocks automatically update when interface changes
- Better test readability: Focus on behavior, not mock implementation
## Common Pitfalls

Avoid these common mistakes when implementing state machines:

### 1. State Explosion

Problem: Creating too many states for every minor variation

```go
// Bad: Too granular
type (
	OrderPendingWithOneItem    struct{}
	OrderPendingWithTwoItems   struct{}
	OrderPendingWithThreeItems struct{}
	// ... and so on
)
```

Solution: Use state data instead
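For example, the variation above collapses into a single state whose data carries the item count (a sketch):

```go
// Good: one state; the number of items is data, not a new state
//go:tag mkunion:"State"
type (
	OrderPending struct {
		Items []OrderItem // one item or fifty - still the same state
	}
)
```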
### 2. Circular Dependencies

Problem: States that can transition in circles without progress

Solution: Ensure each transition represents meaningful progress or explicitly document allowed cycles
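As an illustration, a documented Error -> Pending cycle can be bounded with a retry counter so it always terminates (a sketch; the `RetryCount` field on `OrderPending` is an assumption for this example):

```go
// One case from a transition function: the allowed cycle is explicit
// and bounded, so every pass through the loop counts as progress.
func(c *RetryFailedCMD) (State, error) {
	e, ok := state.(*OrderError)
	if !ok {
		return nil, fmt.Errorf("can only retry from an error state, got %T", state)
	}
	if e.RetryCount >= 3 {
		return nil, fmt.Errorf("retry limit reached after %d attempts", e.RetryCount)
	}
	return &OrderPending{RetryCount: e.RetryCount + 1}, nil
}
```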
### 3. Missing Error States

Problem: Not modeling error conditions as explicit states

Solution: Model error conditions as states when they need handling. Crucially, store both the command that failed and the previous valid state to enable recovery or debugging:
```go
// Best practice: Error state with command and previous state
type OrderError struct {
	// Error metadata
	Reason     string
	FailedAt   time.Time
	RetryCount int

	// Critical for debugging and recovery
	FailedCommand Command // The exact command that failed
	PreviousState State   // State before the failed transition
}

// This pattern enables:
// 1. Perfect reproduction of the failure
// 2. Automatic retry with the same command
// 3. Debugging with full context
// 4. Recovery to previous valid state

// Or use a shared BaseState pattern like in workflow
type BaseState struct {
	ID        string
	Metadata  map[string]string
	UpdatedAt time.Time
}

type (
	PaymentPending struct {
		BaseState
		Amount float64
	}

	PaymentFailed struct {
		BaseState
		Reason         string
		PreviousAmount float64 // Store critical data from previous state
	}
)
```
```go
// Recovery becomes straightforward
func Transition(ctx context.Context, deps Dependencies, cmd Command, state State) (State, error) {
	return MatchCommandR2(cmd,
		func(c *RetryFailedCMD) (State, error) {
			switch s := state.(type) {
			case *OrderError:
				// Retry the exact command that failed
				return Transition(ctx, deps, s.FailedCommand, s.PreviousState)
			case *PaymentFailed:
				// Rebuild the retry state from the BaseState data and the
				// critical fields captured when the failure was recorded
				return &PaymentPending{
					BaseState: s.BaseState,
					Amount:    s.PreviousAmount,
				}, nil
			}
			return nil, fmt.Errorf("can only retry from error states")
		},
	)
}
```
This approach preserves critical information needed for recovery without losing the context of what failed.
#### Real Implementation Example

The order state machine example demonstrates this pattern perfectly. See how `OrderError` in `example/state/model.go` stores both `ProblemCommand` and `ProblemState`. The retry logic in `machine.go` shows how to use this information to retry the exact failed operation.
### 4. Ignoring Concurrency

Problem: Misunderstanding the state machine concurrency model

```go
// Wrong: Sharing a machine instance across goroutines
sharedMachine := NewMachine(deps, currentState)
go sharedMachine.Handle(ctx, cmd1) // Goroutine 1
go sharedMachine.Handle(ctx, cmd2) // Goroutine 2 - DON'T DO THIS!
```

Solution: State machines are designed to be created per request. This isolation prevents accidental state mutations:
```go
// Correct: Create a new machine instance for each operation
func ProcessCommand(ctx context.Context, deps Dependencies, cmd Command) error {
	// 1. Load current state from storage
	orderID := extractOrderID(cmd)
	record, err := repo.Get(ctx, orderID)
	if err != nil {
		return err
	}

	// 2. Create a fresh machine instance with the current state
	machine := NewMachine(deps, record.Data)

	// 3. Handle the command
	if err := machine.Handle(ctx, cmd); err != nil {
		return err
	}

	// 4. Save the new state (with optimistic concurrency control)
	record.Data = machine.State()
	return repo.Update(ctx, record)
}
```
This pattern ensures:
- Each request gets an isolated machine instance
- No shared mutable state between concurrent operations
- Failures don't affect other requests
- Retries start with a clean machine instance
For handling concurrent updates to the same entity, see the Optimistic Concurrency Control section below.
### 5. Overloading Transitions

Problem: Putting too much business logic in transition functions

```go
// Bad: Transition function doing too much
func Transition(...) (State, error) {
	// Send emails
	// Update inventory
	// Calculate prices
	// Log to external systems
	// ... 200 lines later
}
```

Solution: Keep transitions focused on state changes; delegate side effects to dependencies, as shown below
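A sketch of that split: the transition only computes the next state, and the service layer performs side effects through dependencies after the transition succeeds (`EmailService` is an assumed dependency, not part of the example app):

```go
// Inside Transition: pure state computation only
func(c *ConfirmOrderCMD) (State, error) {
	s, ok := state.(*OrderPending)
	if !ok {
		return nil, ErrInvalidTransition
	}
	return &OrderConfirmed{OrderID: s.OrderID}, nil
}

// In the service layer, after the machine handled the command:
if confirmed, ok := machine.State().(*OrderConfirmed); ok {
	// Side effects live behind dependencies, not in the transition
	if err := deps.EmailService().SendConfirmation(ctx, confirmed.OrderID); err != nil {
		log.Printf("confirmation email failed for %s: %v", confirmed.OrderID, err)
	}
}
```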
## Advanced Patterns

### State Machine Composition

For complex systems, compose multiple state machines as a service layer:
```go
// Each domain has its own package with machine constructor
// order/machine.go
func NewMachine(deps OrderDeps, state OrderState) *machine.Machine[OrderDeps, OrderCommand, OrderState] {
	return machine.NewMachine(deps, Transition, state)
}

// order/service.go - Domain service encapsulates repository and machine logic
type OrderService struct {
	repo schemaless.Repository[OrderState]
	deps OrderDeps
}

func NewOrderService(repo schemaless.Repository[OrderState], deps OrderDeps) *OrderService {
	return &OrderService{repo: repo, deps: deps}
}

func (s *OrderService) HandleCommand(ctx context.Context, cmd OrderCommand) (OrderState, error) {
	// Extract order ID from command for state loading
	orderID := extractOrderID(cmd)

	// Load current state; a missing record means this is the first command
	record, err := s.repo.Get(ctx, orderID)
	if err != nil {
		if !errors.Is(err, schemaless.ErrNotFound) {
			return nil, err
		}
		record = schemaless.Record[OrderState]{ID: orderID, Type: "orders"}
	}

	// Create fresh machine instance
	machine := NewMachine(s.deps, record.Data)

	// Handle command
	if err := machine.Handle(ctx, cmd); err != nil {
		return nil, err
	}

	// Save new state with optimistic concurrency control
	newState := machine.State()
	record.Data = newState
	_, err = s.repo.UpdateRecords(schemaless.Save(record))
	return newState, err
}

// payment/service.go - Similar pattern for payment domain
type PaymentService struct {
	repo schemaless.Repository[PaymentState]
	deps PaymentDeps
}

func (s *PaymentService) HandleCommand(ctx context.Context, cmd PaymentCommand) (PaymentState, error) {
	// Same pattern as OrderService...
}

// Composed e-commerce service using domain services
type ECommerceService struct {
	orderService   *OrderService
	paymentService *PaymentService
}

func NewECommerceService(orderSvc *OrderService, paymentSvc *PaymentService) *ECommerceService {
	return &ECommerceService{
		orderService:   orderSvc,
		paymentService: paymentSvc,
	}
}

func (s *ECommerceService) ProcessOrder(ctx context.Context, orderCmd OrderCommand) error {
	// 1. Handle order command through order service
	newOrderState, err := s.orderService.HandleCommand(ctx, orderCmd)
	if err != nil {
		return err
	}

	// 2. If order is confirmed, trigger payment through payment service
	if confirmed, ok := newOrderState.(*OrderConfirmed); ok {
		_, err := s.paymentService.HandleCommand(ctx, &InitiatePaymentCMD{
			OrderID: confirmed.OrderID,
			Amount:  confirmed.TotalAmount,
		})
		return err
	}

	return nil
}
```
Key principles:

- Domain services: Each domain encapsulates its repository, dependencies, and machine logic
- Schemaless repositories: Use `schemaless.Repository[StateType]` for type-safe state storage
- Service composition: Compose domain services, avoiding direct repository/machine access
- Single responsibility: Each service handles one domain's state machine lifecycle
- Optimistic concurrency: Built-in through `schemaless.Repository` version handling
- No duplication: State loading, machine creation, and saving logic exists once per domain
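Wiring the composition together might look like this (the repositories and deps values are assumed to exist; the constructors are the ones sketched above):

```go
orderSvc := NewOrderService(orderRepo, orderDeps)
paymentSvc := &PaymentService{repo: paymentRepo, deps: paymentDeps}
shop := NewECommerceService(orderSvc, paymentSvc)

// One entry point drives both machines: the order transition runs first,
// and a confirmed order triggers the payment machine.
if err := shop.ProcessOrder(ctx, &CreateOrderCMD{CustomerID: "c-1"}); err != nil {
	log.Fatal(err)
}
```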
### Async Operations with Callbacks

Handle long-running operations without blocking by using a state-first approach:
//go:tag mkunion:"AsyncState"
type (
OperationPending struct {
ID string
CallbackID string // For async completion
StartedAt time.Time
TimeoutAt time.Time
}
OperationComplete struct {
ID string
Result interface{}
Duration time.Duration
}
OperationError struct {
ID string
Reason string
Code string // "TIMEOUT", "WORKER_FAILED", etc.
}
)
//go:tag mkunion:"AsyncCommand"
type (
StartAsyncCMD struct {
ID string
}
CallbackCMD struct {
CallbackID string
Result interface{}
Error string
}
)
// Pure transition function - NO side effects
func Transition(ctx context.Context, deps AsyncDeps, cmd AsyncCommand, state AsyncState) (AsyncState, error) {
return MatchAsyncCommandR2(cmd,
func(c *StartAsyncCMD) (AsyncState, error) {
// ONLY return new state - no async operations here
return &OperationPending{
ID: c.ID,
CallbackID: deps.GenerateCallbackID(),
StartedAt: time.Now(),
TimeoutAt: time.Now().Add(5 * time.Minute),
}, nil
},
func(c *CallbackCMD) (AsyncState, error) {
switch s := state.(type) {
case *OperationPending:
if c.Error != "" {
return &OperationError{
ID: s.ID,
Reason: c.Error,
Code: "WORKER_FAILED",
}, nil
}
return &OperationComplete{
ID: s.ID,
Result: c.Result,
Duration: time.Since(s.StartedAt),
}, nil
}
return nil, fmt.Errorf("invalid state for callback: %T", state)
},
)
}
// Service layer handles async operations AFTER state persistence
func (s *AsyncService) HandleCommand(ctx context.Context, cmd AsyncCommand) (AsyncState, error) {
// 1. Load current state
record, err := s.repo.Get(ctx, extractID(cmd))
// ... error handling
// 2. Apply command to state machine
machine := NewMachine(s.deps, record.Data)
if err := machine.Handle(ctx, cmd); err != nil {
return nil, err
}
// 3. PERSIST STATE FIRST
newState := machine.State()
record.Data = newState
_, err = s.repo.UpdateRecords(schemaless.Save(*record))
if err != nil {
return nil, err
}
// 4. ONLY AFTER successful persistence, trigger async work
if pending, ok := newState.(*OperationPending); ok {
// Enqueue for background processing
s.asyncQueue.Enqueue(AsyncWorkItem{
OperationID: pending.ID,
CallbackID: pending.CallbackID,
TimeoutAt: pending.TimeoutAt,
})
}
return newState, nil
}
// Background processor handles actual async work
func (processor *AsyncProcessor) ProcessWork(ctx context.Context, item AsyncWorkItem) {
// Perform the actual async work
result, err := processor.worker.DoWork(ctx, item.OperationID)
// Create callback command
callbackCMD := &CallbackCMD{
CallbackID: item.CallbackID,
Result: result,
}
if err != nil {
callbackCMD.Error = err.Error()
}
// Send callback through proper channel (HTTP endpoint, queue, etc.)
processor.callbackHandler.HandleCallback(ctx, callbackCMD)
}
Key principles:

- Pure transitions: No side effects in transition functions
- State-first persistence: Save state before triggering async work
- Background processing: Separate system handles async operations
- Callback mechanism: Async completion creates new commands
- Timeout handling: Built into state for automatic cleanup
- No race conditions: State is always consistent with async operation status
### Time-Based Transitions

Handle timeouts properly based on the operation context.

Request-scoped operations: `machine.Handle(ctx, cmd)` and the `Transition` function respect context cancellation for standard Go timeout handling.
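For the request-scoped case, standard context plumbing is all that's needed (a sketch reusing the order machine from earlier sections):

```go
// Give the whole command a two-second budget; a transition (or any
// dependency call) that observes ctx aborts once the deadline passes.
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

m := NewMachine(deps, currentState)
if err := m.Handle(ctx, &CreateOrderCMD{CustomerID: "c-1"}); err != nil {
	return err
}
```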
Long-running process timeouts: Model timeouts as explicit states for operations that exceed request boundaries:
//go:tag mkunion:"State"
type (
// States that can timeout should track when timeout expires
AwaitingApproval struct {
OrderID string
ExpectedTimeoutAt time.Time // When this will timeout
BaseState
}
// Use Error state with standardized timeout code
ProcessError struct {
Code string // "TIMEOUT", "API_ERROR", etc.
Reason string
FailedCommand Command
PreviousState State
RetryCount int
BaseState
}
)
// Background process finds and transitions timed-out states
func ProcessTimeouts(ctx context.Context, repo Repository) error {
// Find all states that should timeout
records, err := repo.FindRecords(
predicate.Where(`
Data["order.AwaitingApproval"].ExpectedTimeoutAt < :now
`, predicate.ParamBinds{
":now": schema.MkInt(time.Now().Unix()),
}),
)
for _, record := range records.Items {
machine := NewMachine(deps, record.Data)
err := machine.Handle(ctx, &ExpireTimeoutCMD{
RunID: record.ID,
})
// Save updated state...
}
}
#### Benefits of State-Based Timeouts
This approach enables powerful querying and recovery:

```go
// Find all timeout errors for retry
timeoutErrors, _ := repo.FindRecords(
	predicate.Where(`Data["order.ProcessError"].Code = :code`,
		predicate.ParamBinds{":code": schema.MkString("TIMEOUT")},
	),
)

// Find long-waiting approvals
longWaiting, _ := repo.FindRecords(
	predicate.Where(`
		Type = :type
		AND Data["order.AwaitingApproval"].ExpectedTimeoutAt > :soon
	`, predicate.ParamBinds{
		":type": schema.MkString("process"),
		":soon": schema.MkInt(time.Now().Add(1*time.Hour).Unix()),
	}),
)
```
#### Real Implementation

See the workflow engine implementation where:

- `Await` state tracks `ExpectedTimeoutTimestamp`
- `Error` state uses standardized error codes including `ProblemCallbackTimeout`
- Background timeout processor and task queue setup demonstrate how to process timeouts asynchronously
## Debugging and Observability

### State History Tracking

The mkunion state machine pattern leverages Change Data Capture (CDC) for automatic state history tracking. Since every state transition is persisted with versioning through optimistic concurrency control, you get a complete audit trail without modifying your state machine logic.
The `schemaless.Repository` creates an append log of all state changes with version numbers, providing ordering guarantees and enabling powerful history tracking capabilities. CDC processors consume this stream asynchronously to build history aggregates, analytics, and debugging tools - all without impacting state machine performance. The system automatically handles failures through persistent, replayable streams that survive crashes and allow processors to resume from their last position.

This approach integrates seamlessly with other mkunion patterns like retry processors and timeout handlers, creating a unified system where every state change is tracked, queryable, and analyzable.
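The shape of such a history processor might look like this sketch (the `HistoryEntry` type and `appendHistory` helper are illustrative assumptions, not the actual CDC API):

```go
// HistoryEntry is one row of an audit trail built from the change stream.
type HistoryEntry struct {
	RecordID  string
	Version   uint64
	StateType string
	SeenAt    time.Time
}

// appendHistory consumes one change event: the version gives ordering,
// and the concrete state type records what the entity transitioned into.
func appendHistory(history []HistoryEntry, recordID string, version uint64, after State) []HistoryEntry {
	return append(history, HistoryEntry{
		RecordID:  recordID,
		Version:   version,
		StateType: fmt.Sprintf("%T", after),
		SeenAt:    time.Now(),
	})
}
```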
#### Real Implementation

The example app demonstrates CDC integration with `taskRetry.RunCDC(ctx)` and `store.AppendLog()`. Detailed examples of building history processors, analytics pipelines, and debugging tools will be added in future updates.
### Metrics and Monitoring

Currently, metrics collection is the responsibility of the user. If you need Prometheus metrics or other monitoring, include them in your dependency interface and use them within your `Transition` function:
```go
type Dependencies interface {
	// Your business dependencies
	StockService() StockService

	// Metrics dependencies - user's responsibility to provide
	Metrics() *prometheus.Registry
	TransitionCounter() prometheus.Counter
}

func Transition(ctx context.Context, deps Dependencies, cmd Command, state State) (State, error) {
	// Manual metrics collection
	startTime := time.Now()
	defer func() {
		deps.TransitionCounter().Inc()
		// Record duration, state types, etc.
		_ = time.Since(startTime)
	}()

	// Your transition logic here
}
```
There's no automatic metrics injection - you must explicitly add metrics to your dependencies and instrument your transitions manually.
#### Future Enhancement

Automatic metrics collection would be a valuable addition to `machine.Machine`. This could include built-in counters for transitions, error rates, and timing histograms without requiring manual instrumentation.
## Evolution and Versioning

### Backward Compatible Changes

When evolving state machines, maintain compatibility:
```go
// Version 1
//go:tag mkunion:"OrderState"
type (
	OrderCreated struct {
		ID    string
		Items []Item
	}
)
```

```go
// Version 2 - Added field with default
//go:tag mkunion:"OrderState"
type (
	OrderCreated struct {
		ID       string
		Items    []Item
		Discount float64 `json:"discount,omitempty"` // New field
	}
)
```
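The effect on old data can be verified directly: JSON written by version 1 has no `discount` key, so the new field takes its zero value on read (a sketch using plain `encoding/json` on the concrete struct):

```go
// A v1 payload, serialized before Discount existed
var order OrderCreated
if err := json.Unmarshal([]byte(`{"ID":"o-1","Items":[]}`), &order); err != nil {
	return err
}
// order.Discount == 0, and omitempty keeps the field out of
// re-serialized output until it is actually set.
```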
### State Migration Strategies

Handle state structure changes:

```go
// Migration function
func MigrateOrderState(old []byte) (OrderState, error) {
	// Try to unmarshal as current version
	current, err := shared.JSONUnmarshal[OrderState](old)
	if err == nil {
		return current, nil
	}

	// Try older version
	v1, err := shared.JSONUnmarshal[OrderStateV1](old)
	if err == nil {
		// Convert v1 to current
		return convertV1ToCurrent(v1), nil
	}

	return nil, fmt.Errorf("unknown state version")
}
```
### Deprecating States and Commands

Gracefully phase out old states:

```go
//go:tag mkunion:"OrderState"
type (
	// Deprecated: Use OrderPending instead
	OrderCreated struct {
		// ... fields
	}

	OrderPending struct {
		// New state structure
	}
)

func Transition(ctx context.Context, deps Dependencies, cmd Command, state State) (State, error) {
	// Handle deprecated state
	if old, ok := state.(*OrderCreated); ok {
		// Automatically migrate to new state
		state = &OrderPending{
			// Map old fields to new (from old)
		}
	}

	// Continue with normal processing
	// ...
}
```
## Performance Considerations

### Memory Optimization

- Reuse State Instances: For states without data, use singletons (see the sketch below)
- Lazy Loading: Don't load unnecessary data in states
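For the singleton idea, a data-less state can be allocated once and shared between transitions (a sketch; `OrderArchived` is an illustrative state):

```go
// OrderArchived carries no data, so every transition can return the
// same instance instead of allocating a new one.
var orderArchived = &OrderArchived{}

func archivedState() State {
	return orderArchived
}
```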
### State Storage

mkunion uses a state storage pattern where the current state of the state machine is persisted directly to the database. This approach:

- Stores the complete state after each transition
- Provides immediate access to current state without replay
- Supports optimistic concurrency control through versioning
- Works seamlessly with the `x/storage/schemaless` package
Example using typed repository:

```go
// ExampleStateStorage demonstrates basic state storage using typed repository
func ExampleStateStorage() error {
	// Initialize typed repository for state storage
	store := schemaless.NewInMemoryRepository[schema.Schema]()
	repo := typedful.NewTypedRepository[State](store)

	// Save state to database
	_, err := repo.UpdateRecords(schemaless.Save(schemaless.Record[State]{
		ID:   "order-123",
		Type: "orders",
		Data: &OrderPending{
			OrderID: "order-123",
			Items: []OrderItem{
				{SKU: "WIDGET-1", Quantity: 2, Price: 29.99},
			},
		},
	}))
	return err
}
```
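Reading the state back follows the same typed API (a sketch reusing `repo` from above and a `deps` value as in earlier snippets):

```go
// Load the persisted record and resume the machine from its state
record, err := repo.Get("order-123", "orders")
if err != nil {
	return err
}
m := NewMachine(deps, record.Data)
// m now continues exactly where the stored state left off
```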
### Concurrent Processing

When multiple processes might update the same state, use the optimistic concurrency control provided by the schemaless repository.
Key strategies:
1. Optimistic Concurrency: Use version checking to detect conflicts
2. Retry Logic: Implement exponential backoff for version conflicts
3. Partition by ID: Process different entities in parallel safely (see the sketch below)
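A minimal sketch of strategy 3: hash each entity ID onto a fixed worker queue, so commands for the same order are processed serially while different orders proceed in parallel (the helper names are illustrative):

```go
import "hash/fnv"

// partition returns a stable worker index for an entity ID; the same
// order always lands on the same worker, serializing its commands.
func partition(orderID string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(orderID))
	return int(h.Sum32()) % workers
}

// runWorkers drains each queue in its own goroutine; queues run in
// parallel, but commands within one queue are handled one at a time.
func runWorkers(queues []chan Command, process func(Command)) {
	for _, q := range queues {
		go func(q chan Command) {
			for cmd := range q {
				process(cmd)
			}
		}(q)
	}
}
```

Commands are then enqueued with `queues[partition(orderID, len(queues))] <- cmd`.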
See the Optimistic Concurrency Control section for detailed examples.
## Optimistic Concurrency Control

The `x/storage/schemaless` package provides built-in optimistic concurrency control using version fields. This ensures data consistency when multiple processes work with the same state.
### How It Works

- Each record has a `Version` field that increments on updates
- Updates specify the expected version in the record
- If versions don't match, `ErrVersionConflict` is returned
- Applications retry with the latest version
### Complete Example
```go
// ProcessOrderCommandWithConcurrency demonstrates handling concurrent state updates
// with optimistic concurrency control and retry logic
func ProcessOrderCommandWithConcurrency(ctx context.Context, store schemaless.Repository[schema.Schema], deps Dependencies, orderID string, cmd Command) error {
	repo := typedful.NewTypedRepository[State](store)

	// Retry loop for handling version conflicts
	maxRetries := 3
	for attempt := 0; attempt < maxRetries; attempt++ {
		// Load current state with version
		record, err := repo.Get(orderID, "orders")
		if err != nil {
			if errors.Is(err, schemaless.ErrNotFound) {
				// First command - create new record
				record = schemaless.Record[State]{
					ID:      orderID,
					Type:    "orders",
					Data:    &OrderPending{}, // Initial state
					Version: 0,
				}
			} else {
				return fmt.Errorf("failed to load state: %w", err)
			}
		}

		// Create machine with current state
		machine := NewMachine(deps, record.Data)

		// Apply command
		if err := machine.Handle(ctx, cmd); err != nil {
			return fmt.Errorf("command failed: %w", err)
		}

		// Attempt to save with version check
		record.Data = machine.State()
		_, err = repo.UpdateRecords(schemaless.Save(record))
		if err == nil {
			// Success!
			return nil
		}

		if errors.Is(err, schemaless.ErrVersionConflict) {
			// Another process updated the state - log and retry
			log.Printf("Version conflict on attempt %d, retrying...", attempt+1)

			// Exponential backoff
			backoff := time.Duration(attempt*attempt) * 100 * time.Millisecond
			time.Sleep(backoff)
			continue
		}

		// Other error - don't retry
		return fmt.Errorf("failed to save state: %w", err)
	}

	return fmt.Errorf("max retries exceeded due to version conflicts")
}
```
### Batch Operations with Concurrency Control

When updating multiple records:
```go
// ProcessBulkOrdersWithConcurrency demonstrates batch operations with concurrency control
func ProcessBulkOrdersWithConcurrency(ctx context.Context, store schemaless.Repository[schema.Schema], deps Dependencies, updates map[string]Command) error {
	repo := typedful.NewTypedRepository[State](store)

	// Load all records first
	records := make(map[string]schemaless.Record[State])
	for orderID := range updates {
		record, err := repo.Get(orderID, "orders")
		if err != nil {
			return fmt.Errorf("failed to load %s: %w", orderID, err)
		}
		records[orderID] = record
	}

	// Process all commands
	var recordsToSave []schemaless.Record[State]
	for orderID, cmd := range updates {
		record := records[orderID]
		machine := NewMachine(deps, record.Data)
		if err := machine.Handle(ctx, cmd); err != nil {
			return fmt.Errorf("command failed for %s: %w", orderID, err)
		}
		record.Data = machine.State()
		recordsToSave = append(recordsToSave, record)
	}

	// Save all at once with version checking
	_, err := repo.UpdateRecords(schemaless.Save(recordsToSave...))
	if errors.Is(err, schemaless.ErrVersionConflict) {
		// Handle partial failures
		// In a real implementation, you'd check which records failed and retry those
		return fmt.Errorf("version conflict in batch update: %w", err)
	}
	return err
}
```