Pluggable Architecture in About 300 Lines of Code
This blog post guides readers through implementing a pluggable architecture in Go using around 300 lines of code. It introduces the concept, outlines essential requirements, and walks through the plugin interface and runner.
Key Takeaways
Building a plugin architecture in Go is easier than you might think.
Graceful shutdowns and signaling are essential for a plugin system.
A Standard plugin interface can simplify your application.
Building a pluggable architecture in Go is easier than you might think. In this blog post, we’ll walk through implementing a pluggable architecture in Go. While this implementation isn’t as robust as a full framework, it’s a great starter package for your own application and provides an easy and common way to structure services that run within your application.
First let’s outline the requements for our pluggable architecture.
- It should support graceful shutdown of services/plugins through common go conventions.
- It should support a
context.Context
argument. - It should be decoupled from the main application, and not require any changes to the main application.
- Implementation should be flexible and support using functions or objects as plugins.
With this in mind, I development the graceful
package as apart of my experimental work in building an http toolkit for building webservers in Go. You can find that work here.
In this post we’ll dive into the core pieces of the graceful package and how that can help create a flexible structure to build you applications on.
Walking Through The Architecture
This example omits some configuration and convenience methods for brevity, you can see the entire implementation in this repository github.com/hay-kot/httpkit.
If you want to use this in your own project, I suggest copying the code into your own project and modifying it to fit your needs.
The Plugin Interface
The plugin interface is the contract that all plugins must adhere to. It’s a 2 method interface that provides the plugin name and the start method.
type Plugin interface {
Name() string
Start(ctx context.Context) error
}
The Name()
method returns the name of the plugin, this is used for logging during the lifecycle. We’ll see this in action later.
The Start(ctx context.Context) error
method is the entry point to your plugin. It contains the run logic to be executed when the application starts. This method should block until the context is canceled, which would look something like this:
Blocking Version
Blocking plugins are the simplest to implement. They are started and run within in a blocking call. Within the runner it will be scheduled in a goroutine and run in the background. If you’re using a blocking call, it may also be useful to spin your own goroutine to manage the stop call to your plugin when the context is canceled.
func (p *MyPlugin) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
p.stop()
}
err := p.run()
if err != nil {
return err
}
}
Non-Blocking Version
No-blocking plugings are also simple to implement. Start your implementation and then block on the context until it’s canceled. This is useful if you need to do some cleanup or other work after the plugin has started.
func (p *MyPlugin) Start(ctx context.Context) error {
err, stop := p.start(ctx) // Non-blocking Call
if err != nil {
return err
}
defer stop()
<-ctx.Done()
return nil
}
The Runner
Now that we’ve established the plugin interface, we need a way to manage the plugins. This is where the runner comes in. The runner is responsible for starting and stopping the plugins.
// Runner is the orchestrator of the plugins provided. It will start and cancel
// the plugins based on the context and os.Signals provided.
type Runner struct {
started bool
plugins []Plugin
shutdown chan struct{}
opts *runnerOpts
}
func NewRunner(opts ...RunnerOptFunc) *Runner {
o := &runnerOpts{
signals: []os.Signal{os.Interrupt, syscall.SIGTERM},
timeout: 5 * time.Second,
println: func(v ...any) {}, // NOOP
}
for _, opt := range opts {
opt(o)
}
return &Runner{
opts: o,
shutdown: make(chan struct{}),
}
}
The Runner is initialized with sensible defaults, but can be configured using the options pattern. You can see those options here. We will also create it with a channel for shutdown signals, and a boolean toggle for managing start state.
Next, we need to add the plugins to the runner. This is done with the AddPlugin
and AddFunc
methods.
func (svr *Runner) AddPlugin(p ...Plugin) {
svr.plugins = append(svr.plugins, p...)
}
func (svr *Runner) AddFunc(name string, fn func(ctx context.Context) error) {
svr.plugins = append(svr.plugins, PluginFunc(name, fn))
}
Starting the Runner
And Finally, the core of the runner is the Start
method. This method will start all the plugins and block until the context is canceled or a shutdown signal is received. This method is the most complicated, I’ve annotated the codeblock with comments the explain each section of the method.
We’re going to go through this method step, by step. Find the full source code Here
First, we check to see if the server has already been started. If it has, we return an error.
func (svr *Runner) Start(ctx context.Context) error {
// Check if the server has already been started
// If it has, return an error
if svr.started {
return ErrRunnerAlreadyStarted
}
Next we setup our context, and ensure that cancel
is called by using the defer
keyword. Note that we provide the signals to the context, which will automatically cancel the context when the server is shutdown.
// Create a new context that will be canceled when the server is shutdown
// This is used to manage the lifecycle of the plugins. Note that because we
// accept a context, we can also be canceled by the caller.
ctx, cancel := signal.NotifyContext(ctx, svr.opts.signals...)
defer cancel()
Once we’ve setup our context, we can start our plugins. We’ll look at this in two parts. First we can to initialize some variables. A waitgroup to track the state of the plugins, a channel to track any errors during startup, and a wgChannel that we’ll use later for our shutdown timeout. We also create a go routine that will wait on the waitgroup and close the channel when complete.
// Next we are going to create a few items to help manage the lifecycle state of the
// plugins.
var (
// Create a waitgroup to wait for all the plugins to stop
wg = sync.WaitGroup{}
// A plugin error channel to terminate on startup errors
pluginErrCh = make(chan error)
// a waitgroup channel to block until all the plugins have stopped.
// We'll use this to manage the shutdown with a timeout.
wgChannel = make(chan struct{})
)
wg.Add(len(svr.plugins))
go func() {
wg.Wait()
close(wgChannel)
}()
Next, we iterate over the plugins and start them in their own go routine. We also check for any errors during startup and write them to the pluginErrCh
channel. We also use the wg.Done()
method to signal that the plugin has stopped.
var plugErr error
for _, p := range svr.plugins {
if plugErr != nil {
break
}
// Launch the plugin in a goroutine
go func(p Plugin) {
defer func() {
wg.Done()
}()
err := p.Start(ctx)
if err != nil {
plugErr = err
// safely write to the channel
// if the channel is full, we don't want to block
select {
case pluginErrCh <- err:
default:
}
}
}(p)
}
The last thing we’ll do is block on the context we created early to complete or the an error from the pluginErrCh
using a select
statement. In the event that the context is canceled, we’ll create a timer using the timeout duration that was provided and then use the wgChannel
we created earlier for plugin status and then wait for either the wgChannel to be closed, or for the newTimer
to expire. This is the most important part of the plugin architecture.
svr.started = true
defer func() {
svr.started = false
}()
// Block until either the context is canceled, or a plugin returns an error
select {
case <-ctx.Done():
// Once the shutdown process has started, we create a timer with the
// specified timeout.
newTimer := time.NewTimer(svr.opts.timeout)
defer newTimer.Stop()
svr.opts.println("server received signal, shutting down")
// Then we wait for all plugins to stop OR the timeout to expire.
select {
case <-wgChannel:
svr.opts.println("all plugins have stopped, shutting down")
return nil
case <-newTimer.C:
svr.opts.println("timeout waiting for plugins to stop, shutting down")
return context.DeadlineExceeded
}
case err := <-pluginErrCh:
svr.opts.println("plugin error:", err)
return err
}
}
Usage
Now that we’ve seen how the architecture is implemented, let’s see how we can use it. We’ll start by creating a simple plugin that logs a message every second.
func main() {
r := graceful.NewRunner()
r.AddFunc("Server 1", func(ctx context.Context) error {
server := &http.Server{
Addr: ":8080",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello From Server 1"))
}),
}
go func() {
<-ctx.Done()
server.Shutdown(ctx)
}()
return server.ListenAndServe()
})
r.AddFunc("Server 2", func(ctx context.Context) error {
server := &http.Server{
Addr: ":8081",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello From Server 2"))
}),
}
go func() {
<-ctx.Done()
server.Shutdown(ctx)
}()
return server.ListenAndServe()
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := r.Start(ctx); err != nil {
log.Fatal(err)
}
}
Do I Really Need a Package For This?
Maybe?
Probably Not.
The implementation for the runner and plugin system is quite simple, instead of using a package for this, I would highly suggest that you take the source code and incorporate it directly into your codebase.
I will be re-using this pattern in all of my future projects that require management of multiple services within a single application. I hope you find it useful too!