recursive.codes


The Personal Blog of Todd Sharp

Creating an MQTT to Oracle Advanced Queuing (AQ) Bridge in Node

Posted By: Todd Sharp on 12/3/2021 8:00 GMT
Tagged: Messaging, Node

In my last post, we looked at how to use the Node oracledb module to produce (enqueue) and consume (dequeue) messages from an Oracle Advanced Queuing (AQ) queue. This approach works well when we've got control over the messaging infrastructure and can set up both the producer and consumer. But sometimes the reality of our environments dictates that we need to consume from (or produce to) a messaging platform that we didn't necessarily choose or implement ourselves. For example, maybe the service that your team is working to implement needs to consume messages from an MQTT topic and do "something" with those messages. Your business needs will dictate what that "something" is: maybe filter the incoming messages based on some criteria, maybe trigger some workflow, or persist all/some of the data. After you've worked with the incoming message, maybe your service needs to publish another message indicating that the processing is complete. Or maybe you just need to pass the message on without any processing, but to an endpoint that speaks a different protocol.

I know this sounds contrived, but the reality is that it's quite common to need to "bridge" different messaging protocols together. In this post, we'll enhance the application that we built in the last post to add an MQTT to AQ bridge. You just might be surprised at how little code is required to build this bridge.

There's not much to this. Simple pub/sub style messaging involves a message "source" (or the "producer"), possibly an intermediate filter, and a message "sink" (or the "consumer"). This requires some sort of messaging platform. There are several to choose from, some of which use different protocols, and each has its own strengths and weaknesses. We won't go deep into messaging protocols in this post (that's a topic for another day). To help visualize this so that we can fully understand why a bridge is necessary, let's take a look at a basic architecture that uses MQTT (a very lightweight protocol commonly used in IoT applications).

In a "perfect" world, this architecture works well. But as stated earlier, sometimes we don't have full control over the architecture. For whatever reason, sometimes it is necessary to "bridge" between messaging protocols.

Assumptions

We'll assume that you've already got an MQTT server set up and ready for incoming messages, but if you've not done that yet, consider my post on running RabbitMQ in an "always free" instance in the Oracle Cloud, or feel free to get your own MQTT server running (there are tons of options that you can choose from).

Update Environment Config

If you recall, in the last post we set up a .env file to store our application credentials. Before we can create a bridge, let's add some configuration to the .env file that we'll need in order to connect to the MQTT topic.

I'm connecting to an older MQTT broker that speaks version 3 of the protocol, so my MQTT_PROTOCOL and MQTT_VERSION might be different from yours. Refer to the documentation for the MQTT node module to determine the values necessary for your server.
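As a rough illustration of how those settings might be read in Node, here is a minimal sketch. Only MQTT_PROTOCOL and MQTT_VERSION are named in this post; the other variable names (MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC) and the loadMqttConfig helper are assumptions made for the example, so adjust them to match your own .env file.

```javascript
// Sketch: turn environment variables into settings for the bridge.
// MQTT_PROTOCOL and MQTT_VERSION come from the post; every other
// variable name (and this helper itself) is a hypothetical placeholder.
function loadMqttConfig(env = process.env) {
  const version = Number(env.MQTT_VERSION || 4);
  return {
    // The topic is used by the bridge itself, not by the MQTT client options
    topic: env.MQTT_TOPIC,
    credentials: {
      protocol: env.MQTT_PROTOCOL || 'mqtt', // e.g. 'mqtt', 'mqtts', 'ws'
      host: env.MQTT_HOST || 'localhost',
      port: Number(env.MQTT_PORT || 1883),
      username: env.MQTT_USERNAME,
      password: env.MQTT_PASSWORD,
      protocolVersion: version,
      // MQTT 3.1 brokers expect the legacy protocol id 'MQIsdp'
      protocolId: version === 3 ? 'MQIsdp' : 'MQTT',
    },
  };
}
```

Passing the environment in as an argument (instead of reading process.env directly) is just a convenience that makes the helper easy to try out with sample values.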

Create the Bridge Service

Essentially we need to consume an MQTT topic, and for each incoming message, we need to produce a new message in our AQ queue. Basically, this:

Remember, we're building upon the application that we started in the last post. Let's start by adding the mqtt module to the application.

We'll create our bridge as a service called MqttAqBridge. To get started, we'll inject the QueueService that we built in the last post. Side note: we can see here the advantage of encapsulating functionality into services since we can easily use the QueueService from multiple places within the application. We'll need the queue service to enqueue to AQ the MQTT messages that the bridge consumes. In the bridge class, we add an init method that we'll use to pass in credentials and meta info necessary for the bridge to communicate with the MQTT topic. Also, we initialize a few counters (incomingProcessed, outgoingSent, and sendErrors) to enable us to provide stats later on.
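A skeleton for that class might look something like the sketch below. The class name and the three counters come from this post; the shape of the init argument ({ topic, credentials }) and the stats() helper are assumptions added for illustration.

```javascript
// Sketch of the bridge service skeleton.
// The init() argument shape and the stats() helper are hypothetical.
class MqttAqBridge {
  constructor(queueService) {
    // The QueueService from the previous post, injected so it can be reused
    this.queueService = queueService;
    this.client = null;
  }

  init({ topic, credentials }) {
    this.topic = topic;
    this.credentials = credentials;
    // Counters that let us report stats later on
    this.incomingProcessed = 0;
    this.outgoingSent = 0;
    this.sendErrors = 0;
    return this;
  }

  // Hypothetical convenience method for reading the counters in one call
  stats() {
    return {
      incomingProcessed: this.incomingProcessed,
      outgoingSent: this.outgoingSent,
      sendErrors: this.sendErrors,
    };
  }
}
```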

Next, we will add a connect() method. The MQTT client has configurable options, so let's be flexible and accept an object of options. We'll merge the options object with our credentials and pass that into the mqtt.connect() method to create a client.
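The merge step could be sketched as follows. The createClient helper is hypothetical, and the mqtt module is passed in as an argument purely so the snippet can be exercised without a broker; in the real service you would call require('mqtt') once at the top of the file. Letting the credentials win over caller-supplied options is one possible design choice, not necessarily the one used in the original code.

```javascript
// Sketch: merge caller-supplied options with the stored credentials,
// then hand the result to mqtt.connect().
// `mqttLib` is expected to be the object returned by require('mqtt').
function createClient(mqttLib, credentials, options = {}) {
  // Credentials are spread last so they cannot be overridden by accident
  const merged = { ...options, ...credentials };
  return mqttLib.connect(merged);
}

// Example usage (assumes `npm install mqtt` has been run):
//   const mqtt = require('mqtt');
//   const client = createClient(mqtt, credentials, { keepalive: 30 });
```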

Next, let's add a callback for errors. We'll print errors to the console, but in your application, you can do whatever you need to.

Now we'll subscribe to the 'connect' event. Again, just printing a message to the console here. Do whatever you need in your application!

Next, we'll subscribe to the topic:

Subscribe to the 'close' event.

Finally, we'll listen for incoming messages. Here's where the real "magic" happens, and there's not much to it. If you refer to the diagram above, you'll know that the only thing we need to do here is publish the incoming message to the outgoing AQ queue. And we do that with our QueueService.
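A message handler along those lines might look like this sketch. The MQTT client delivers each payload as a Buffer, so it is converted to a string before being enqueued. The enqueueOne method name is an assumption; substitute whatever enqueue method your QueueService actually exposes.

```javascript
// Sketch: build the 'message' callback for the MQTT client.
// `queueService.enqueueOne` is a hypothetical method name that is
// assumed to return a promise; `stats` holds the bridge's counters.
function makeMessageHandler(queueService, stats) {
  return async function onMessage(topic, message) {
    stats.incomingProcessed++;
    try {
      // MQTT payloads arrive as Buffers; AQ gets the string form here
      await queueService.enqueueOne(message.toString());
      stats.outgoingSent++;
    } catch (err) {
      stats.sendErrors++;
      console.error(`Failed to enqueue message from ${topic}:`, err.message);
    }
  };
}

// Example usage: client.on('message', makeMessageHandler(queueService, bridge));
```

Catching the error inside the handler keeps one failed enqueue from becoming an unhandled rejection, and the sendErrors counter records that it happened.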

Since we broke it up into several bits above, here is the entire connect() method so you can see it in full.
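As a rough reconstruction of how those pieces fit together, here is a sketch that wires up all four events on an already-created client. The wireBridge helper and the log messages are assumptions; the event names ('error', 'connect', 'close', 'message') and the subscribe(topic, callback) call are the ones the mqtt module provides. Subscribing inside the 'connect' callback is one common arrangement, not necessarily the one used in the original code.

```javascript
// Sketch: attach every handler the bridge needs to an MQTT client.
// `client` is what mqtt.connect() returns; `onMessage` receives
// (topic, messageBuffer) for each incoming message.
function wireBridge(client, topic, onMessage) {
  client.on('error', (err) => {
    console.error('MQTT error:', err.message);
  });

  client.on('connect', () => {
    console.log('Connected to MQTT broker');
    client.subscribe(topic, (err) => {
      if (err) {
        console.error(`Could not subscribe to ${topic}:`, err.message);
      } else {
        console.log(`Subscribed to ${topic}`);
      }
    });
  });

  client.on('close', () => {
    console.log('MQTT connection closed');
  });

  client.on('message', onMessage);
  return client;
}
```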

Initialize the Bridge

Now we just need to initialize the bridge, pass in our credentials, and call connect(). Open up app.js, and add the following:
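In outline, that startup code might resemble the sketch below. The startBridge helper, the environment variable names other than MQTT_PROTOCOL, and the clientId value are all assumptions made for the example; the bridge object only needs the init() and connect() methods described above.

```javascript
// Sketch for app.js: initialize the bridge from the environment, then connect.
// Variable names other than MQTT_PROTOCOL are hypothetical placeholders.
function startBridge(bridge, env = process.env) {
  bridge.init({
    topic: env.MQTT_TOPIC,
    credentials: {
      protocol: env.MQTT_PROTOCOL || 'mqtt',
      host: env.MQTT_HOST,
      port: Number(env.MQTT_PORT || 1883),
      username: env.MQTT_USERNAME,
      password: env.MQTT_PASSWORD,
    },
  });
  // Any extra MQTT client options can be passed to connect()
  bridge.connect({ clientId: `mqtt-aq-bridge-${process.pid}` });
  return bridge;
}

// Example usage: startBridge(new MqttAqBridge(queueService));
```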

Test Bridge

Let's start up the app and test it out. I use a command-line tool called mosquitto_pub to easily publish messages to MQTT.

In theory, those MQTT messages should now be available on our AQ queue. If you remember, in the last post we created an endpoint to dequeue from our AQ queue. Let's make a call to the /dequeueMany endpoint and see if the messages are there:

Fantastic!! Our bridge works!

Summary

What can I say? We created a bridge between MQTT and AQ. It didn't take a lot of code. It opened up tons of possibilities for messaging in our applications. But wait, there's more! We'll look at a different kind of bridge in the next post.

Image by E. Dichtl from Pixabay




