Persisting union in database
Persisting state in database
TODO complete description!
At this point of time, we have implemented and tested Order Service state machine.
Next thing that we need to address in our road to the production is to persist state in database.
MkUnion aims to support you in this task, by providing you x/storage/schemaless
package that will take care of:
- mapping golang structs to database representation and back from database to struct.
- leveraging optimistic concurrency control to resolve conflicts
- providing you with simple API to work with database
- and more
Below is test case that demonstrate complete example of initializing database,
building an state using NewMachine
, and saving and loading state from database.
func ExampleStoreStateInDatabase() {
now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
// example state
state := &OrderCompleted{
Order: Order{
ID: "123",
OrderAttr: OrderAttr{Price: 100, Quantity: 3},
DeliveredAt: &now,
},
}
// let's use in memory storage for storing State union
storage := schemaless.NewInMemoryRepository[State]()
// let's save it to storage
_, err := storage.UpdateRecords(schemaless.Save(schemaless.Record[State]{
ID: state.Order.ID,
Type: "orders",
Data: state,
}))
records, err := storage.FindingRecords(schemaless.FindingRecords[schemaless.Record[State]]{
RecordType: "orders",
})
fmt.Println(err)
fmt.Printf("%+#v\n", *records.Items[0].Data.(*OrderCompleted))
//Output: <nil>
//state.OrderCompleted{Order:state.Order{ID:"123", OrderAttr:state.OrderAttr{Price:100, Quantity:3}, WorkerID:"", StockRemovedAt:<nil>, PaymentChargedAt:<nil>, DeliveredAt:time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC), CancelledAt:<nil>, CancelledReason:""}}
}
sequenceDiagram
participant R as Request
participant Store as Store
activate R
R->>R: Validate(request) -> error
R->>Store: Load state from database by request.ObjectId
activate Store
Store->>R: Ok(State)
deactivate Store
R->>R: Create machine with state
R->>R: Apply command on a state
R->>Store: Save state in database under request.ObjectId
activate Store
Store->>R: Ok()
deactivate Store
deactivate R
Example implementation of such sequence diagram:
func Handle(rq Request, response Resopnse) {
ctx := rq.Context()
// extract objectId and command from request + do some validation
id := rq.ObjectId
command := rq.Command
// Load state from store
state, err := store.Find(ctx, id)
if err != nil { /*handle error*/ }
machine := NewSimpleMachineWithState(Transition, state)
newState, err := machine.Apply(cmd, state)
if err != nil { /*handle error*/ }
err := store.Save(ctx, newState)
if err != nil { /*handle error*/ }
// serialize response
response.Write(newState)
}
Error as state. Self-healing systems.
In request-response situation, handing errors is easy, but what if in some long-lived process something goes wrong? How to handle errors in such situation? Without making what we learn about state machines useless or hard to use?
One solution is to treat errors as state. In such case, our state machines will never return error, but instead will return new state, that will represent error.
When we introduce explicit command responsible for correcting RecoverableError, we can create self-healing systems. Thanks to that, even in situation when errors are unknown, we can retroactivly introduce self-healing logic that correct states.
Because there is always there is only one error state, it makes such state machines easy to reason about.
//go:generate mkunion -name State
type (
// ...
RecoverableError struct {
ErrCode int
PrevState State
RetryCount int
}
)
//go:generate mkunion -name Command
type (
// ...
CorrectStateCMD struct {}
)
Now, we have to implement recoverable logic in our state machine.
We show example above how to do it in Transition
function.
Here is example implementation of such transition function:
func Transition(cmd Command, state State) (State, error) {
return MustMatchCommandR2(
cmd,
/* ... */
func(cmd *CorrectStateCMD) (State, error) {
switch state := state.(type) {
case *RecoverableError:
state.RetryCount = state.RetryCount + 1
// here we can do some self-healing logic
if state.ErrCode == DuplicateServiceUnavailable {
newState, err := Transition(&MarkAsDuplicateCMD{}, state.PrevState)
if err != nil {
// we failed to correct error, so we return error state
return &RecoverableError{
ErrCode: err,
PrevState: state.PrevState,
RetryCount: state.RetryCount,
}, nil
}
// we manage to fix state, so we return new state
return newState, nil
} else {
// log information that we have new code, that we don't know how to handle
}
// try to correct error in next iteration
return state, nil
}
}
}
Now, to correct states we have to select from database all states that are in error state.
It can be use in many ways, example below use a abstraction called TaskQueue
that is responsible for running tasks in background.
This abstraction guaranties that all records (historical and new ones) will be processed. You can think about it, as a queue that is populated by records from database, that meet SQL query criteria.
You can use CRON job and pull database.
//go:generate mms deployyml -type=TaskQueue -name=CorrectMSPErrors -autoscale=1,10 -memory=128Mi -cpu=100m -timeout=10s -schedule="0 0 * * *"
func main()
sql := "SELECT * FROM ObjectState WHERE RecoverableError.RetryCount < 3"
store := datalayer.DefaultStore()
queue := TaskQueueFrom("correct-msp-errors", sql, store)
queue.OnTask(func (ctx context.Context, task Task) error {
state := task.State()
cmd := &CorrectStateCMD{}
machine := NewSimpleMachineWithState(Transition, state)
newState, err := machine.Apply(cmd, state)
if err != nil {
return err
}
return task.Save(ctx, newState)
})
err := queue.Run(ctx)
if err != nil {
log.Panic(err)
}
}
State machines and command queues and workflows
What if command would initiate state "to process" and save it in db What if task queue would take such state and process it Woudn't this be something like command queue?
When to make a list of background processes that transition such states?
processors per state
It's like micromanage TaskQueue, where each state has it's own state, and it knows what command to apply to given state This could be good starting point, when there is not a lot of good tooling
processor for state machine
With good tooling, transition of states can be declared in one place, and deployment to task queue could be done automatically.
Note, that only some of the transitions needs to happen in background, other can be done in request-response manner.
processor for state machine with workflow
State machine could be generalized to workflow. We can think about it as set of generic Command and State (like a turing machine).
States like Pending, Completed, Failed Commands like Process, Retry, Cancel
And workflow DSL with commands like: Invoke, Choose, Assign Where function is some ID string, and functions needs to be either pulled from registry, or called remotely (InvokeRemote). some operations would require callback (InvokeAndAwait)
Then background processor would be responsible for executing such workflow (using task queue) Program would be responsible for defining workflow, and registering functions.
Such programs could be also optimised for deployment, if some function would be better to run on same machine that do RPC call like function doing RPC call to database, and caching result in memory or in cache cluster dedicated to specific BFF