Efficient Event Streaming: Mastering Pub/Sub with Fastify and DragonFly

Efficient Event Streaming: Mastering Pub/Sub with Fastify and DragonFly

·

6 min read

By Luca Del Puppo

Building resilient and scalable applications is one of the most difficult challenges in our job. You must follow good practices and use the right tools in the right situation to achieve these objectives. This article illustrates a possible solution to scale applications using our delicious Fastify server and DragonFly. If you don’t know DragonFly, no worries; there is a small paragraph to introduce it, but if you are familiar with Redis, there is nothing to fear.

Scenario

One of the common scenarios we face in a project is to react when something happens.

Imagine a sign-up form; behind a simple form, there are many linked processes. For instance, save user data in the database, send an email to confirm the identity, send an SMS or anything else that should happen as a consequence of that.

In this post, we will use DragonFly as a Pub/Sub service to deliver an event when something happens in a Fastify application.

In this example, we will create a flow to handle the state history of users in a system, as the picture below shows.

Setup

We already prepared a project to get started. You will find it here.

The repository contains the whole application, and together we will see all the crucial parts of using DragonFly with Fastify.

To set up your environment, run the following commands, and you will be ready to start:

  • npm run infra:up

  • npm run db:migrate

  • npm run dev

The main folders relevant to this post are src/graphql and src/services/message-consumer. The first one contains the GraphQL server, and the second one contains the Fastify Plugin used as a consumer.

DragonFly, the quick introduction

DragonFly is a service very similar to Redis. It complies with the Redis API; if you want, you can switch between them, changing only the connection string.

DragonFly has performance among its objectives. To achieve that goal, it is implemented differently from Redis. In addition to that, in order to attract developers and consumers, DragonFly is 100% compliant with the Redis API and the Memcached API; still, the algorithms and data structures are different under the hood.

If you are looking for benchmarks, you can find them here.

Publish the first messages to DragonFly

There isn’t an official plugin to handle DragonFly with Fastify. Still, because DragonFly is compliant with the Redis API, you can simply use the @fastify/redis plugin directly. For the sake of clarity, we’re going to customize it a little bit nonetheless.

In the src/plugins/dragonFly.js path, you can find the plugin

import fp from 'fastify-plugin'

async function dragonFlyPlugin(app, opts) {
  await app.register(import('@fastify/redis'), {
    host: opts.config.DRAGONFLY_HOST,
    port: opts.config.DRAGONFLY_PORT
  })

  app.decorate('dragonFly', app.redis)
}

export default fp(dragonFlyPlugin, {
  fastify: '4.x',
  name: 'dragon-fly'
})

As you can see, after the registration of the @fastify/redis plugin, the Fastify server was decorated with a dragonFly client, in this way, you can use the syntax app.dragonFly to gain access to the dragonFly client.

Publishing messages to DragonFly is one of the post’s goals and the first one we will tackle. To do this we create a new Fastify plugin that has the goal of publishing messages on DragonFly. You can find the code in src/plugins/eventEmitter.js.

import { randomUUID } from 'crypto'
import fp from 'fastify-plugin'

async function eventEmitterPlugin(app) {
  function buildEvent(payload) {
    return {
      eventId: randomUUID(),
      payload,
      eventAt: Date.now()
    }
  }

  async function publishEvent(eventName, payload) {
    const event = buildEvent(payload)
    return await app.dragonFly.publish(eventName, JSON.stringify(event))
  }

  const emitter = {
    publishEvent
  }

  app.decorate('eventEmitter', emitter)
}

export default fp(eventEmitterPlugin, {
  fastify: '4.x',
  dependencies: ['dragon-fly'],
  name: 'event-emitter'
})

Looking at the code, you can see it is straightforward. This plugin depends on the dragonFly plugin and decorates the server with the eventEmitter decorator. This object contains only the publishEvent method, used to publish the event in DragonFly, and it accepts the eventName and the payload of the message.

The event emitter is used when a user is created, accepted, or rejected. In this example, these actions are simulated using three simple GraphQL mutations. The GraphQL server’s setup is at src/graphql/index.js and it’s pretty simple.

import mercurius from 'mercurius'
import mercuriusCache from 'mercurius-cache'
import { loaders, resolvers, schema } from './graphql.js'

export default async function graphqlService(app, opts) {
  await app.register(import('../plugins/dragonFly.js'), opts)
  await app.register(import('../plugins/eventEmitter.js'), opts)
  const { dragonFly } = app

  await app.register(mercurius, {
    schema,
    resolvers,
    loaders,
    graphiql: true
  })
}

It registers the DragonFly and the EventEmitter plugins, and then, using Mercurius, it sets up the GraphQL server.

In the src/graphql/graphql.js file there are the GraphQL mutations and as you can see, after the data is updated in the database, every mutation publishes the USER_STATUS_UPDATED_EVENT event on DragonFly using the ctx.app.eventEmitter.publishEvent method.

...
export const resolvers = {
  Query: {
    getUsers: async (_, __, ctx) => {
      return getUsers(ctx.app.pg)
    }
  },
  Mutation: {
    createUser: async (_, { user }, ctx) => {
      const newUser = await insertUser(ctx.app.pg, user)
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        newUser
      )
      return newUser
    },
    approveUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'approved')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    },
    rejectUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'rejected')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    }
  }
}
...

With these simple steps, we have covered all the requirements to publish a message from Fastify to DragonFly, and as you can see, it is pretty simple.

To recap:

  • Register the DragonFly plugin

  • Create the EventEmitter plugin

  • Register the GraphQL server

  • Use the EventEmmiter decorator in the mutations

Channel subscription

To consume the message published into a DragonFly channel, we’ll have to create a consumer that subscribes to the channel, and every time it receives a message, handle it.

In the src/services/message-consumer/index.js file, there is all the code required to subscribe to DragonFly and handle the message.

...
export const resolvers = {
  Query: {
    getUsers: async (_, __, ctx) => {
      return getUsers(ctx.app.pg)
    }
  },
  Mutation: {
    createUser: async (_, { user }, ctx) => {
      const newUser = await insertUser(ctx.app.pg, user)
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        newUser
      )
      return newUser
    },
    approveUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'approved')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    },
    rejectUser: async (_, { userId }, ctx) => {
      const updatedUser = await updateUserStatus(ctx.app.pg, userId, 'rejected')
      await ctx.app.eventEmitter.publishEvent(
        USER_STATUS_UPDATED_EVENT,
        updatedUser
      )
      return updatedUser
    }
  }
}
...

First, we must register the DragonFly plugin and then using the subscribe method will start the subscription to the channel.

Then, we’ll need to listen to the message event to handle the messages. This listener will be called on each new message in DragonFly.

In this example, the code calls the insertRegistrationHistoryRow method that saves the event in the user history to track the action. Finally, it’s also important to subscribe to Fastify’s onClose hook to unsubscribe the client when the server shuts down.

Now it’s time to check the result of running our application:

  • Run npm run dev

  • Navigate on http://localhost:3000/graphiql

  • Call the mutations createUser and then approveUser with the ID of the last user created

  • In the user’s history, you should see the history of such actions

As we have seen, creating a Pub/Sub service with DragonFly is a piece of cake, and the combo of DragonFly plus Fastify is also perfect for a high-performance solution.

Wrapping up

That’s a wrap on our journey on to scaling an application with Fastify and DragonFly. In this article, we discovered what DragonFly is and how it works. You learned how to use DragonFly as an event bus to scale and decouple the systems, and how easy it is to integrate it in a Fastify application with Mercurius.