recursive.codes

The Personal Blog of Todd Sharp

Producing and Consuming Messages in Node.JS with Oracle Advanced Queuing (AQ)

Posted By: Todd Sharp on 11/19/2021 8:00 GMT
Tagged: Messaging, Node

If you've been following my blog posts lately, you may have noticed that I've been focusing a bit on messaging. We talked a while back about using Oracle Advanced Queuing (AQ) with Micronaut in a Java application, and more recently we looked at how to interact with AQ via REST APIs. But there's another way to work with AQ natively that we haven't looked at yet: the oracledb module for Node.js. I certainly don't have to tell you about the popularity of Node, and using it to interact with AQ is very straightforward. So let's dig in and create an Express application that can enqueue and dequeue messages. We're going to build on this example going forward, so stay tuned for some follow-up posts in the near future. If you get stuck at any point or want to dig further into this functionality, check out the documentation on using AQ with the Node oracledb module.

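In this post, we're going to create a queue, scaffold an Express application, build a service that enqueues and dequeues messages, and expose HTTP endpoints that call that service (including a bonus streaming endpoint).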

Reminder! AQ is included at no charge in Autonomous DB. Also, you can spin up two free Autonomous DB instances in the Oracle Cloud "always free" tier, so running this demo (or using this code in production) will cost you absolutely nothing!

Create Queue

The first thing we need to do is make sure that we have a user with the proper permissions and roles. If this is your first time working with AQ, open a SQL editor and run the following as the admin user.
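As a rough sketch of what that setup can look like, here are the statements wrapped in a small JavaScript helper so they can be run over an admin connection (you can just as easily paste the SQL strings into a SQL editor). Everything here is an assumption rather than the original script: the helper names are hypothetical, the exact grants your environment needs may differ, and the DATA tablespace is assumed because it is the usual default on Autonomous DB.

```javascript
// Hypothetical helper: builds the statements an admin might run to prepare a
// user for AQ. The list of grants is an assumption; adjust it to your database.
function buildAqUserSetup(username, password) {
  // DDL cannot use bind variables, so validate the values before interpolating.
  if (!/^[A-Za-z][A-Za-z0-9_]*$/.test(username)) {
    throw new Error('unexpected characters in username');
  }
  if (password.includes('"')) {
    throw new Error('password must not contain double quotes');
  }
  return [
    `CREATE USER ${username} IDENTIFIED BY "${password}"`,
    `GRANT CONNECT, RESOURCE TO ${username}`,
    // Assumption: DATA is the default tablespace (typical for Autonomous DB).
    `ALTER USER ${username} QUOTA UNLIMITED ON DATA`,
    `GRANT AQ_USER_ROLE TO ${username}`,
    `GRANT EXECUTE ON DBMS_AQ TO ${username}`,
    `GRANT EXECUTE ON DBMS_AQADM TO ${username}`,
  ];
}

// Runs each statement on an already-open admin connection
// (for example, one obtained from the oracledb module).
async function createAqUser(connection, username, password) {
  for (const sql of buildAqUserSetup(username, password)) {
    await connection.execute(sql);
  }
}
```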

Next, open a new SQL editor and connect as the newly created aqdemouser user. We will need to create a queue table, a queue, and then start the queue. Our queue name will be AQDEMOUSER.MQTT_BRIDGE_QUEUE (we'll need to use this later on in our JavaScript code, so keep it handy).
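Here is a minimal sketch of those three steps using the standard DBMS_AQADM procedures, wrapped in JavaScript so the PL/SQL can be executed over a connection (the block inside the string also works in a SQL editor). The queue table name MQTT_BRIDGE_QUEUE_TABLE and the createQueue() helper are hypothetical, and the RAW payload type is an assumption based on the fact that we send plain JSON strings later on.

```javascript
// PL/SQL that creates a queue table, creates the queue, and starts it.
// Assumptions: RAW payloads and a hypothetical queue table name.
const CREATE_QUEUE_PLSQL = `
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'AQDEMOUSER.MQTT_BRIDGE_QUEUE_TABLE',
    queue_payload_type => 'RAW');
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'AQDEMOUSER.MQTT_BRIDGE_QUEUE',
    queue_table => 'AQDEMOUSER.MQTT_BRIDGE_QUEUE_TABLE');
  DBMS_AQADM.START_QUEUE(
    queue_name => 'AQDEMOUSER.MQTT_BRIDGE_QUEUE');
END;`;

// Executes the block on a connection opened as aqdemouser.
async function createQueue(connection) {
  await connection.execute(CREATE_QUEUE_PLSQL);
}
```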

At this point, the queue is ready to enqueue (send) and dequeue (receive) messages. If you feel like testing the queue to make sure it's working as expected, run the following.

Enqueue a JSON string as a message:

Which should return:

PL/SQL procedure successfully completed.

Dequeue the JSON string message, parse the object, and retrieve an element from the JSON object:

Which returns:

How long did that take, 2 minutes? Not bad, eh?

Create Application

Let's create an Express application to work with our demo queue. That'll give us the ability to expose some HTTP endpoints for enqueuing and dequeuing messages. Of course, messaging might normally happen "behind the scenes" in a microservice (or monolith) in response to user actions or other business rules. But for this demo, it gives us a nice way to test out the queue. Run the following command to quickly scaffold out an Express application (this assumes you have the Express application generator installed). We'll also install the oracledb and (optionally) the debug module (for pretty debug messages) while we're at it.

The application will eventually need some sensitive values (username, password, etc.) to connect to the queue. Let's use dotenv so that we can store those credentials in environment variables.

Now we can create a file called .env in the project root and populate it with our credentials.

Heads Up! To use the Node oracledb module, you need to configure it just a bit (you'll need an Instant Client and your Autonomous DB wallet). There's a handy doc online that walks you through installing the module depending on your Operating System. Make sure you've done that first, and then use the paths to your instant client and wallet in your .env file below.

Connect String? If you're not sure where this comes from, it's a predefined string that uses the format [dbname]_[type]. You can choose the [type] based on the level of performance and concurrency required for your application. A list of these values can be found in the tnsnames.ora file inside your wallet.
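To make that concrete, here is a hedged sketch of reading those values once dotenv has loaded them into process.env. The variable names (DB_USER, DB_PASSWORD, CONNECT_STRING, INSTANT_CLIENT_PATH, WALLET_PATH) and the loadQueueConfig() helper are hypothetical, so use whatever keys you put in your own .env file.

```javascript
// Example .env contents (placeholder values, hypothetical key names):
//   DB_USER=aqdemouser
//   DB_PASSWORD=your-password
//   CONNECT_STRING=demodb_low
//   INSTANT_CLIENT_PATH=/path/to/instantclient
//   WALLET_PATH=/path/to/wallet

const REQUIRED_KEYS = [
  'DB_USER',
  'DB_PASSWORD',
  'CONNECT_STRING',
  'INSTANT_CLIENT_PATH',
  'WALLET_PATH',
];

// Reads the settings from an env object (process.env by default) and fails
// fast with a readable error if any of them are missing.
function loadQueueConfig(env = process.env) {
  const missing = REQUIRED_KEYS.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return {
    user: env.DB_USER,
    password: env.DB_PASSWORD,
    connectString: env.CONNECT_STRING,
    libDir: env.INSTANT_CLIENT_PATH,
    configDir: env.WALLET_PATH,
  };
}
```

Failing at startup when a key is missing tends to be easier to debug than a connection error that shows up later.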

Let's move on to creating a service class that will encapsulate all of our queuing activities. We'll be able to inject this service into our router (and other places in the application) later on.

Create Service

The QueueService is a basic class that will encapsulate our DB work related to our queue. As such, we'll need to include the oracledb module and store our credentials in the class for use from the methods that we'll add in just a bit. Create the class, and add an init() method (we'll pass the values in later on when we instantiate this class).
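A minimal sketch of that starting point might look like the following. The property names are my own guesses, and the require of the oracledb module is left out here only so the snippet runs on its own.

```javascript
class QueueService {
  // Stores the credentials and queue name for the methods added later.
  init({ user, password, connectString, queueName }) {
    this.user = user;
    this.password = password;
    this.connectString = connectString;
    this.queueName = queueName;
    this.pool = null; // created lazily the first time it is needed
    return this;
  }
}
```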

We're going to use a lot of async/await code in this class to avoid littering it with callbacks. It'll also be handy to add a "helper" method that retrieves a connection pool so that we don't have to do that in each method, so let's add an async method called getPool() to the QueueService.

And since we want to make sure things are cleaned up when we are done, add a method to close it.
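Here is one way those two helpers could look, sketched as a standalone pair of functions so the snippet runs without a database. In the QueueService they would be methods using the stored credentials. The createPoolHelper() and closePool() names are hypothetical; oracledb.createPool() and pool.close() are the driver calls doing the real work.

```javascript
// `oracledb` is passed in so the sketch can be exercised with a stand-in;
// in the real service it would come from require('oracledb').
function createPoolHelper(oracledb, { user, password, connectString }) {
  let poolPromise = null;

  // Creates the pool on first use and hands back the same one afterwards.
  async function getPool() {
    if (!poolPromise) {
      poolPromise = oracledb.createPool({ user, password, connectString });
    }
    return poolPromise;
  }

  // Closes the pool if one was ever created. The argument is the drain time
  // in seconds, giving in-flight connections a moment to finish.
  async function closePool() {
    if (poolPromise) {
      const pool = await poolPromise;
      poolPromise = null;
      await pool.close(10);
    }
  }

  return { getPool, closePool };
}
```

Caching the promise (rather than the resolved pool) means two callers that race on the first request still share a single pool.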

Add Enqueue (Single) Method

We'll want to add a method that will enable us to produce a single message into the queue. To do this, we need to get the connection pool (this.getPool()), grab a connection to the DB from that pool (pool.getConnection()), and then get our queue from the connection (connection.getQueue(this.queueName)). Once we have the queue, we can produce a message (queue.enqOne()), passing it a string that contains the message (in this case, an object that is converted to a JSON string). Then we commit the transaction (connection.commit()) and close the connection (connection.close()).
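Those steps can be sketched like this. It's written as a standalone function that takes the pool and queue name as arguments so it runs on its own; inside the QueueService it would be a method that calls this.getPool() and uses this.queueName. The try/finally is my addition so that the connection goes back to the pool even if the enqueue fails.

```javascript
async function enqueueOne(pool, queueName, message) {
  const connection = await pool.getConnection();
  try {
    const queue = await connection.getQueue(queueName);
    // The payload is sent as a JSON string.
    await queue.enqOne(JSON.stringify(message));
    await connection.commit();
  } finally {
    // Always release the connection back to the pool.
    await connection.close();
  }
}
```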

We can also enqueue an array of messages all at once if we need to by using the queue.enqMany() method. For more information on this, refer to the documentation.

Add Dequeue (Single) Method

Next, let's add a method to dequeue a single message. The process here is similar to enqueuing: get the pool, a connection, and the queue, call queue.deqOne(), then commit and close. Notice that we're setting queue.deqOptions.wait to oracledb.AQ_DEQ_NO_WAIT. If we didn't set this option, the call would block until a message became available, which is not what we want for this demo.
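A sketch of that method, again as a standalone function so it can run without a database (the oracledb module and the pool are passed in as arguments, which is my choice for the example). It assumes RAW payloads holding JSON strings, and returning null for an empty queue is also an assumption.

```javascript
async function dequeueOne(oracledb, pool, queueName) {
  const connection = await pool.getConnection();
  try {
    const queue = await connection.getQueue(queueName);
    // Return immediately instead of blocking until a message arrives.
    queue.deqOptions.wait = oracledb.AQ_DEQ_NO_WAIT;
    const message = await queue.deqOne();
    await connection.commit();
    // No message means the queue was empty; otherwise decode the payload
    // (a Buffer for RAW queues) and parse the JSON string it contains.
    return message ? JSON.parse(message.payload.toString()) : null;
  } finally {
    await connection.close();
  }
}
```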

Add Dequeue (Many) Method

Just as with enqueuing, we can dequeue multiple messages at the same time. Let's add a method to dequeue an array of messages. It's almost identical to the dequeueOne() method above, except that we're calling deqMany() on the queue instead of deqOne(). Note that we can limit the number of messages dequeued with each call by passing an integer to the deqMany() method.
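Sketched the same way (standalone function, driver and pool passed in, JSON strings in RAW payloads assumed), the batch version might look like this. The default of 10 messages per call is an arbitrary value for illustration.

```javascript
async function dequeueMany(oracledb, pool, queueName, maxMessages = 10) {
  const connection = await pool.getConnection();
  try {
    const queue = await connection.getQueue(queueName);
    queue.deqOptions.wait = oracledb.AQ_DEQ_NO_WAIT;
    // Fetch at most maxMessages in a single call.
    const messages = await queue.deqMany(maxMessages);
    await connection.commit();
    // Treat a missing result as "nothing to dequeue".
    return (messages || []).map((m) => JSON.parse(m.payload.toString()));
  } finally {
    await connection.close();
  }
}
```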

And that does it for the QueueService. It's ready to do its work.

Initialize the Queue Service

Before we can use the QueueService, we need to set our credentials into it. Recall that we have our credentials set as environment variables, so we just need to pass them in at runtime to the service via the init() method. Open up the app.js file and do that like so:

While we're here, let's make sure that we properly close the pool when the app shuts down.
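Both steps can be sketched together as below. The environment variable names, the closePool() method name, and the wireQueueService() helper are all hypothetical; the env and proc parameters default to the real process objects and only exist so the snippet can be exercised in isolation.

```javascript
function wireQueueService(queueService, env = process.env, proc = process) {
  // Hand the credentials from the environment to the service.
  queueService.init({
    user: env.DB_USER,
    password: env.DB_PASSWORD,
    connectString: env.CONNECT_STRING,
    queueName: 'AQDEMOUSER.MQTT_BRIDGE_QUEUE',
  });

  // Close the pool before exiting so connections are released cleanly.
  const shutdown = async () => {
    try {
      await queueService.closePool();
    } finally {
      proc.exit(0);
    }
  };
  proc.once('SIGINT', shutdown);
  proc.once('SIGTERM', shutdown);
  return shutdown;
}
```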

Add HTTP Endpoints to Enqueue/Dequeue

So our app is configured, and our service is created and initialized. Now we can expose a few endpoints to let us interact! In index.js, inject the initialized QueueService.

Add Enqueue Endpoint

Create an endpoint to POST a message to the queue.
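A hedged sketch of such a handler follows. The /enqueue path, the makeEnqueueHandler() factory, and the enqueueOne() method name are assumptions; the handler is built by a factory that receives the service, which is one simple way to "inject" it into the router.

```javascript
// Returns an Express-style handler that enqueues the request body.
function makeEnqueueHandler(queueService) {
  return async function enqueueHandler(req, res, next) {
    try {
      // req.body is the parsed JSON body (the generated app enables
      // express.json() by default).
      await queueService.enqueueOne(req.body);
      res.status(201).end();
    } catch (err) {
      next(err); // let Express's error handler deal with failures
    }
  };
}

// Wiring in the router file (assumes an Express router named `router`):
//   router.post('/enqueue', makeEnqueueHandler(queueService));
```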

We can test this by POSTing a few messages via cURL:

If you want, add -i to the cURL request to view the response headers (confirming the 201 status response):

Add Dequeue (Single) Endpoint

Create an endpoint to GET a single message.
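Something along these lines would do it. The /dequeue path and the names are assumptions, and responding with 204 when the queue is empty is my choice for the sketch rather than anything AQ requires.

```javascript
// Returns an Express-style handler that dequeues one message.
function makeDequeueHandler(queueService) {
  return async function dequeueHandler(req, res, next) {
    try {
      const message = await queueService.dequeueOne();
      if (message === null || message === undefined) {
        res.status(204).end(); // nothing waiting in the queue
      } else {
        res.json(message);
      }
    } catch (err) {
      next(err);
    }
  };
}

// Wiring in the router file (assumes an Express router named `router`):
//   router.get('/dequeue', makeDequeueHandler(queueService));
```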

A quick cURL to test it:

Add Dequeue (Many) Endpoint

Add an endpoint to GET an array of messages.
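Here is a sketch in the same style. The /dequeueMany path, the optional max query parameter, and the method names are hypothetical.

```javascript
// Returns an Express-style handler that dequeues a batch of messages.
function makeDequeueManyHandler(queueService, defaultMax = 10) {
  return async function dequeueManyHandler(req, res, next) {
    try {
      // Hypothetical ?max=N query parameter; falls back to defaultMax
      // when it is absent or not a positive number.
      const parsed = Number.parseInt(req.query.max, 10);
      const max = parsed > 0 ? parsed : defaultMax;
      const messages = await queueService.dequeueMany(max);
      res.json(messages);
    } catch (err) {
      next(err);
    }
  };
}

// Wiring in the router file (assumes an Express router named `router`):
//   router.get('/dequeueMany', makeDequeueManyHandler(queueService));
```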

Another test to confirm that we get an array back (containing the remaining 2 enqueued messages):

Bonus: Dequeue Messages in a Stream!

As a special added bonus, we can also add a /dequeueStream endpoint that uses Server-Sent Events to return a constant stream of messages. Here we simply create an interval that tries to dequeue a single message every second. If a message exists, we write it to the open stream. When the client disconnects, we clear the interval and call res.end().
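A rough sketch of that handler is below. The factory name and the dequeueOne() method are assumptions, and the busy flag is an extra I added so that a slow dequeue can't overlap with the next tick.

```javascript
// Returns an Express-style handler that streams messages as Server-Sent Events.
function makeDequeueStreamHandler(queueService, intervalMs = 1000) {
  return function dequeueStreamHandler(req, res) {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });

    let busy = false;
    const timer = setInterval(async () => {
      if (busy) return; // previous dequeue still in flight, skip this tick
      busy = true;
      try {
        const message = await queueService.dequeueOne();
        if (message) {
          // One SSE event per message: a "data:" line followed by a blank line.
          res.write(`data: ${JSON.stringify(message)}\n\n`);
        }
      } catch (err) {
        res.write(`event: error\ndata: ${JSON.stringify(err.message)}\n\n`);
      } finally {
        busy = false;
      }
    }, intervalMs);

    // Stop polling and end the response when the client goes away.
    req.on('close', () => {
      clearInterval(timer);
      res.end();
    });
  };
}

// Wiring in the router file (assumes an Express router named `router`):
//   router.get('/dequeueStream', makeDequeueStreamHandler(queueService));
```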

We can also test this in cURL. Open a request like so and observe it for a few seconds.

Notice that there is no data coming through (because there are no pending messages in the queue). Now open a separate cURL window and try posting a few messages:

Observe the stream and you'll notice the incoming messages!

Summary

In this post, we created an AQ queue and an Express application that produces messages to and consumes messages from that queue. Stay tuned for future posts where we'll look at building upon this application to allow it to act as a "bridge" between AQ and other messaging protocols.

Image by jplenio from Pixabay


