Persisting union in database
Persisting state in database
At this point, we have implemented and tested the Order Service state machine.
The next thing that we need to address on our road to production is to persist state in a database.
MkUnion aims to support you in this task by providing you with the x/storage/schemaless
package that will take care of:
- mapping Go structs to their database representation and back from the database to a struct
- leveraging optimistic concurrency control to resolve conflicts
- providing you with a simple API to work with the database
- and more
Below is a testable example that demonstrates a complete flow: initializing an in-memory repository, building a state, and saving and loading that state from storage.
func Example_storeStateInDatabase() {
	now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
	// example state
	state := &OrderCompleted{
		Order: Order{
			ID:          "123",
			OrderAttr:   OrderAttr{Price: 100, Quantity: 3},
			DeliveredAt: &now,
		},
	}

	// let's use in-memory storage for storing the State union
	storage := schemaless.NewInMemoryRepository[State]()

	// let's save it to storage
	_, err := storage.UpdateRecords(schemaless.Save(schemaless.Record[State]{
		ID:   state.Order.ID,
		Type: "orders",
		Data: state,
	}))
	if err != nil {
		panic(err)
	}

	// and load it back
	records, err := storage.FindingRecords(schemaless.FindingRecords[schemaless.Record[State]]{
		RecordType: "orders",
	})
	fmt.Println(err)
	fmt.Printf("%+#v\n", *records.Items[0].Data.(*OrderCompleted))
	// Output: <nil>
	// state.OrderCompleted{Order:state.Order{ID:"123", OrderAttr:state.OrderAttr{Price:100, Quantity:3}, WorkerID:"", StockRemovedAt:<nil>, PaymentChargedAt:<nil>, DeliveredAt:time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC), CancelledAt:<nil>, CancelledReason:""}}
}
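To give an idea of what mapping a union to a database representation and back involves, here is a minimal sketch that uses only `encoding/json`: each variant is stored in an envelope with a type tag, and the tag selects the concrete struct when decoding. This is an illustration under that assumption, not the format mkunion or schemaless actually uses; the simplified `Pending` and `Completed` variants are hypothetical stand-ins for the order states.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// State is a simplified union: only the variants below implement it.
type State interface{ isState() }

type Pending struct {
	ID string `json:"id"`
}

type Completed struct {
	ID    string `json:"id"`
	Price int    `json:"price"`
}

func (*Pending) isState()   {}
func (*Completed) isState() {}

// envelope is the stored representation: a type tag plus the variant's JSON.
type envelope struct {
	Type string          `json:"$type"`
	Data json.RawMessage `json:"data"`
}

// Marshal wraps a State variant in a tagged envelope.
func Marshal(s State) ([]byte, error) {
	var tag string
	switch s.(type) {
	case *Pending:
		tag = "Pending"
	case *Completed:
		tag = "Completed"
	default:
		return nil, fmt.Errorf("unknown state %T", s)
	}
	data, err := json.Marshal(s)
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{Type: tag, Data: data})
}

// Unmarshal reads the tag and decodes the payload into the matching type.
func Unmarshal(b []byte) (State, error) {
	var e envelope
	if err := json.Unmarshal(b, &e); err != nil {
		return nil, err
	}
	var s State
	switch e.Type {
	case "Pending":
		s = &Pending{}
	case "Completed":
		s = &Completed{}
	default:
		return nil, fmt.Errorf("unknown type tag %q", e.Type)
	}
	if err := json.Unmarshal(e.Data, s); err != nil {
		return nil, err
	}
	return s, nil
}

func main() {
	b, err := Marshal(&Completed{ID: "123", Price: 100})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"$type":"Completed","data":{"id":"123","price":100}}
	s, err := Unmarshal(b)
	fmt.Printf("%#v %v\n", s, err)
}
```

The tag is what lets a single column (or document) hold any variant of the union while still decoding to the right Go type.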
In a request-response setting, handling a command follows this sequence:
sequenceDiagram
participant R as Request
participant Store as Store
activate R
R->>R: Validate(request) -> error
R->>Store: Load state from database by request.ObjectId
activate Store
Store->>R: Ok(State)
deactivate Store
R->>R: Create machine with state
R->>R: Apply command to state
R->>Store: Save state in database under request.ObjectId
activate Store
Store->>R: Ok()
deactivate Store
deactivate R
An example implementation of this sequence:
func Handle(rq Request, response Response) {
	ctx := rq.Context()
	// extract `objectId` and `command` from the request and perform some validation
	id := rq.ObjectId
	command := rq.Command

	// load the state from the store
	state, err := store.Find(ctx, id)
	if err != nil {
		// handle error: write an error response and return
		return
	}

	// apply the command to the loaded state
	machine := NewSimpleMachineWithState(Transition, state)
	newState, err := machine.Apply(command, state)
	if err != nil {
		// handle error: write an error response and return
		return
	}

	// save the new state under the same id
	err = store.Save(ctx, newState)
	if err != nil {
		// handle error: write an error response and return
		return
	}

	// serialize the response
	response.Write(newState)
}
Error as state. Self-healing systems.
In a request-response situation, handling errors is easy, but what if something goes wrong in a long-lived process? How should errors be handled in such a situation without making what we've learned about state machines useless or hard to use?
One solution is to treat errors as state. In that case, our state machines never return an error; instead, they return a new state that represents the error.
When we introduce an explicit command responsible for correcting a RecoverableError, we can create self-healing systems.
Thanks to that, even when errors are not known in advance, we can retroactively introduce self-healing logic that corrects states.
Since there is always exactly one error state, such state machines remain easy to reason about.
//go:generate mkunion -name State
type (
	// ...
	RecoverableError struct {
		ErrCode    int
		PrevState  State
		RetryCount int
	}
)

//go:generate mkunion -name Command
type (
	// ...
	CorrectStateCMD struct{}
)
Now we have to implement the recovery logic in our state machine, in the Transition function.
Here is an example implementation of such a transition function:
func Transition(cmd Command, state State) (State, error) {
	return MustMatchCommandR2(
		cmd,
		/* ... */
		func(cmd *CorrectStateCMD) (State, error) {
			switch state := state.(type) {
			case *RecoverableError:
				state.RetryCount = state.RetryCount + 1

				// here we can do some self-healing logic
				if state.ErrCode == DuplicateServiceUnavailable {
					newState, err := Transition(&MarkAsDuplicateCMD{}, state.PrevState)
					if err != nil {
						// we failed to correct the error, so we return an error state
						// that keeps the original error code for the next attempt
						return &RecoverableError{
							ErrCode:    state.ErrCode,
							PrevState:  state.PrevState,
							RetryCount: state.RetryCount,
						}, nil
					}

					// we managed to fix the state, so we return the new state
					return newState, nil
				} else {
					// log information that we have a new error code that we don't know how to handle
				}

				// try to correct the error in the next iteration
				return state, nil
			}

			// only RecoverableError states need correcting; others are returned unchanged
			return state, nil
		},
	)
}
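To see the whole loop in one place, here is a self-contained sketch of the error-as-state pattern. It assumes a simplified union, a hypothetical `ServiceUnavailable` error code, and a hypothetical `ProcessCMD` step that fails while a simulated dependency is down. Instead of returning an error, `Transition` wraps the failure in `RecoverableError`, and `CorrectStateCMD` retries from `PrevState`.

```go
package main

import "fmt"

type State interface{ isState() }
type Command interface{ isCommand() }

type Pending struct{ ID string }
type Processed struct{ ID string }

// RecoverableError records why processing failed and where to resume.
type RecoverableError struct {
	ErrCode    int
	PrevState  State
	RetryCount int
}

func (*Pending) isState()          {}
func (*Processed) isState()        {}
func (*RecoverableError) isState() {}

type ProcessCMD struct{}
type CorrectStateCMD struct{}

func (*ProcessCMD) isCommand()      {}
func (*CorrectStateCMD) isCommand() {}

// ServiceUnavailable is a hypothetical error code.
const ServiceUnavailable = 503

// dependencyUp simulates an external service; it starts out down.
var dependencyUp = false

// Transition turns failures it knows how to describe into
// RecoverableError states instead of returning an error.
func Transition(cmd Command, state State) (State, error) {
	switch cmd.(type) {
	case *ProcessCMD:
		s, ok := state.(*Pending)
		if !ok {
			return nil, fmt.Errorf("cannot process %T", state)
		}
		if !dependencyUp {
			return &RecoverableError{ErrCode: ServiceUnavailable, PrevState: s}, nil
		}
		return &Processed{ID: s.ID}, nil
	case *CorrectStateCMD:
		e, ok := state.(*RecoverableError)
		if !ok {
			return state, nil // nothing to correct
		}
		if e.ErrCode != ServiceUnavailable {
			return e, nil // unknown code: leave it for future healing logic
		}
		next, err := Transition(&ProcessCMD{}, e.PrevState)
		if err != nil {
			return nil, err
		}
		if re, failed := next.(*RecoverableError); failed {
			re.RetryCount = e.RetryCount + 1 // still failing: count the attempt
			return re, nil
		}
		return next, nil
	}
	return nil, fmt.Errorf("unknown command %T", cmd)
}

func main() {
	s, _ := Transition(&ProcessCMD{}, &Pending{ID: "1"})
	s, _ = Transition(&CorrectStateCMD{}, s) // dependency still down
	fmt.Printf("%T retries=%d\n", s, s.(*RecoverableError).RetryCount) // *main.RecoverableError retries=1
	dependencyUp = true
	s, _ = Transition(&CorrectStateCMD{}, s) // the state heals
	fmt.Printf("%T\n", s) // *main.Processed
}
```

The caller never sees an error for the outage; it only sees a state that a later `CorrectStateCMD` can move forward.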
Now, to correct states, we have to select all states that are in an error state from the database.
This can be done in many ways; the example below uses an abstraction called TaskQueue, which is responsible for running tasks in the background.
This abstraction guarantees that all records (historical and new ones) will be processed. You can think of it as a queue populated by the database records that match an SQL query.
Alternatively, you can use a CRON job that polls the database.
//go:generate mms deployyml -type=TaskQueue -name=CorrectMSPErrors -autoscale=1,10 -memory=128Mi -cpu=100m -timeout=10s -schedule="0 0 * * *"
func main() {
	ctx := context.Background()

	// select all states that are still in the error state and have retries left
	sql := "SELECT * FROM ObjectState WHERE RecoverableError.RetryCount < 3"
	store := datalayer.DefaultStore()
	queue := TaskQueueFrom("correct-msp-errors", sql, store)
	queue.OnTask(func(ctx context.Context, task Task) error {
		state := task.State()
		cmd := &CorrectStateCMD{}
		machine := NewSimpleMachineWithState(Transition, state)
		newState, err := machine.Apply(cmd, state)
		if err != nil {
			return err
		}
		// persist the corrected (or still failing) state
		return task.Save(ctx, newState)
	})

	err := queue.Run(ctx)
	if err != nil {
		log.Panic(err)
	}
}
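Polling the database from a CRON job, mentioned above as an alternative to TaskQueue, can be sketched as a single correction pass. This assumes a hypothetical in-memory table of states and a caller-supplied correction function; a CRON schedule (or a `time.Ticker`) would call `correctAll` periodically. Records that have reached `maxRetries` are skipped, mirroring the `RetryCount < 3` filter in the SQL query.

```go
package main

import "fmt"

type State interface{ isState() }

type Healthy struct{}

type RecoverableError struct {
	ErrCode    int
	RetryCount int
}

func (*Healthy) isState()          {}
func (*RecoverableError) isState() {}

const maxRetries = 3

// correctAll performs one polling pass: it selects error states that still
// have retries left (like `WHERE RecoverableError.RetryCount < 3`), applies
// correct to each, and saves the result. It returns how many it corrected.
func correctAll(table map[string]State, correct func(State) (State, error)) (int, error) {
	n := 0
	for id, s := range table {
		e, ok := s.(*RecoverableError)
		if !ok || e.RetryCount >= maxRetries {
			continue
		}
		next, err := correct(s)
		if err != nil {
			return n, fmt.Errorf("correcting %s: %w", id, err)
		}
		table[id] = next // overwriting an existing key during range is safe
		n++
	}
	return n, nil
}

func main() {
	table := map[string]State{
		"a": &RecoverableError{ErrCode: 503},
		"b": &RecoverableError{ErrCode: 503, RetryCount: 3}, // retries exhausted
		"c": &Healthy{},
	}
	// a toy correction that always succeeds
	heal := func(State) (State, error) { return &Healthy{}, nil }
	n, err := correctAll(table, heal)
	fmt.Println(n, err) // 1 <nil>
}
```

Unlike the TaskQueue, a poller only sees what the query returns at each tick, so the schedule determines how quickly new error states are picked up.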
State machines and command queues and workflows
What if a command initiated a state "to process" and saved it in the database? What if a task queue then picked up such a state and processed it? Wouldn't this be something like a command queue?
Under what conditions should background processes be used to transition these states?
Processors per state
Each state has its own processor with its own logic that knows which command to apply to that state; it's like micromanaging the TaskQueue. This can be a good starting point when good tooling is not yet available.
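A minimal sketch of the processor-per-state approach: each state type is registered with its own processor that decides which command to apply. The registry, the `Processor` type, and the order states and commands below are hypothetical illustrations, keyed by `reflect.Type` for simplicity.

```go
package main

import (
	"fmt"
	"reflect"
)

type State interface{ isState() }
type Command interface{ isCommand() }

type Pending struct{}
type Paid struct{}

func (*Pending) isState() {}
func (*Paid) isState()    {}

type ChargePaymentCMD struct{}
type ShipCMD struct{}

func (*ChargePaymentCMD) isCommand() {}
func (*ShipCMD) isCommand()          {}

// Processor picks the command to apply to a state of one specific type.
type Processor func(State) Command

// processors maps each state type to its own processor.
var processors = map[reflect.Type]Processor{
	reflect.TypeOf(&Pending{}): func(State) Command { return &ChargePaymentCMD{} },
	reflect.TypeOf(&Paid{}):    func(State) Command { return &ShipCMD{} },
}

// nextCommand looks up the processor for s; ok is false if none is registered.
func nextCommand(s State) (Command, bool) {
	p, ok := processors[reflect.TypeOf(s)]
	if !ok {
		return nil, false
	}
	return p(s), true
}

func main() {
	cmd, _ := nextCommand(&Pending{})
	fmt.Printf("%T\n", cmd) // *main.ChargePaymentCMD
}
```

Each entry would typically become its own background task, which is exactly the per-state bookkeeping that better tooling aims to remove.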
Processor for state machine
With good tooling, state transitions can be declared in one place, and their deployment to the task queue can be automated.
Note that only some of the transitions need to happen in the background; others can be done in a request-response manner.
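One way to picture this: keep the transitions in a single declarative table and derive the background processors from it. The sketch below assumes a hypothetical `TransitionSpec` with a `Background` flag that marks the transitions a task queue should run; everything else stays request-response. The state and command names are illustrative.

```go
package main

import "fmt"

// TransitionSpec declares one edge of the state machine.
type TransitionSpec struct {
	From, Command, To string
	Background        bool // true: run by a task queue; false: request-response
}

var spec = []TransitionSpec{
	{From: "Pending", Command: "ChargePayment", To: "Paid", Background: false},
	{From: "Paid", Command: "RemoveStock", To: "StockRemoved", Background: true},
	{From: "StockRemoved", Command: "Deliver", To: "Delivered", Background: true},
}

// backgroundQueues derives one queue definition per state that needs a
// background processor, mapping that state to the command to apply.
func backgroundQueues(spec []TransitionSpec) map[string]string {
	queues := map[string]string{}
	for _, t := range spec {
		if t.Background {
			queues[t.From] = t.Command
		}
	}
	return queues
}

func main() {
	for _, t := range spec {
		if t.Background {
			fmt.Printf("queue %q applies %s\n", t.From, t.Command)
		}
	}
	fmt.Println(len(backgroundQueues(spec))) // 2
}
```

A deployment tool could iterate over the derived queues and generate one task queue per entry, so adding a background transition means editing only the table.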
Processor for state machine with workflow
A state machine could be generalized to a workflow. We can think of it as a set of generic Commands and States (like a Turing machine):
- States like Pending, Completed, and Failed.
- Commands like Process, Retry, and Cancel.
- A workflow DSL with commands like Invoke, Choose, and Assign.
Here, a function is identified by an ID string, and functions are either pulled from a registry or called remotely (InvokeRemote).
Some operations would require a callback (InvokeAndAwait).
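To make the workflow idea more concrete, here is a small interpreter for a hypothetical DSL with `Assign`, `Invoke`, and `Choose` steps, where `Invoke` looks a function up by its string ID in a local registry. The step types and the registry are illustrative assumptions; `InvokeRemote`, `InvokeAndAwait`, persistence of intermediate states, and the task queue are left out.

```go
package main

import "fmt"

// Step is one instruction of the workflow DSL.
type Step interface{ isStep() }

// Assign sets a variable to a constant value.
type Assign struct {
	Var   string
	Value any
}

// Invoke calls a registered function by ID with the given argument
// variables and stores the result in Result.
type Invoke struct {
	Func   string
	Args   []string
	Result string
}

// Choose runs Then if the variable Cond holds true, otherwise Else.
type Choose struct {
	Cond       string
	Then, Else []Step
}

func (Assign) isStep() {}
func (Invoke) isStep() {}
func (Choose) isStep() {}

// Registry maps function IDs to implementations.
type Registry map[string]func(args []any) (any, error)

// Run executes steps in order, reading and writing vars.
func Run(steps []Step, reg Registry, vars map[string]any) error {
	for _, s := range steps {
		switch s := s.(type) {
		case Assign:
			vars[s.Var] = s.Value
		case Invoke:
			fn, ok := reg[s.Func]
			if !ok {
				return fmt.Errorf("function %q not registered", s.Func)
			}
			args := make([]any, len(s.Args))
			for i, name := range s.Args {
				args[i] = vars[name]
			}
			out, err := fn(args)
			if err != nil {
				return fmt.Errorf("invoke %q: %w", s.Func, err)
			}
			vars[s.Result] = out
		case Choose:
			branch := s.Else
			if b, _ := vars[s.Cond].(bool); b {
				branch = s.Then
			}
			if err := Run(branch, reg, vars); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	reg := Registry{
		"isLarge": func(a []any) (any, error) { return a[0].(int) > 100, nil },
	}
	vars := map[string]any{}
	wf := []Step{
		Assign{Var: "price", Value: 250},
		Invoke{Func: "isLarge", Args: []string{"price"}, Result: "large"},
		Choose{
			Cond: "large",
			Then: []Step{Assign{Var: "review", Value: "manual"}},
			Else: []Step{Assign{Var: "review", Value: "auto"}},
		},
	}
	fmt.Println(Run(wf, reg, vars), vars["review"]) // <nil> manual
}
```

Because the workflow is plain data, it could be saved as state between steps, which is what would let a background processor resume it later.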
Then a background processor would be responsible for executing such a workflow (using a task queue). The program would be responsible for defining the workflow and registering functions.
Such programs could also be optimized for deployment: some functions are better run on the same machine that makes the RPC call, for example a function that makes an RPC call to a database and caches the result in memory or in a cache cluster dedicated to a specific BFF.
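The in-memory caching case can be illustrated with a small wrapper: a registered function that would normally make an RPC call is memoized in process memory, which only pays off when the step keeps being scheduled on the same machine. The `cached` helper and the fake `lookupPrice` RPC are hypothetical; a production cache would also need expiry and size limits.

```go
package main

import (
	"fmt"
	"sync"
)

// cached memoizes fn by key in process memory. It is safe for concurrent
// use, but two goroutines missing the same key at once may both call fn.
func cached[K comparable, V any](fn func(K) (V, error)) func(K) (V, error) {
	var mu sync.Mutex
	memo := map[K]V{}
	return func(k K) (V, error) {
		mu.Lock()
		v, ok := memo[k]
		mu.Unlock()
		if ok {
			return v, nil
		}
		v, err := fn(k)
		if err != nil {
			return v, err // errors are not cached
		}
		mu.Lock()
		memo[k] = v
		mu.Unlock()
		return v, nil
	}
}

func main() {
	calls := 0
	// lookupPrice stands in for an RPC call to a database.
	lookupPrice := func(id string) (int, error) {
		calls++
		return 100, nil
	}
	get := cached(lookupPrice)
	get("123")
	get("123")
	fmt.Println(calls) // 1
}
```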