If you've been following my blog posts lately, you may have noticed that I've been focusing a bit on messaging. We talked a while back about using Oracle Advanced Queuing (AQ) with Micronaut in a Java application, and more recently we looked at how to interact with AQ via REST APIs. But there's another way to work with AQ natively that we haven't looked at yet - and that method is to use the
oracledb module with Node.JS. I certainly don't have to tell you about the popularity of Node, and using it to interact with AQ is very straightforward. So let's dig in and create an Express application that can enqueue and dequeue messages. We're going to build on this example going forward, so stay tuned for some exciting follow-up posts in the near future. If you get stuck at any point or want to dig further into this functionality, check out the documentation on using AQ with the Node oracledb module.
In this post, we're going to perform the following tasks:
Reminder! AQ is included at no charge in Autonomous DB. Also, you can turn up 2 free Autonomous DB instances in the Oracle Cloud "always free" tier, so running this demo (or using this code in production) will cost you absolutely nothing!!
The first thing we need to do is make sure that we have a user with the proper permissions and roles. If this is your first time working with AQ, open a SQL editor and run the following as the
Next, open a new SQL editor and connect as the newly created
aqdemouser user. We will need to create a queue table, a queue, and then start the queue. Our queue name will be
At this point, the queue is ready to
enqueue (send) and
dequeue (receive) messages. If you feel like testing the queue to make sure it's working as expected, run the following.
Enqueue a JSON string as a message:
Which should return:
PL/SQL procedure successfully completed.
Dequeue the JSON string message, parse the object, and retrieve an element from the JSON object:
How long did that take, 2 minutes? Not bad, eh?
Let's create an Express application to work with our demo queue. That'll give us the ability to expose some HTTP endpoints for enqueuing and dequeuing messages. Of course, messaging might normally happen "behind the scenes" in a microservice (or monolith) in response to user actions or other business rules. But for this demo, it gives us a nice way to test out the queue. Run the following command to quickly scaffold out an Express application (this assumes you have the Express application generator installed). We'll also install the
oracledb and (optionally) the
debug module (for pretty debug messages) while we're at it.
The application will eventually need some sensitive values (username, password, etc) to connect to the queue. Let's use
dotenv so that we can store those credentials in environment variables.
Now we can create a file called
.env in the project root, and populate our credentials.
Heads Up! To use the Node
oracledb module, you need to configure it just a bit (you'll need an Instant Client and your Autonomous DB wallet). There's a handy doc online that walks you through installing the module depending on your Operating System. Make sure you've done that first, and then use the paths to your instant client and wallet in your
.env file below.
Connect String? If you're not sure where this comes from, it's a predefined string that uses the format [dbname]_[type]. You can choose the [type] based on the level of performance and concurrency required for your application. A list of these values can be found in the
tnsnames.ora file inside your wallet.
Let's move on to creating a service class that will encapsulate all of our queuing activities. We'll be able to inject this service into our router (and other places in the application) later on.
QueueService is a basic class that will encapsulate our DB work related to our queue. As such, we'll need to include the
oracledb module and store our credentials in the class for use from the methods that we'll add in just a bit. Create the class, and add an
init() method (we'll pass the values in later on when we instantiate this class).
We're going to use a lot of async/await code in this class in order to prevent having to litter the class with a bunch of callbacks. It'll also be handy to add a "helper" method to retrieve a connection pool so that we don't have to do that in each method, so let's add an async method called
getPool() to the
And since we want to make sure things are cleaned up when we are done, add a method to close it.
We'll want to add a method that will enable us to produce a single message into the queue. To do this, we need to get the connection pool (
this.getPool()), grab a connection to the DB from that pool (
pool.getConnection()), and then get our queue from the connection (
connection.getQueue(this.queueName)). Once we have the queue, we can produce a message (
queue.enqOne()), passing it a string that contains the message (in this case, an object that is converted to a JSON string). Then we commit the transaction (
connection.commit()) and close the connection (
We can also enqueue an array of messages all at once if we need to by using the
queue.enqMany() method. For more information on this, refer to the documentation.
Next, let's add a method to dequeue a single message. The process here is similar to enqueuing - get the pool, a connection, the queue, call
queue.deqOne(), commit and close. Notice that we're setting a value of
oracledb.AQ_DEQ_NO_WAIT into our queue's options via
queue.deqOptions.wait. If we didn't set this option, the queue would wait for an available message before returning, which is not what we want for this demo.
Just as with enqueuing, we can dequeue multiple messages at the same time. Let's add a method to dequeue an array of messages. It's almost identical to the
dequeueOne() method above, except that we're calling
deqMany() on the queue instead of
deqOne(). Note that we can limit the number of messages dequeued with each call by passing an integer to the
And that does it for the
QueueService. It's ready to do its work.
Before we can use the
QueueService, we need to set our credentials into it. Recall that we have our credentials set as environment variables, so we just need to pass them in at runtime to the service via the
init() method. Open up the
app.js file and do that like so:
While we're here, let's make sure that we properly close the pool when the app shuts down.
So our app is configured, and our service is created and initialized. Now we can expose a few endpoints to let us interact! In
index.js, inject the initialized
Create an endpoint to POST a message to the queue.
We can test this by POSTing a few messages via cURL:
If you want, add
-i to the cURL request to view the response headers (confirming the
201 status response):
Create an endpoint to GET a single message.
A quick cURL to test it:
Add endpoint to GET an array of messages:
Another test to confirm that we get an array back (containing the remaining 2 enqueued messages):
As a special added bonus, we can also add a
/dequeueStream endpoint that uses Server-Sent Events to return a constant stream of messages. Here we simply create an interval that tries to dequeue a single message every second. If a message exists, we write it to the open stream. When the client disconnects, we clear the interval and call
We can also test this in cURL. Open a request like so and observe it for a few seconds.
Notice that there is no data coming through (because there are no pending messages in the queue). Now open a separate cURL window and try posting a few messages:
Observe the stream and you'll notice the incoming messages!
In this post, we created an AQ queue and an Express application that produces and consumes messages from that queue. Stay tuned for future posts where we'll look at building upon this application to allow it to act as a "bridge" between AQ and other messaging protocols.
Over the last few posts, we've looked in detail at Oracle Advanced Queuing. Most recently we looked at "bridges" - or applications that helped us broker...
Recently, we've been taking a dive into messaging. In the last post, we talked about creating a messaging "bridge" so that we could consume an incoming...
In my last post, we looked at how to use the Node oracledb module to produce (enqueue) and consume (dequeue) messages from an Oracle Advanced Queuing (...