Steven's Thoughts

Quick Container-based Microservice Development with Air

History. That's the microservice we will focus on this week.

In Chapter 5 of Bootstrapping Microservices, Communication Between Microservices, we drop the video-storage microservice from previous weeks so we can focus on the two distinct ways microservices communicate with each other: directly via HTTP requests, or indirectly through a third-party message broker like RabbitMQ. (Or Kafka, which will be covered in a subsequent series.)

But first, let's discuss our development process. We helpfully used air in the first post of this series to enable live reload outside of a container. We used air again last week outside of Docker to quickly isolate a configuration issue.

Wouldn't it be great if we could quickly iterate on our Dockerized microservices while they're actually running? Fortunately for us, we can create a Dockerfile for development that runs air and creates a superb workflow.

The Dockerfile in question?

FROM golang:1.23
RUN go install github.com/air-verse/air@latest
WORKDIR /src
ENV CGO_ENABLED=0
CMD ["air"]

We don't add any files to /src in this base container, so we're forced to add a volumes: mapping in our docker-compose.yml files. Should we forget, air will remind us with this gentle error in the docker compose output:

video-streaming  |
video-streaming  |   __    _   ___
video-streaming  |  / /\  | | | |_)
video-streaming  | /_/--\ |_| |_| \_ v1.60.0, built with Go go1.23.1
video-streaming  |
video-streaming  | mkdir /src/tmp
video-streaming  | watching .
video-streaming  | !exclude tmp
video-streaming  | building...
video-streaming  | go: go.mod file not found in current directory or any parent directory; see 'go help modules'
video-streaming  | failed to build, error: exit status 1
video-streaming  | running...
video-streaming  | /bin/sh: 1: /src/tmp/main: not found
video-streaming  | Process Exit with Code: 127

Ensure your volume mappings are correct if you experience this error. Then restart your services with dc down --volumes && dc up --build and you will be ready to go with live reload fully enabled.

Are you ready? Let's begin.

Round 1: Ensure air works by refactoring video-streaming. (source)

We're going to keep it simple here: remove video-storage and minio from the docker-compose.yml file, then add a history section. Your new docker-compose.yml should look like this:

version: "3"
services:
  video-streaming:
    image: video-streaming
    build:
      context: ./video-streaming
      dockerfile: Dockerfile.dev
    container_name: video-streaming
    ports:
      - "4000:80"
    volumes:
      - ./video-streaming:/src
    environment:
      - PORT=80
    restart: "no"

  history:
    image: history
    build:
      context: ./history
      dockerfile: Dockerfile.dev
    container_name: history
    ports:
      - "4001:80"
    volumes:
      - ./history:/src
    environment:
      - PORT=80
    restart: "no"

Notice that we changed the Dockerfile of each service to Dockerfile.dev. This is the air-based Dockerfile mentioned above that gives us live reload as we code.

Next, create a history folder in the project root and add the following file as history/main.go:

package main

import (
	"fmt"
	"log/slog"
	"net/http"
	"os"
)

func main() {
	// We switch from log to slog from here for structured logging.
	// We could use JSON, but text works for now.
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// We refactor the main func so we can fail easily on errors.
	// The benefit?  Reduced error handling clutter in main.
	if err := run(logger); err != nil {
		logger.Error(`[fatal]`, `err`, err)
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port, found := os.LookupEnv(`PORT`)
	if !found {
		// Fail if the port variable isn't found.
		return fmt.Errorf(`Please specify the port number for the HTTP server with the environment variable PORT.`)
	}

	// Create a basic ServeMux for later use.
	mux := http.NewServeMux()
	mux.HandleFunc("POST /viewed", func(w http.ResponseWriter, r *http.Request) {
		// Placeholder: video-streaming POSTs here; storage arrives in Round 2.
	})

	log.Info(`Microservice online`)
	return http.ListenAndServe(fmt.Sprint(":", port), mux)
}

A good rule of thumb is to keep the main function simple, so we move the bulk of its code to the run function and pass it a new *slog.Logger. (We will use it in every example moving forward.)
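
One payoff of the main/run split is that startup logic becomes easy to test. Here is a minimal sketch of that idea, assuming a hypothetical listenAddr helper and errNoPort error that are not part of the original code. The helper takes the environment lookup as a parameter so a test can pass a fake instead of touching the real environment.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errNoPort is a hypothetical sentinel error for a missing PORT variable.
var errNoPort = errors.New("PORT environment variable is not set")

// listenAddr builds the address for http.ListenAndServe.
// lookup has the same shape as os.LookupEnv so tests can pass a fake.
func listenAddr(lookup func(string) (string, bool)) (string, error) {
	port, found := lookup("PORT")
	if !found || port == "" {
		return "", errNoPort
	}
	return ":" + port, nil
}

func main() {
	addr, err := listenAddr(os.LookupEnv)
	if err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	fmt.Println("would listen on", addr)
}
```

A test can now call listenAddr with a closure that returns whatever PORT value it likes, which is much harder to do when the lookup is buried inside main.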

The removal of video-storage in this chapter means we have to revert to a revised version of video-streaming from Chapter 3. Moving the core logic to the run function in video-streaming gives us:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strconv"
)

const (
	contentLength = "Content-Length"
	contentType   = "Content-Type"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`[fatal]`, `err`, err)
		os.Exit(1)
	}
}

// Attempt to log the watched video to the history microservice.
func sendViewedMessage(log *slog.Logger, videoPath string) {
	// Encode the request body as JSON.
	payload, err := json.Marshal(viewedMessageBody{VideoPath: videoPath})
	if err != nil {
		log.Error(`Failed to encode 'viewed' message!`, `err`, err)
		return
	}

	// Create a new POST request that carries the JSON payload.
	req, err := http.NewRequest(http.MethodPost, `http://history/viewed`, bytes.NewReader(payload))
	if err != nil {
		log.Error(`Failed to create 'viewed' request!`, `err`, err)
		return
	}

	// Set the content type header
	req.Header.Set(contentType, `application/json`)

	// Send the request
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(`Failed to send 'viewed' message!`, `err`, err)
		return
	}
	defer resp.Body.Close()

	// Log the success.
	log.Info(`Sent 'viewed' message to history microservice.`)
}

func run(log *slog.Logger) error {
	port, found := os.LookupEnv(`PORT`)
	if !found {
		return fmt.Errorf(`Please specify the port number for the HTTP server with the environment variable PORT.`)
	}

	mux := http.NewServeMux()
	mux.HandleFunc(`GET /video`, func(w http.ResponseWriter, r *http.Request) {
		videoPath := `./videos/SampleVideo_1280x720_1mb.mp4`
		videoReader, err := os.Open(videoPath)
		if err != nil {
			log.Error(`Not Found`)
			w.WriteHeader(http.StatusNotFound)
			return
		}
		defer videoReader.Close()
		videoStats, err := videoReader.Stat()
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.Header().Add(contentLength, strconv.FormatInt(videoStats.Size(), 10))
		w.Header().Add(contentType, `video/mp4`)
		io.Copy(w, videoReader)
		sendViewedMessage(log, videoPath)
	})

	log.Info(`Microservice online`)
	return http.ListenAndServe(`:`+port, mux)
}

Now the exciting part: cd into your project root and change a file. Maybe change the log.Info message or add a w.WriteHeader(http.StatusNotImplemented) to the /viewed endpoint and see the magic happen live.

Onto Round 2.

Round 2: Tight coupling with the history microservice. (source)

Our history microservice stores every view of our sample video in a MongoDB collection, so let's add the mongo service to our docker-compose.yml file:

version: "3"
services:
  db:
    image: mongo:7
    container_name: db
    ports:
      - "4000:27017"
    restart: always

  video-streaming:
    image: video-streaming
    build:
      context: ./video-streaming
      dockerfile: Dockerfile.dev
    container_name: video-streaming
    ports:
      - "4001:80"
    volumes:
      - ./video-streaming:/src
    environment:
      - PORT=80
    restart: "no"

  history:
    image: history
    build:
      context: ./history
      dockerfile: Dockerfile.dev
    container_name: history
    ports:
      - "4002:80"
    volumes:
      - ./history:/src
    environment:
      - PORT=80
      - DBHOST=mongodb://db:27017/
      - DBNAME=video-streaming
    depends_on:
      - db
    restart: "no"

Based on what we see, this configuration demonstrates very tight coupling between the video-streaming and history microservices. We know that because:

  1. We know every view of a video in video-streaming is recorded somewhere, presumably via the history microservice.
  2. There is no HISTORY_HOST environment variable telling video-streaming where to connect, so the history address must be hard-coded in its source.
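
One way to remove the hard-coded address (though not the runtime dependency) would be to read it from the environment. The sketch below assumes the HISTORY_HOST variable named above, which the compose file does not actually set, and a hypothetical historyURL helper. The default falls back to the Compose service name, history.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// historyURL returns the endpoint that receives 'viewed' messages.
// HISTORY_HOST is a hypothetical variable; it is not set in the
// docker-compose.yml above, so the Compose service name is the default.
func historyURL(lookup func(string) (string, bool)) string {
	host, found := lookup("HISTORY_HOST")
	if !found || host == "" {
		host = "history"
	}
	u := url.URL{Scheme: "http", Host: host, Path: "/viewed"}
	return u.String()
}

func main() {
	fmt.Println(historyURL(os.LookupEnv))
}
```

Even with a configurable host, video-streaming still needs history to be up at the moment of the request, so the coupling itself remains.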

So let's take a look at the updated history code.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"strconv"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	contentLength = "Content-Length"
	contentType   = "Content-Type"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
	}
}

func run(log *slog.Logger) error {
	port := os.Getenv(`PORT`)
	dbhost := os.Getenv(`DBHOST`)
	dbname := os.Getenv(`DBNAME`)

	// Connect to Mongo
	// https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo
	clientOpts := options.Client().
		ApplyURI(dbhost)
	client, err := mongo.Connect(context.TODO(), clientOpts)
	if err != nil {
		return fmt.Errorf(`failed to connect to MongoDB: %w`, err)
	}
	// Retrieve a Mongo collection from the database.
	collection := client.Database(dbname).Collection(`history`)

	mux := http.NewServeMux()
	mux.HandleFunc(`POST /viewed`, func(w http.ResponseWriter, r *http.Request) {
		// use json.NewDecoder().Decode() to get videoPath
		var messageBody viewedMessageBody
		err := json.NewDecoder(r.Body).Decode(&messageBody)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			log.Error(`/viewed.Decode`, `err`, err.Error())
			return
		}

		// insertOne in history collection.
		_, err = collection.InsertOne(r.Context(), bson.M{`videoPath`: messageBody.VideoPath})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			log.Error(`/viewed.collection.Insert`, `err`, err.Error())
			return
		}

		log.Info(`add`, `videoPath`, messageBody.VideoPath)

		w.WriteHeader(http.StatusOK)
	})

	mux.HandleFunc(`GET /history`, func(w http.ResponseWriter, r *http.Request) {
		skip := r.FormValue(`skip`)
		skipInt, err := strconv.Atoi(skip)
		if err != nil {
			w.WriteHeader(http.StatusNotAcceptable)
			return
		}
		limit := r.FormValue(`limit`)
		limitInt, err := strconv.Atoi(limit)
		if err != nil {
			w.WriteHeader(http.StatusNotAcceptable)
			return
		}

		findOptions := options.Find().
			SetSkip(int64(skipInt)).
			SetLimit(int64(limitInt))

		cursor, err := collection.Find(context.Background(), bson.D{}, findOptions)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Error(`/history.collection.Find`, `err`, err.Error())
			return
		}
		defer cursor.Close(context.Background())

		var history []viewedMessageBody
		if err := cursor.All(context.Background(), &history); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Error(`/history.cursor.All`, `err`, err.Error())
			return
		}

		json.NewEncoder(w).Encode(history)
	})

	log.Info(`Microservice online!`)
	return http.ListenAndServe(fmt.Sprintf(`:%s`, port), mux)
}

We notice lines 39-48 connect to MongoDB and retrieve a collection that we can use for general CRUD purposes, and the POST /viewed handler inserts into it, so video-streaming must send a request here when videos are viewed.

Are we right? Let's check.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strconv"
)

const (
	contentLength = "Content-Length"
	contentType   = "Content-Type"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
	}
}

func run(log *slog.Logger) error {
	port, found := os.LookupEnv(`PORT`)
	if !found {
		return fmt.Errorf(`Please specify the port number for the HTTP server with the environment variable PORT.`)
	}

	mux := http.NewServeMux()
	mux.HandleFunc(`GET /video`, func(w http.ResponseWriter, r *http.Request) {
		videoPath := `./videos/SampleVideo_1280x720_1mb.mp4`
		videoReader, err := os.Open(videoPath)
		if err != nil {
			log.Error(`/video.os.Open`, `err`, err.Error())
			w.WriteHeader(http.StatusNotFound)
			return
		}
		defer videoReader.Close()
		videoStats, err := videoReader.Stat()
		if err != nil {
			log.Error(`/videoReader.Stat`, `err`, err.Error())
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.Header().Add(contentLength, strconv.FormatInt(videoStats.Size(), 10))
		w.Header().Add(contentType, `video/mp4`)
		// use io.Copy for streaming.
		io.Copy(w, videoReader)
		sendViewedMessage(log, videoPath)
	})

	log.Info(`Microservice online!`)
	return http.ListenAndServe(fmt.Sprint(`:`, port), mux)
}

// Attempt to log the watched video to the history microservice upon a view.
func sendViewedMessage(log *slog.Logger, videoPath string) {
	// Encode the request body as JSON.
	payload, err := json.Marshal(viewedMessageBody{VideoPath: videoPath})
	if err != nil {
		log.Error(`Failed to encode 'viewed' message!`, `err`, err)
		return
	}
	req, err := http.NewRequest(http.MethodPost, `http://history/viewed`, bytes.NewReader(payload))
	if err != nil {
		log.Error(`Failed to create 'viewed' request!`, `err`, err)
		return
	}

	// Set the content type header
	req.Header.Set(contentType, `application/json`)

	// Send the request
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error(`Failed to send 'viewed' message!`, `err`, err)
		return
	}
	defer resp.Body.Close()

	// Log the success.
	log.Info(`Sent 'viewed' message to history microservice.`)
}

Yes we are! The GET /video handler calls sendViewedMessage on line 58, which then calls http://history/viewed on line 73.

While the code works, this tight coupling of microservices makes video-streaming entirely dependent on history being healthy.

What if we want to upgrade the history microservice? video-streaming will silently fail to record new views until the upgrade is complete. (This can work in a Modular Monolith. That's a conversation for a future article, however.)
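
That failure mode is easy to reproduce outside Docker. Here is a minimal sketch that uses net/http/httptest as a stand-in for the history microservice. The postViewed and demo helpers are hypothetical and are not the article's sendViewedMessage. Once the stand-in server is closed, the POST fails and that view is never recorded.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath"`
}

// postViewed sends one 'viewed' message directly to the given URL.
func postViewed(client *http.Client, url, videoPath string) error {
	payload, err := json.Marshal(viewedMessageBody{VideoPath: videoPath})
	if err != nil {
		return err
	}
	resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

// demo returns the errors seen while the stand-in is up, then down.
func demo() (up, down error) {
	history := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	url := history.URL + "/viewed"
	path := "./videos/SampleVideo_1280x720_1mb.mp4"

	up = postViewed(history.Client(), url, path)
	history.Close() // simulate taking history down for an upgrade
	down = postViewed(&http.Client{}, url, path)
	return up, down
}

func main() {
	up, down := demo()
	fmt.Println("while history is up:", up)
	fmt.Println("while history is down:", down)
}
```

The first call returns nil and the second returns a connection error. Nothing retries or stores the message, which is exactly the gap a message broker fills.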

Do we have options if we forego the direct service-to-service communication? Indeed we do!

Which leads us to…

Round 3: Loose coupling of microservices with RabbitMQ. (source)

Think of RabbitMQ like an email inbox. Anyone can send you a message anytime they wish and it is stored in your inbox for you to read (and answer, if you wish) at your leisure.
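
To make the inbox analogy concrete, here is an in-process sketch that uses a buffered Go channel as a stand-in for a queue. It is an analogy only: RabbitMQ is a separate broker with its own protocol, and the names inbox and drain are hypothetical.

```go
package main

import "fmt"

// drain reads every message currently waiting in the inbox.
func drain(inbox chan string) []string {
	var received []string
	for {
		select {
		case msg := <-inbox:
			received = append(received, msg)
		default:
			// Nothing left to read right now.
			return received
		}
	}
}

func main() {
	// The buffer plays the role of the queue: it holds messages
	// until a consumer is ready to read them.
	inbox := make(chan string, 10)

	// The producer publishes while no consumer is running,
	// for example while history is being upgraded.
	inbox <- "viewed: video-1"
	inbox <- "viewed: video-2"

	// The consumer comes back later and finds both messages waiting.
	for _, msg := range drain(inbox) {
		fmt.Println(msg)
	}
}
```

The producer never needs to know whether a consumer is online. It only needs the inbox to exist.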

Wouldn't it be nice if the history microservice upgrade example had that ability? It can with RabbitMQ!

Let's see how.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port := os.Getenv(`PORT`)
	dbhost := os.Getenv(`DBHOST`)
	dbname := os.Getenv(`DBNAME`)
	rabbit := os.Getenv(`RABBIT`)

	// Connect to Mongo
	// https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo
	clientOpts := options.Client().
		ApplyURI(dbhost)
	client, err := mongo.Connect(context.TODO(), clientOpts)
	failWithError(log, err, `mongo.Connect`)

	collection := client.Database(dbname).Collection(`history`)
	defer client.Disconnect(context.TODO())

	// Connect to RabbitMQ
	conn, err := amqp.Dial(rabbit)
	failWithError(log, err, `amqp.Dial`)
	defer conn.Close()

	// Now we need to connect to the queue, consume messages.
	ch, err := conn.Channel()
	failWithError(log, err, `conn.Channel`)
	defer ch.Close()

	// Ensure the viewed queue exists in RabbitMQ;
	// create if necessary.
	viewedMessageQueue, err := ch.QueueDeclare(
		`viewed`, // name
		true,     // durable
		false,    // delete when unused
		false,    // exclusive
		false,    // no-wait
		nil,      // arguments
	)
	failWithError(log, err, `ch.QueueDeclare`)

	// Create a message retrieval channel now
	// that we know the `viewed` queue exists.
	msgs, err := ch.Consume(
		viewedMessageQueue.Name, // queue
		``,                      // consumer
		false,                   // auto-ack
		false,                   // exclusive
		false,                   // no-local
		false,                   // no-wait
		nil,                     // args
	)
	failWithError(log, err, `ch.Consume`)

	go func() {
		// Retrieve every message in the viewedMessageQueue.
		// Do we know who sent it?  No, but that's the beauty
		// of it.
		for msg := range msgs {
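			var msgBody viewedMessageBody
			if err := bson.Unmarshal(msg.Body, &msgBody); err != nil {
				// Fall back to JSON so hand-published test
				// messages decode too.
				if err := json.Unmarshal(msg.Body, &msgBody); err != nil {
					warnOnNonFatalError(log, err, `decode viewed message`)
					msg.Nack(false, false) // reject without requeueing
					continue
				}
			}

			// Store msgBody into our collection.
			res, err := collection.InsertOne(context.TODO(), msgBody)
			failWithError(log, err, `collection.InsertOne`)
			log.Info(`collection.InsertOne`, `insertedId`, res.InsertedID)
			// Acknowledge only this message so it can be deleted.
			msg.Ack(false)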
		}
	}()

	// The viewed handler is no longer necessary since we're pulling from the
	// queue.  But we do need an endpoint that will print our view history.
	mux := http.NewServeMux()
	// That's where /history comes in.
	mux.HandleFunc(`GET /history`, func(w http.ResponseWriter, r *http.Request) {
		// Retrieve a list of limit messages after the first skip many.
		skip, limit := r.FormValue(`skip`), r.FormValue(`limit`)
		skipInt, _ := strconv.Atoi(skip)
		limitInt, _ := strconv.Atoi(limit)
		// Set the find option parameters.
		findOptions := options.Find().
			SetSkip(int64(skipInt)).
			SetLimit(int64(limitInt))

		// Find every entry.  Ignore the first skipInt,
		// retrieve up to limitInt.
		cursor, err := collection.Find(context.TODO(), bson.D{}, findOptions)
		if err != nil {
			// A failed Find returns no usable cursor, so stop here.
			warnOnNonFatalError(log, err, `/history.collection.Find`)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// Create storage space for our query and retrieve.
		var results []viewedMessageBody
		err = cursor.All(context.TODO(), &results)
		warnOnNonFatalError(log, err, `/history.Cursor.All`)

		// Return the results as a JSON body.
		json.NewEncoder(w).Encode(results)
	})

	// Start the server.
	log.Info(`Microservice online.`)
	return http.ListenAndServe(fmt.Sprintf(`:%s`, port), mux)
}

// warnOnNonFatalError logs errors without stopping the program.
func warnOnNonFatalError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
	}
}

// failWithError logs the error and stops the program.
func failWithError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
		os.Exit(2)
	}
}

The code above demonstrates how every message retrieved from the viewed queue is stored in the history collection. So who puts messages in the queue?

For testing, we can use RabbitMQ's control panel by visiting http://localhost:15672 and logging in with the username and password of guest.

Notice in the docker-compose.yml below that both the video-streaming and history services have access to RabbitMQ's connection string. (As noted in the book, only use guest in development.)

services:
  db:
    image: mongo:7
    container_name: db
    ports:
      - "4000:27017"
    restart: always

  rabbit:
    image: rabbitmq:3.12.4-management
    container_name: rabbit
    ports:
      - "5672:5672"
      - "15672:15672"
    restart: always

  video-streaming:
    image: video-streaming
    build:
      context: ./video-streaming
      dockerfile: Dockerfile.dev
    container_name: video-streaming
    ports:
      - "4001:80"
    volumes:
      - ./video-streaming:/src
    environment:
      - PORT=80
      - RABBIT=amqp://guest:guest@rabbit:5672
    depends_on:
      - rabbit
      - history
    restart: "no"

  history:
    image: history
    build:
      context: ./history
      dockerfile: Dockerfile.dev
    container_name: history
    ports:
      - "4002:80"
    volumes:
      - ./history:/src
    environment:
      - PORT=80
      - DBHOST=mongodb://db:27017/
      - DBNAME=video-streaming
      - RABBIT=amqp://guest:guest@rabbit:5672
    depends_on:
      - db
      - rabbit
    restart: "no"

We can test this microservice by logging in to RabbitMQ and clicking the Queues and Streams tab. Then click the Publish message dropdown, paste in a JSON message like {"videoPath":"./videos/SampleVideo_1280x720_1mb.mp4"}, and click the Publish message button.

You will see a log message like the following if you already ran dc up --build. This means the history microservice is correctly receiving messages as they are added to RabbitMQ!

history          | time=2024-10-01T02:27:45.382Z level=INFO msg=collection.InsertOne insertedId=66fb5e21456bbf5fb79a4a94

Still with me? Great! Let's now take a look at the refactored video-streaming microservice that connects and publishes messages to the viewedMessageQueue.

package main

import (
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.mongodb.org/mongo-driver/bson"
)

const (
	contentLength = "Content-Length"
	contentType   = "Content-Type"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port, found := os.LookupEnv(`PORT`)
	if !found {
		return fmt.Errorf(`Please specify the port number for the HTTP server with the environment variable PORT.`)
	}

	rabbit := os.Getenv(`RABBIT`)

	// Connect to RabbitMQ
	conn, err := amqp.Dial(rabbit)
	failWithError(log, err, `amqp.Dial`)
	defer conn.Close()

	// Open a channel so we can declare the queue and publish messages.
	ch, err := conn.Channel()
	failWithError(log, err, `conn.Channel`)
	defer ch.Close()

	// Ensure the viewed queue exists.
	viewedMessageQueue, err := ch.QueueDeclare(
		`viewed`, // name
		true,     // durable
		false,    // delete when unused
		false,    // exclusive
		false,    // no-wait
		nil,      // arguments
	)
	failWithError(log, err, `ch.QueueDeclare`)

	mux := http.NewServeMux()
	mux.HandleFunc(`GET /video`, func(w http.ResponseWriter, r *http.Request) {
		videoPath := `./videos/SampleVideo_1280x720_1mb.mp4`
		videoReader, err := os.Open(videoPath)
		if err != nil {
			log.Error(`/video.os.Open`, `err`, err.Error())
			w.WriteHeader(http.StatusNotFound)
			return
		}
		defer videoReader.Close()
		videoStats, err := videoReader.Stat()
		if err != nil {
			log.Error(`/videoReader.Stat`, `err`, err.Error())
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.Header().Add(contentLength, strconv.FormatInt(videoStats.Size(), 10))
		w.Header().Add(contentType, `video/mp4`)
		// use io.Copy for streaming.
		io.Copy(w, videoReader)
		sendViewedMessage(log, videoPath, ch, &viewedMessageQueue)
	})

	log.Info(`Microservice online!`)
	return http.ListenAndServe(fmt.Sprint(`:`, port), mux)
}

func sendViewedMessage(log *slog.Logger, path string, channel *amqp.Channel, queue *amqp.Queue) {
	body := viewedMessageBody{
		VideoPath: path,
	}

	// Convert our payload into BSON, a binary encoding
	// of JSON-like documents.
	payload, err := bson.Marshal(body)
	if err != nil {
		failWithError(log, err, `bson.Marshal`)
		return
	}

	// Attempt to publish a message to the given queue.
	err = channel.Publish(``, queue.Name, false, false, amqp.Publishing{
		ContentType: `application/bson`,
		Body:        payload,
	})
	failWithError(log, err, `channel.Publish`)
}

func failWithError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
		os.Exit(2)
	}
}

The main change from Round 2 is a refactored sendViewedMessage, which now publishes a message to the viewedMessageQueue over a RabbitMQ channel instead of directly contacting the history microservice. The run function gains the connection, channel, and queue setup to support it.

With RabbitMQ in place, we can upgrade history with far less risk: messages published while it is offline wait in the durable viewed queue until it comes back and acknowledges them.

Will this code work if multiple microservices want to listen for messages on the viewed queue? Not exactly, which leads us to:

Round 4: Loose coupling of microservices with multiple recipients. (source)

The bad thing about queues? Each message is delivered to only one consumer, so two microservices reading the same queue would split the messages between them.

The good thing about RabbitMQ? It adds the concept of an Exchange.

An Exchange routes messages from its producers, video-streaming in this case, to the queues bound to it with QueueBind. A fanout Exchange, the type we use here, copies every message to all of its bound queues.

Let's see some code. We'll start with video-streaming this time:

package main

import (
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.mongodb.org/mongo-driver/bson"
)

const (
	contentLength = `Content-Length`
	contentType   = `Content-Type`
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port, found := os.LookupEnv(`PORT`)
	if !found {
		return fmt.Errorf(`Please specify the port number for the HTTP server with the environment variable PORT.`)
	}

	rabbit := os.Getenv(`RABBIT`)

	// Connect to RabbitMQ
	conn, err := amqp.Dial(rabbit)
	failWithError(log, err, `amqp.Dial`)
	defer conn.Close()

	// Create a channel to communicate with RabbitMQ.
	ch, err := conn.Channel()
	failWithError(log, err, `conn.Channel`)
	defer ch.Close()

	// Declare an exchange of type "fanout".
	// This exchange will route messages to all queues bound to it,
	// allowing for broadcast messaging to multiple consumers.
	err = ch.ExchangeDeclare(
		`Viewed`, // Exchange name.
		`fanout`, // Exchange type.
		true,     // Durable?
		false,    // Delete when unused.
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failWithError(log, err, `ch.ExchangeDeclare`)

	mux := http.NewServeMux()
	mux.HandleFunc(`GET /video`, func(w http.ResponseWriter, r *http.Request) {
		videoPath := `./videos/SampleVideo_1280x720_1mb.mp4`
		videoReader, err := os.Open(videoPath)
		if err != nil {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		defer videoReader.Close()
		videoStats, err := videoReader.Stat()
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.Header().Add(contentLength, strconv.FormatInt(videoStats.Size(), 10))
		w.Header().Add(contentType, `video/mp4`)
		// use io.Copy for streaming.
		io.Copy(w, videoReader)
		sendViewedMessage(log, videoPath, ch)
	})

	log.Info(`Microservice online!`)
	return http.ListenAndServe(fmt.Sprint(`:`, port), mux)
}

func sendViewedMessage(log *slog.Logger, path string, channel *amqp.Channel) {
	// Refactor to send to RabbitMQ.
	body := viewedMessageBody{
		VideoPath: path,
	}

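	payload, err := bson.Marshal(body)
	if err != nil {
		// Log the failure instead of dropping the message silently.
		log.Error(`bson.Marshal`, `error`, err)
		return
	}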

	err = channel.Publish(`Viewed`, ``, false, false, amqp.Publishing{
		ContentType: `application/bson`,
		Body:        payload,
	})

	failWithError(log, err, `Unable to publish to RabbitMQ channel`)
}

func failWithError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
		os.Exit(2)
	}
}

The ch.ExchangeDeclare call ensures the Viewed Exchange exists on the broker, creating it over the channel ch if necessary. sendViewedMessage in the above code publishes each message to the Viewed exchange, which routes a copy to every queue bound to it.
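
The routing behaviour can be sketched without a broker. The in-memory model below is an illustration under heavy assumptions, not RabbitMQ's implementation: fanoutExchange, bind, and publish are hypothetical names that mimic how a fanout exchange copies each message to every bound queue, and the two queue names are only examples.

```go
package main

import "fmt"

// fanoutExchange is a toy model of a fanout exchange:
// it remembers its bound queues and copies each message to all of them.
type fanoutExchange struct {
	queues map[string][]string // queue name -> messages waiting in it
}

func newFanoutExchange() *fanoutExchange {
	return &fanoutExchange{queues: make(map[string][]string)}
}

// bind attaches a queue to the exchange, in the spirit of QueueBind.
func (e *fanoutExchange) bind(queue string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = nil
	}
}

// publish delivers one copy of msg to every bound queue.
// A fanout exchange ignores the routing key, so there is none here.
func (e *fanoutExchange) publish(msg string) {
	for name := range e.queues {
		e.queues[name] = append(e.queues[name], msg)
	}
}

func main() {
	viewed := newFanoutExchange()
	viewed.bind("historyQueue")
	viewed.bind("recommendationsQueue")

	viewed.publish("./videos/SampleVideo_1280x720_1mb.mp4")

	fmt.Println(len(viewed.queues["historyQueue"]))         // 1
	fmt.Println(len(viewed.queues["recommendationsQueue"])) // 1
}
```

Each queue ends up with its own copy, so every consumer can read and acknowledge the message independently of the others.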

Let's now look at the updated history code:

package main

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"strconv"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port := os.Getenv(`PORT`)
	dbhost := os.Getenv(`DBHOST`)
	dbname := os.Getenv(`DBNAME`)
	rabbit := os.Getenv(`RABBIT`)

	// Connect to Mongo
	// https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo
	clientOpts := options.Client().
		ApplyURI(dbhost)
	client, err := mongo.Connect(context.TODO(), clientOpts)
	failWithError(log, err, `mongo.Connect`)

	collection := client.Database(dbname).Collection(`history`)
	defer client.Disconnect(context.TODO())

	// Connect to RabbitMQ
	conn, err := amqp.Dial(rabbit)
	failWithError(log, err, `amqp.Dial`)
	defer conn.Close()

	// Create a channel to communicate with RabbitMQ.
	ch, err := conn.Channel()
	failWithError(log, err, `conn.Channel`)
	defer ch.Close()

	// Declare an exchange of type "fanout" so we can bind to it.
	err = ch.ExchangeDeclare(
		`Viewed`, // Exchange name.
		`fanout`, // Exchange type.
		true,     // Durable?
		false,    // Delete when unused.
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failWithError(log, err, `ch.ExchangeDeclare`)

	// Declare a queue named "historyQueue".
	// This queue will be used to store messages routed from the "Viewed" exchange.
	historyQueue, err := ch.QueueDeclare(
		`historyQueue`, // name
		false,          // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		nil,            // arguments
	)
	failWithError(log, err, `ch.QueueDeclare`)

	// Bind the historyQueue to the exchange.
	err = ch.QueueBind(
		historyQueue.Name, // Queue name to bind,
		``,                // routing key; not applicable for fanout exchanges
		`Viewed`,          // Exchange name.
		false,             // no-wait
		nil,               // arguments
	)
	failWithError(log, err, `ch.QueueBind`)

	// Create a channel to receive messages sent to our historyQueue.
	msgs, err := ch.Consume(
		historyQueue.Name, // queue
		``,                // consumer
		false,             // auto-ack
		false,             // exclusive
		false,             // no-local
		false,             // no-wait
		nil,               // args
	)
	failWithError(log, err, `ch.Consume`)

	go func() {
		for msg := range msgs {
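			var msgBody viewedMessageBody
			if err := bson.Unmarshal(msg.Body, &msgBody); err != nil {
				// Reject undecodable messages instead of storing an empty record.
				warnOnNonFatalError(log, err, `bson.Unmarshal`)
				msg.Nack(false, false)
				continue
			}

			// Add to Mongo
			res, err := collection.InsertOne(context.TODO(), msgBody)
			failWithError(log, err, `collection.InsertOne`)
			log.Info(`collection.InsertOne`, `insertedId`, res.InsertedID)
			// Acknowledge only this message.
			msg.Ack(false)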
		}
	}()

	// The viewed handler is no longer necessary since we're pulling from the
	// queue.  But we do need an endpoint that will print our view history.
	mux := http.NewServeMux()
	mux.HandleFunc(`GET /history`, func(w http.ResponseWriter, r *http.Request) {
		skip, limit := r.FormValue(`skip`), r.FormValue(`limit`)
		skipInt, _ := strconv.Atoi(skip)
		limitInt, _ := strconv.Atoi(limit)
		w.Header().Add(`Content-Type`, `text/plain`)
		findOptions := options.Find().
			SetSkip(int64(skipInt)).
			SetLimit(int64(limitInt))

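		cursor, err := collection.Find(context.TODO(), bson.D{}, findOptions)
		if err != nil {
			// A failed Find returns no usable cursor, so stop here.
			warnOnNonFatalError(log, err, `/history.collection.Find`)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		var results []viewedMessageBody

		err = cursor.All(context.TODO(), &results)
		warnOnNonFatalError(log, err, `/history.Cursor.All`)

		for _, result := range results {
			log.Info(`cursor.All`, `videoPath`, result.VideoPath)
			// Also write each path to the response body.
			fmt.Fprintln(w, result.VideoPath)
		}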
	})

	log.Info(`Microservice online!`)
	return http.ListenAndServe(fmt.Sprintf(`:%s`, port), mux)
}

func warnOnNonFatalError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
	}
}

func failWithError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
		os.Exit(2)
	}
}

The important aspects of the code above are (1) we define the Exchange, (2) we define a historyQueue, and (3) we bind the historyQueue to the Exchange so we can receive messages sent to it.
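To make the declare, bind, and deliver sequence concrete, here is a minimal in-memory sketch of fanout behavior using only Go channels. It is a toy model and not how RabbitMQ is implemented: the fanoutExchange type and its bind and publish methods are hypothetical names invented for this illustration, and the example.mp4 payload is made up.

```go
package main

import "fmt"

// fanoutExchange is a hypothetical in-memory stand-in for a fanout
// exchange: it only remembers which queues are bound to it.
type fanoutExchange struct {
	queues map[string]chan string
}

func newFanoutExchange() *fanoutExchange {
	return &fanoutExchange{queues: make(map[string]chan string)}
}

// bind registers a named, buffered queue with the exchange and returns
// the receiving end, roughly mirroring QueueDeclare followed by QueueBind.
func (e *fanoutExchange) bind(name string, size int) <-chan string {
	q := make(chan string, size)
	e.queues[name] = q
	return q
}

// publish copies the message into every bound queue. There is no routing
// key because a fanout exchange ignores it. Note that this toy version
// blocks if a queue's buffer is full.
func (e *fanoutExchange) publish(msg string) {
	for _, q := range e.queues {
		q <- msg
	}
}

func main() {
	viewed := newFanoutExchange()
	history := viewed.bind("historyQueue", 1)
	recommendations := viewed.bind("recommendationsQueue", 1)

	// One publish, two independent deliveries.
	viewed.publish(`{"videoPath":"example.mp4"}`)

	fmt.Println("history:", <-history)
	fmt.Println("recommendations:", <-recommendations)
}
```

Each bound queue receives its own copy of the message, which is why a second consumer with its own queue sees every view event without taking anything away from the first.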

We repeat these steps for a new recommendations microservice to prove that an Exchange delivers messages to all of its subscribers:

package main

import (
	"fmt"
	"log/slog"
	"net/http"
	"os"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.mongodb.org/mongo-driver/bson"
)

type viewedMessageBody struct {
	VideoPath string `json:"videoPath" bson:"videoPath"`
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	if err := run(logger); err != nil {
		logger.Error(`run failed`, `err`, err.Error())
		os.Exit(1)
	}
}

func run(log *slog.Logger) error {
	port := os.Getenv(`PORT`)
	rabbit := os.Getenv(`RABBIT`)

	// Connect to RabbitMQ
	conn, err := amqp.Dial(rabbit)
	failWithError(log, err, `amqp.Dial`)
	defer conn.Close()

	// Create a channel to communicate with RabbitMQ.
	ch, err := conn.Channel()
	failWithError(log, err, `conn.Channel`)
	defer ch.Close()

	// Declare an exchange of type "fanout".
	// This exchange will route messages to all queues bound to it,
	// allowing for broadcast messaging to multiple consumers.
	err = ch.ExchangeDeclare(
		`Viewed`, // name
		`fanout`, // kind
		true,     // durable
		false,    // auto-delete
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failWithError(log, err, `ch.ExchangeDeclare`)

	// Declare a queue named "recommendationsQueue".
	// This queue will be used to store messages routed from the "Viewed" exchange.
	recommendationsQueue, err := ch.QueueDeclare(
		`recommendationsQueue`, // name
		false,                  // durable
		false,                  // delete when unused
		false,                  // exclusive
		false,                  // no-wait
		nil,                    // arguments
	)
	failWithError(log, err, `ch.QueueDeclare`)

	// Bind the recommendationsQueue to the exchange.
	err = ch.QueueBind(
		recommendationsQueue.Name, // Queue name to bind,
		``,                        // routing key; not applicable for fanout exchanges
		`Viewed`,                  // Exchange name.
		false,                     // no-wait
		nil,                       // arguments
	)
	failWithError(log, err, `ch.QueueBind`)

	// Create a channel to receive messages sent to our recommendationsQueue.
	msgs, err := ch.Consume(
		recommendationsQueue.Name, // queue
		``,                        // consumer
		false,                     // auto-ack
		false,                     // exclusive
		false,                     // no-local
		false,                     // no-wait
		nil,                       // args
	)
	failWithError(log, err, `ch.Consume`)

	go func() {
		for d := range msgs {
			var msgBody viewedMessageBody
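			if err := bson.Unmarshal(d.Body, &msgBody); err != nil {
				log.Error(`bson.Unmarshal`, `error`, err)
			}

			// auto-ack is disabled above, so acknowledge each delivery explicitly.
			d.Ack(false)
			log.Info(`'viewed' message ack.`)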
		}
	}()

	mux := http.NewServeMux()
	log.Info(`Microservice online!`)
	// We're simply starting this server as a demo.
	return http.ListenAndServe(fmt.Sprintf(`:%s`, port), mux)
}

func failWithError(log *slog.Logger, err error, msg string) {
	if err != nil {
		log.Error(msg, `error`, err)
		os.Exit(2)
	}
}

Launch the whole system with dc up --build and watch the log messages appear every time you visit http://localhost:4001/video.

Conclusion

We have now started the code for every microservice in the amazing Bootstrapping Microservices book. Login and comments section aside, you now have the basic understanding of how to build sites like Facebook, Flickr, YouTube, and even Twitter/X.

Next week we'll focus on deploying our microservices to any Kubernetes host. We'll keep everything local (and free) by using minikube.

Then we'll learn to run Continuous Integration tests locally with act.

You've learned a lot in this series so far, but there's always so much more to learn!

Rambling

To be honest, learning about the air-based (well, nodemon in the book) workflow is what made me want to start blogging.

The joy of coding for me is trying, failing, learning, and trying again until I succeed in bringing my vision to life for me and the world. Developing microservices with Go and WITHOUT air always reminded me of this classic xkcd comic:

https://imgs.xkcd.com/comics/compiling.png

#microservices #docker #mongodb #air #rabbitmq