The previous article peeked into Flow-Based Programming (FBP), a paradigm that puts the flow of the data above the code that makes the data flow. An FBP application can be described as a network of loosely coupled processing nodes, connected only through data pipelines. The article’s code made use of a quite sophisticated FBP library that made the magic of a convenient syntax happen through reflection (hidden within the library, but still).

The article triggered a couple of comments on Reddit suggesting pure Go approaches, without any third-party libraries.

This made me curious.

How well would the code from the previous article translate into “just stdlib” code?

I gave it a try, and here is the result.

Constructing an FBP net in pure stdlib Go

The approach is as simple as it can get:

Data flow: Every node reads from one or more input channels, and writes to one or more output channels.

Network construction: Weaving the net happens in main() through creating channels and connecting them to the input and output ports of the processing nodes.

Starting the net: The net starts by calling a Process() method on each node and feeding data into the network’s input channel.

Stopping the net: The net stops when the net’s input channel is closed. Every node whose input channels get closed then closes its own output channels and shuts down, and this way the shutdown propagates through the network until the last node (the “sink” node with no output channel) stops. (See the sketch below.)
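The core of that shutdown propagation fits in a few lines. The fragment below is only a sketch of what every node’s processing goroutine boils down to; in, out, and process are placeholder names, not identifiers from the actual code:

go func() {
	for item := range in { // runs until the upstream node closes in
		out <- process(item) // process stands in for the node's real work
	}
	close(out) // tell the downstream node to shut down as well
}()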

Changes to the code

The code below is based on a 1:1 copy of the code from the previous article. Then the following changes were made.

Input channels

The goflow framework takes care of each node’s input channels, and the nodes need special “On...()” functions that receive a single channel item at a time.

I changed the nodes to have their own input channels, and I replaced the On...() methods with Process() methods that take no arguments and start a goroutine to read from the input channel(s) and write to the output channel(s). This is substantially more code compared to the On...() methods, which were mostly one-liners; however, in real life, where each node would contain much more code, the overhead for input and output handling would be negligible.
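As an illustration, here is how the wordCounter node might look after this change. The Sentence and Count field names appear in the network construction further below; the int element type of Count and the body of the counting loop are assumptions for illustration, not the article’s exact code (strings must be imported for strings.Fields):

// wordCounter counts the words in each sentence it receives.
// Sentence is its input port, Count its output port.
type wordCounter struct {
	Sentence chan string
	Count    chan int
}

// Process replaces goflow's per-item On...() callback: it launches a
// goroutine that drains the input channel and closes the output
// channel once the input channel has been closed.
func (wc *wordCounter) Process() {
	go func() {
		for s := range wc.Sentence {
			wc.Count <- len(strings.Fields(s))
		}
		close(wc.Count)
	}()
}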

No more fan-in

The original code used one channel between the two counter nodes and the printer node. Go channels trivially support a fan-in scenario with multiple writers and one reader on the same channel.

I had to change this so that the printer node now has two input channels, and the two counter nodes do not share the same output channel anymore but send their results into separate channels.

Why? The reason is the network’s shutdown mechanism. As described above, each node shuts down when its input channels are closed. Piece of cake, you might think, but things get difficult when a channel has multiple writers, as in the counter/printer part of our network.

As you know, closing a channel closes it completely, and other writers panic when trying to write to this channel. (Personally, I would prefer a fan-in semantic where all writers except the last one would only close their own end of the channel they share rather than the whole channel at once, but this is not how channels work in Go.)

So we need to split every multi-writer channel into separate channels. Then we can write a merge function that merges all of these channels into one and also takes care of closing the merged output channel when the last of the input channels closes.

Or, rather than writing one ourselves, we can take a ready-made merge() function from the Go blog. With some very minor changes, the merge function is now a method of the printer node. Problem solved!
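For reference, the merge function from the Go blog’s pipelines article looks essentially like this. It is shown here as a free function over int channels and relies on a sync.WaitGroup; the article’s version differs slightly in that it becomes a method of the printer node, and the channel element type may differ as well:

// merge fans in any number of input channels into a single output
// channel and closes the output once all inputs have been closed.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start one forwarding goroutine per input channel.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out after all forwarding goroutines have finished,
	// that is, after every input channel has been closed.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}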

Signaling shutdown completion to the outside

Without the goflow framework, we also need to add a mechanism to tell the outside that the network has shut down. This is the duty of the final node in the network. Similar to how goflow does it, our printer node closes a channel of empty structs when concluding work.

An empty, unbuffered channel blocks its readers. When it is closed, however, it starts delivering the zero value of its element type. Any read operation on this channel then unblocks, and this is how we can make main() wait for the network to shut down.

(Side note: This behavior may seem counterintuitive and difficult to deal with, but remember that the “comma ok” idiom for the receive operator tells you whether the channel has been closed.)
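In a nutshell (done is just an illustrative name here):

done := make(chan struct{})
close(done)

// The receive does not block anymore: v is the zero value struct{}{},
// and ok is false because the channel has been closed.
v, ok := <-done
fmt.Println(v, ok) // {} false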

The code

Ok, enough talking about the whats and whys; now let’s dive into the code!
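The complete, runnable source lives in the repository (see “How to get and run the code” below). As a condensed sketch, main() wires the nodes together roughly like this; note that the letterCounter and printer field names, the int element type of the count channels, and the sample sentences are assumptions for illustration, not the article’s exact code:

func main() {
	fmt.Println("Start the nodes.")

	// The pipelines between the nodes.
	in := make(chan string)
	sToWc, sToLc := make(chan string), make(chan string)
	wcToP, lcToP := make(chan int), make(chan int)

	// Wire each node to its channels and start it.
	(&splitter{In: in, Out1: sToWc, Out2: sToLc}).Process()
	(&wordCounter{Sentence: sToWc, Count: wcToP}).Process()
	(&letterCounter{Sentence: sToLc, Count: lcToP}).Process()
	p := &printer{Words: wcToP, Letters: lcToP, Done: make(chan struct{})}
	p.Process()

	fmt.Println("Send the data into the network.")
	for _, s := range []string{
		"FBP puts the flow of the data first.",
		"The code follows the data.",
	} {
		in <- s
	}
	close(in) // closing the input starts the shutdown cascade

	<-p.Done // wait until the sink node (the printer) has finished
	fmt.Println("Network has shut down.")
}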

And that’s it! For more complex networks, you can always define an interface like this…

type processor interface {
	Process()
}

…then define a network…

type counterNet map[string]processor

…and in main(), create the network from the nodes…

net := counterNet{
	"splitter": &splitter{
		In:   in,
		Out1: sToWc,
		Out2: sToLc,
	},
	"wordCounter": &wordCounter{
		Sentence: sToWc,
		Count:    wcToP,
	},
	// ...
}

…and then start all nodes within a loop (thanks to the interface defined above):

for node := range net {
	net[node].Process()
}

When you go get the code (see below), an extra file with a runnable interface version of the code is included (interfaceVersion/flow2goWithInterface.go).

How to get and run the code

Step 1: go get the code. Two notes here:

  • Use the -d flag to prevent auto-installing the binary into $GOPATH/bin.
  • Be sure to include the trailing /... to also fetch the alternate versions in the subdirectories goflowVersion (from the previous article) and interfaceVersion (see above).
go get -d github.com/appliedgo/flow2go/...

Step 2: cd to the source code directory.

cd $GOPATH/src/github.com/appliedgo/flow2go

Step 3: Run the code.

go run flow2go.go

You should see an output similar to this:

Start the nodes.
Splitter starts.
WordCounter starts.
LetterCounter starts.
Printer starts.
Send the data into the network.
Splitter has finished.
WordCounter has finished.
Words: 13
Words: 17
Words: 8
Letters: 45
LetterCounter has finished.
Letters: 70
Letters: 36
Printer has finished.
Network has shut down.

Conclusion

With only a few basic Go mechanisms (goroutines, channels, and a WaitGroup in the merge method), it is possible to re-implement the FBP network from the previous article without any third-party library. The code size increased a bit, but in a manageable way that should scale well with the number of nodes.

Happy coding!


