Functional State machines

Recently I have been doing a lot of time-series analysis (cryptocurrency trade histories). After the first week of trying 20 things before breakfast, I settled upon using discrete, deterministic finite state machines to call features, such as peaks, troughs, slides, spikes and so on.

The ddfsm API that I’ve evolved towards has surprised me a bit so I thought it would be worth sharing. Before you ask, at this time the code is all tied up in a commercial code-base and I’ve not had time to extract it into a library.

State machines consume a stream of input tokens and emit a stream of emission tokens. The intuition is that a state machine is related to the type:

\(M : T^* \rightarrow E^*\)

We will leave aside the religious war between Moore and Mealy by treating transitions as dumb animals and treating states as things which, when challenged, cause a transition to be fired and an emission to be emitted, making them related to the type:

\(S : T \rightarrow E \times (S \rightarrow S)\)

So where has this typy intuition got us? Well, let’s write some concrete code (in Scala, of course!) and see how it plays out.

trait StateMachine[T, E] {
  def initialState: State[T, E]
}

trait State[T, E] {
  def transition(t: T): StateUpdate[T, E]
}

case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

def parse[T, E](tokens: Stream[T], m: StateMachine[T, E]): Stream[E] = ???

The parse function is implemented the obvious way, by taking the initial state and then using the states to build up a stream of emissions as the stream of tokens is consumed.
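
As a minimal sketch of what "the obvious way" could look like (this is not the code from the commercial code-base): thread the current state through the token stream and collect each emission as it is produced. The sketch assumes Scala 2.13 or later and therefore uses LazyList in place of the deprecated Stream; the `runningTotal` state and the `ParseSketch` wrapper object are hypothetical names introduced only for the example.

```scala
object ParseSketch {
  trait StateMachine[T, E] { def initialState: State[T, E] }
  trait State[T, E] { def transition(t: T): StateUpdate[T, E] }
  case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

  // Lazily unfold the token stream: each step feeds one token to the current
  // state, emits its emission and carries the next state forward.
  def parse[T, E](tokens: LazyList[T], m: StateMachine[T, E]): LazyList[E] =
    LazyList.unfold((m.initialState, tokens)) { case (state, rest) =>
      rest.headOption.map { t =>
        val update = state.transition(t)
        (update.emission, (update.nextState, rest.tail))
      }
    }

  // Hypothetical example state: emits the running total of the tokens seen so far.
  def runningTotal(total: Int): State[Int, Int] = new State[Int, Int] {
    def transition(t: Int): StateUpdate[Int, Int] =
      StateUpdate(runningTotal(total + t), total + t, Nil)
  }

  val machine: StateMachine[Int, Int] = new StateMachine[Int, Int] {
    def initialState: State[Int, Int] = runningTotal(0)
  }

  val emissions: List[Int] = parse(LazyList(1, 2, 3), machine).toList
}
```

Note that this sketch drops the log carried by each StateUpdate; a fuller version might collect it alongside the emissions.
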
Let’s implement a concrete state. This one is going to spot when the current value is lower than the previous one.

trait DropSpotter[E] extends State[Double, E] {
  ds =>
  // The last value seen; a drop is any value strictly below it.
  def previous: Double
  // Supplied by whoever wires this state into a machine.
  def onDrop: Double => StateUpdate[Double, E]
  def noDrop: State[Double, E] => StateUpdate[Double, E]

  def transition(d: Double): StateUpdate[Double, E] =
    if (d < previous) {
      onDrop(d)
    } else {
      // Same actions, but tracking the new previous value.
      val nextState = new DropSpotter[E] {
        val previous = d
        val onDrop = ds.onDrop
        val noDrop = ds.noDrop
      }
      noDrop(nextState)
    }
}

There are several important features of this trait. Firstly, the transition method is encapsulating the decision logic, the bit of code where we spot the drop. It essentially doesn’t do anything else. Next, notice that we have a structured way to encode the transitions. The onDrop method encapsulates the behaviour of the transition when there has been a drop, and the noDrop method encapsulates the behaviour of the transition where there has not been a drop. The logic of this state is that it will transition back to itself while there has been no drop and will transition to something else when there has been.

Whatever is gluing this transition into a state machine will fill in these values to supply the actual transition behaviour. The onDrop function maps a Double to a state update, which allows this state to initialize the next state with the ‘zero’ token for that state. The noDrop function maps a state to the update: we let the state machine choose the emission, but we control the next state. In this case the next state will track the new previous value but have the same actions.
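
To make the gluing concrete, here is one way the two actions might be filled in. This is a sketch under assumptions, not the wiring from the real code-base: it picks Boolean emissions ("did we just see a drop?"), restarts a fresh spotter at the dropped value when a drop occurs, and uses a hypothetical `run` helper to drive the state over a list.

```scala
object DropSpotterWiring {
  trait State[T, E] { def transition(t: T): StateUpdate[T, E] }
  case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

  trait DropSpotter[E] extends State[Double, E] { ds =>
    def previous: Double
    def onDrop: Double => StateUpdate[Double, E]
    def noDrop: State[Double, E] => StateUpdate[Double, E]

    def transition(d: Double): StateUpdate[Double, E] =
      if (d < previous) onDrop(d)
      else {
        val nextState = new DropSpotter[E] {
          val previous = d
          val onDrop = ds.onDrop
          val noDrop = ds.noDrop
        }
        noDrop(nextState)
      }
  }

  // Assumed wiring: emit true on a drop and start again from the dropped
  // value; emit false otherwise and keep the state the spotter hands us.
  def spotter(start: Double): DropSpotter[Boolean] = new DropSpotter[Boolean] {
    val previous: Double = start
    val onDrop: Double => StateUpdate[Double, Boolean] =
      d => StateUpdate(spotter(d), true, List(s"drop to $d"))
    val noDrop: State[Double, Boolean] => StateUpdate[Double, Boolean] =
      next => StateUpdate(next, false, Nil)
  }

  // Hypothetical helper: feed every value through the state, collecting emissions.
  def run[T, E](tokens: List[T], start: State[T, E]): List[E] =
    tokens.foldLeft((start, List.empty[E])) { case ((state, acc), t) =>
      val update = state.transition(t)
      (update.nextState, acc :+ update.emission)
    }._2

  val drops: List[Boolean] = run(List(2.0, 3.0, 2.5, 2.5, 1.0), spotter(1.0))
}
```

Here `drops` marks the third and fifth values, the two places where the value fell below its predecessor.
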

This is all well and good, but it is unusual to actually have a stream of doubles. In my application, we actually have a stream of trades. Now, we could map the stream of trades into a stream of doubles, but then it is a PITA to work backwards to which emissions relate to which trades. It would be much better if we could convert a State[Double, E] into a State[Trade, E].

This is where our earlier intuition about how states are related in form to functions from tokens to emissions comes in handy. The analogous operation on a function is comap. So, let’s define this for states and see where it gets us.

def comap[T1, E, T2](s: State[T1, E], f: T2 => T1): State[T2, E] = new State[T2, E] {
  // Convert the incoming token with f, then re-wrap the next state so it accepts T2 as well.
  def transition(t2: T2): StateUpdate[T2, E] = s.transition(f(t2)) match {
    case StateUpdate(nextState, emission, log) =>
      StateUpdate(comap(nextState, f), emission, log)
  }
}
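
As an illustration of why this helps with the trade stream, the sketch below lifts a State[Double, Boolean] to work on trades directly, so each emission lines up with the trade that caused it. The `Trade` case class and its fields, the `lowerThanPrevious` state and the `run` helper are all hypothetical stand-ins invented for this example.

```scala
object ComapSketch {
  trait State[T, E] { def transition(t: T): StateUpdate[T, E] }
  case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

  def comap[T1, E, T2](s: State[T1, E], f: T2 => T1): State[T2, E] = new State[T2, E] {
    def transition(t2: T2): StateUpdate[T2, E] = s.transition(f(t2)) match {
      case StateUpdate(nextState, emission, log) =>
        StateUpdate(comap(nextState, f), emission, log)
    }
  }

  // Hypothetical state on plain doubles: emits true when the value is lower
  // than the one before it.
  def lowerThanPrevious(previous: Double): State[Double, Boolean] =
    new State[Double, Boolean] {
      def transition(d: Double): StateUpdate[Double, Boolean] =
        StateUpdate(lowerThanPrevious(d), d < previous, Nil)
    }

  // Hypothetical trade type; a real one would carry far more fields.
  case class Trade(id: String, price: Double)

  // The same logic, now consuming trades instead of doubles.
  val onTrades: State[Trade, Boolean] =
    comap(lowerThanPrevious(100.0), (t: Trade) => t.price)

  // Hypothetical helper: feed every token through the state, collecting emissions.
  def run[T, E](tokens: List[T], start: State[T, E]): List[E] =
    tokens.foldLeft((start, List.empty[E])) { case ((state, acc), t) =>
      val update = state.transition(t)
      (update.nextState, acc :+ update.emission)
    }._2

  val trades = List(Trade("a", 101.0), Trade("b", 99.5), Trade("c", 99.5), Trade("d", 98.0))

  // Emissions align one-to-one with trades, so pairing them up is a plain zip.
  val dropped: List[String] =
    trades.zip(run(trades, onTrades)).collect { case (trade, true) => trade.id }
}
```
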

Well! That was remarkably pain-free, and didn’t involve violating State in any way. I think we’re on to something here. Now, if comap was so easy to implement, then how about its yin-yang twin, map? If we go back to our intuition again, map on the function analogy for a state would transform the emission value and type.

def map[T, E1, E2](s: State[T, E1], f: E1 => E2): State[T, E2] = new State[T, E2] {
  // Run the underlying state, then transform its emission and re-wrap the next state.
  def transition(t: T): StateUpdate[T, E2] = s.transition(t) match {
    case StateUpdate(nextState, emission, log) =>
      StateUpdate(map(nextState, f), f(emission), log)
  }
}
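
A small usage sketch, with made-up names: take a state that emits Booleans and relabel its emissions as strings without touching its decision logic. The `lowerThanPrevious` state and the `run` helper are hypothetical and exist only for this example.

```scala
object MapSketch {
  trait State[T, E] { def transition(t: T): StateUpdate[T, E] }
  case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

  def map[T, E1, E2](s: State[T, E1], f: E1 => E2): State[T, E2] = new State[T, E2] {
    def transition(t: T): StateUpdate[T, E2] = s.transition(t) match {
      case StateUpdate(nextState, emission, log) =>
        StateUpdate(map(nextState, f), f(emission), log)
    }
  }

  // Hypothetical state: emits true when the value is lower than the one before it.
  def lowerThanPrevious(previous: Double): State[Double, Boolean] =
    new State[Double, Boolean] {
      def transition(d: Double): StateUpdate[Double, Boolean] =
        StateUpdate(lowerThanPrevious(d), d < previous, Nil)
    }

  // Same decisions, different emission type.
  val labelled: State[Double, String] =
    map(lowerThanPrevious(10.0), (dropped: Boolean) => if (dropped) "drop" else "hold")

  // Hypothetical helper: feed every token through the state, collecting emissions.
  def run[T, E](tokens: List[T], start: State[T, E]): List[E] =
    tokens.foldLeft((start, List.empty[E])) { case ((state, acc), t) =>
      val update = state.transition(t)
      (update.nextState, acc :+ update.emission)
    }._2

  val labels: List[String] = run(List(9.0, 9.5, 8.0), labelled)
}
```
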

Again, entirely painless and with no encroachment on personal boundaries.

As well as these functor combinators for state, I have the same for StateMachine. I also have combinators that run states in parallel in various ways, state machines which run on the emissions of other state machines, and lots of other fun things. I have a truly composable deterministic discrete finite state machine library, by designing everything to be functional and read-only.
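
Since the library itself is not published, the following is only a guess at the shape two such combinators might take, not the actual API: a hypothetical `zip` that runs two states in parallel on the same tokens, and a hypothetical `andThen` that feeds one state's emissions into another. The example states and the `run` helper are likewise invented for illustration.

```scala
object CombinatorSketch {
  trait State[T, E] { def transition(t: T): StateUpdate[T, E] }
  case class StateUpdate[T, E](nextState: State[T, E], emission: E, log: List[String])

  // Parallel composition: both states see every token; emissions are paired.
  def zip[T, E1, E2](a: State[T, E1], b: State[T, E2]): State[T, (E1, E2)] =
    new State[T, (E1, E2)] {
      def transition(t: T): StateUpdate[T, (E1, E2)] = {
        val ua = a.transition(t)
        val ub = b.transition(t)
        StateUpdate(zip(ua.nextState, ub.nextState), (ua.emission, ub.emission), ua.log ++ ub.log)
      }
    }

  // Sequential composition: the second state consumes the first state's emissions.
  def andThen[T, E1, E2](first: State[T, E1], second: State[E1, E2]): State[T, E2] =
    new State[T, E2] {
      def transition(t: T): StateUpdate[T, E2] = {
        val u1 = first.transition(t)
        val u2 = second.transition(u1.emission)
        StateUpdate(andThen(u1.nextState, u2.nextState), u2.emission, u1.log ++ u2.log)
      }
    }

  // Hypothetical example states.
  def lowerThanPrevious(previous: Double): State[Double, Boolean] =
    new State[Double, Boolean] {
      def transition(d: Double): StateUpdate[Double, Boolean] =
        StateUpdate(lowerThanPrevious(d), d < previous, Nil)
    }

  def runningMax(max: Double): State[Double, Double] = new State[Double, Double] {
    def transition(d: Double): StateUpdate[Double, Double] = {
      val m = math.max(max, d)
      StateUpdate(runningMax(m), m, Nil)
    }
  }

  def countTrue(n: Int): State[Boolean, Int] = new State[Boolean, Int] {
    def transition(b: Boolean): StateUpdate[Boolean, Int] = {
      val next = if (b) n + 1 else n
      StateUpdate(countTrue(next), next, Nil)
    }
  }

  // Hypothetical helper: feed every token through the state, collecting emissions.
  def run[T, E](tokens: List[T], start: State[T, E]): List[E] =
    tokens.foldLeft((start, List.empty[E])) { case ((state, acc), t) =>
      val update = state.transition(t)
      (update.nextState, acc :+ update.emission)
    }._2

  val values = List(9.0, 9.5, 8.0)
  val paired = run(values, zip(lowerThanPrevious(10.0), runningMax(0.0)))
  val dropCount = run(values, andThen(lowerThanPrevious(10.0), countTrue(0)))
}
```

Because every combinator returns a plain State, the results compose further with comap and map without any special cases.
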

This is such a clean and sane abstraction that I find it impossible to believe that this wasn’t published before I was born and then forgotten, rediscovered and reinvented multiple times since. So, please, if you happen to know the original literature for this, or if you were the original author of this, write a comment and put me right.

2 thoughts on “Functional State machines”

    1. Hi Thomas,

      Yes. I think this is what we’ve got here. FSTs are nice because they are inherently composable in ways that FSMs aren’t.
