February 28, 2024

Pluggable Architecture in About 300 Lines of Code

This blog post guides readers through implementing a pluggable architecture in Go using around 300 lines of code. It introduces the concept, outlines essential requirements, and walks through the plugin interface and runner.

Key Takeaways

  • Building a plugin architecture in Go is easier than you might think.

  • Graceful shutdowns and signaling are essential for a plugin system.

  • A standard plugin interface can simplify your application.

Building a pluggable architecture in Go is easier than you might think. In this blog post, we’ll walk through implementing a pluggable architecture in Go. While this implementation isn’t as robust as a full framework, it’s a great starting point for your own application and gives you a simple, common way to structure the services that run inside it.

First, let’s outline the requirements for our pluggable architecture.

  • It should support graceful shutdown of services/plugins through common Go conventions.
  • It should support a context.Context argument.
  • It should be decoupled from the main application, so adding a plugin doesn’t require any changes to the main application.
  • Implementation should be flexible and support using functions or objects as plugins.

With this in mind, I developed the graceful package as part of my experimental work on an HTTP toolkit for building web servers in Go. You can find that work at github.com/hay-kot/httpkit.

In this post, we’ll dive into the core pieces of the graceful package and how they can give your applications a flexible structure to build on.

Walking Through The Architecture

This example omits some configuration and convenience methods for brevity; you can see the entire implementation in the github.com/hay-kot/httpkit repository.

If you want to use this in your own project, I suggest copying the code into your own project and modifying it to fit your needs.

The Plugin Interface

The plugin interface is the contract that all plugins must adhere to. It’s a two-method interface: Name returns the plugin’s name, and Start runs it.

type Plugin interface {
	Name() string
	Start(ctx context.Context) error
}

The Name() method returns the name of the plugin; it’s used for logging during the plugin’s lifecycle. We’ll see this in action later.

The Start(ctx context.Context) error method is the entry point to your plugin. It contains the logic to execute when the application starts, and it should block until the context is canceled. There are two common ways to write it:

Blocking Version

Blocking plugins are the simplest to implement: all of their work happens inside a single blocking call. The runner schedules each plugin in its own goroutine, so the blocking call runs in the background. Because that call won’t watch the context for you, it’s useful to spin up your own goroutine that stops your plugin when the context is canceled.

func (p *MyPlugin) Start(ctx context.Context) error {
    // Stop the plugin once the runner cancels the context.
    go func() {
        <-ctx.Done()
        p.stop()
    }()

    // run blocks until the plugin stops (or fails).
    return p.run()
}

Non-Blocking Version

Non-blocking plugins are also simple to implement. Start your implementation, then block on the context until it’s canceled. This style fits APIs whose start call returns immediately, and the deferred stop gives you a natural place to clean up once the context is canceled.

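func (p *MyPlugin) Start(ctx context.Context) error {
    // Non-blocking call that returns a stop function; by Go convention the error comes last.
    stop, err := p.start(ctx)
    if err != nil {
        return err
    }

    // Clean up once the context is canceled.
    defer stop()
    <-ctx.Done()

    return nil
}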

The Runner

Now that we’ve established the plugin interface, we need a way to manage the plugins. This is where the runner comes in. The runner is responsible for starting and stopping the plugins.

// Runner is the orchestrator of the plugins provided. It will start and cancel
// the plugins based on the context and os.Signals provided.
type Runner struct {
	started  bool
	plugins  []Plugin
	shutdown chan struct{}
	opts     *runnerOpts
}

func NewRunner(opts ...RunnerOptFunc) *Runner {
	o := &runnerOpts{
		signals: []os.Signal{os.Interrupt, syscall.SIGTERM},
		timeout: 5 * time.Second,
		println: func(v ...any) {}, // NOOP
	}
	for _, opt := range opts {
		opt(o)
	}

	return &Runner{
		opts:     o,
		shutdown: make(chan struct{}),
	}
}

The Runner is initialized with sensible defaults (os.Interrupt and SIGTERM as shutdown signals, a five-second shutdown timeout, and a no-op logger) but can be configured using the options pattern; the full set of options is in the httpkit repository. The runner also holds a shutdown channel and a boolean flag that tracks whether it has been started.

Next, we need to add the plugins to the runner. This is done with the AddPlugin and AddFunc methods.

func (svr *Runner) AddPlugin(p ...Plugin) {
	svr.plugins = append(svr.plugins, p...)
}

func (svr *Runner) AddFunc(name string, fn func(ctx context.Context) error) {
	svr.plugins = append(svr.plugins, PluginFunc(name, fn))
}

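AddFunc uses PluginFunc to wrap a plain function so it satisfies the Plugin interface; its implementation is in the httpkit repository.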

Starting the Runner

Finally, the core of the runner is the Start method. It starts all the plugins and blocks until the context is canceled, a shutdown signal is received, or a plugin returns an error. This is the most complicated method, so I’ve annotated the code with comments that explain each section.

We’ll go through this method step by step. The full source code is in the httpkit repository.

First, we check to see if the server has already been started. If it has, we return an error.

func (svr *Runner) Start(ctx context.Context) error {
    // Check if the server has already been started
    // If it has, return an error
	if svr.started {
		return ErrRunnerAlreadyStarted
	}

Next, we set up our context and make sure cancel is called by deferring it. Note that we pass the signals to signal.NotifyContext, which automatically cancels the context when one of those signals is received.

    // Create a new context that will be canceled when the server is shutdown
    // This is used to manage the lifecycle of the plugins. Note that because we
    // accept a context, we can also be canceled by the caller.
	ctx, cancel := signal.NotifyContext(ctx, svr.opts.signals...)
	defer cancel()

Once we’ve set up our context, we can start our plugins. We’ll look at this in two parts. First, we initialize a few variables: a WaitGroup to track the state of the plugins, a channel to collect errors returned by plugins, and a wgChannel that we’ll use later for our shutdown timeout. We also start a goroutine that waits on the WaitGroup and closes wgChannel once every plugin has stopped.

    // Next we are going to create a few items to help manage the lifecycle state of the
    // plugins.
	var (
        // Create a waitgroup to wait for all the plugins to stop
		wg          = sync.WaitGroup{}
        // A plugin error channel to terminate on startup errors. It is buffered
        // so the first error is kept even before the runner starts receiving.
		pluginErrCh = make(chan error, 1)
        // a waitgroup channel to block until all the plugins have stopped.
        // We'll use this to manage the shutdown with a timeout.
		wgChannel   = make(chan struct{})
	)

	wg.Add(len(svr.plugins))

	go func() {
		wg.Wait()
		close(wgChannel)
	}()

Next, we iterate over the plugins and start each one in its own goroutine. If a plugin’s Start method returns an error, we write it to the pluginErrCh channel without blocking. Each goroutine calls wg.Done() when its plugin stops.

	for _, p := range svr.plugins {
        // Launch the plugin in a goroutine
		go func(p Plugin) {
			// Signal that this plugin has stopped, however it exits.
			defer wg.Done()

			err := p.Start(ctx)
			if err != nil {
				// safely write to the channel
				// if the channel is full, we don't want to block
				select {
				case pluginErrCh <- err:
				default:
				}
			}
		}(p)
	}

The last thing we do is mark the runner as started and use a select statement to block until either the context we created earlier is canceled or a plugin reports an error on pluginErrCh. If the context is canceled, we create a timer with the configured timeout and wait for either wgChannel to be closed (every plugin has stopped) or the timer to expire. This is the most important part of the plugin architecture.

	svr.started = true
	defer func() {
		svr.started = false
	}()

	// Block until either the context is canceled, or a plugin returns an error
	select {
	case <-ctx.Done():

        // Once the shutdown process has started, we create a timer with the
        // specified timeout.
		newTimer := time.NewTimer(svr.opts.timeout)
		defer newTimer.Stop()

		svr.opts.println("server received signal, shutting down")

        // Then we wait for all plugins to stop OR the timeout to expire.
		select {
		case <-wgChannel:
			svr.opts.println("all plugins have stopped, shutting down")
			return nil
		case <-newTimer.C:
			svr.opts.println("timeout waiting for plugins to stop, shutting down")
			return context.DeadlineExceeded
		}
	case err := <-pluginErrCh:
		svr.opts.println("plugin error:", err)
		return err
	}
}

Usage

Now that we’ve seen how the architecture is implemented, let’s see how we can use it. We’ll register two simple HTTP servers as plugins, each listening on its own port and shutting down when the runner’s context is canceled.

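package main

import (
    "context"
    "errors"
    "log"
    "net/http"
    "time"

    // Import path assumed from the httpkit repository layout.
    "github.com/hay-kot/httpkit/graceful"
)

func main() {
    r := graceful.NewRunner()
    r.AddFunc("Server 1", func(ctx context.Context) error {
        server := &http.Server{
            Addr:    ":8080",
            Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.Write([]byte("Hello From Server 1"))
            }),
        }

        go func() {
            <-ctx.Done()
            // ctx is already canceled here, so give Shutdown a fresh
            // deadline for draining in-flight requests.
            shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            server.Shutdown(shutdownCtx)
        }()

        // ListenAndServe returns http.ErrServerClosed after Shutdown; treat it as a clean exit.
        if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
            return err
        }
        return nil
    })

    r.AddFunc("Server 2", func(ctx context.Context) error {
        server := &http.Server{
            Addr:    ":8081",
            Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.Write([]byte("Hello From Server 2"))
            }),
        }

        go func() {
            <-ctx.Done()
            shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            server.Shutdown(shutdownCtx)
        }()

        if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
            return err
        }
        return nil
    })

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if err := r.Start(ctx); err != nil {
        log.Fatal(err)
    }
}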

Do I Really Need a Package For This?

Maybe?

Probably Not.

The implementation of the runner and plugin system is quite simple, so instead of depending on a package for this, I’d suggest copying the source code directly into your codebase.

I will be re-using this pattern in all of my future projects that require management of multiple services within a single application. I hope you find it useful too!