Recently, we've been taking a dive into messaging. In the last post, we talked about creating a messaging "bridge" so that we could consume an incoming MQTT topic and produce an outgoing stream of messages to an Oracle Advanced Queuing (AQ) queue. It worked, and it adequately addressed the challenge of communicating between messaging protocols within our application architecture. In this post, let's take a similar approach, but instead of producing messages to AQ, we'll publish to an Oracle Streaming Service (OSS) topic. I've certainly talked about OSS before on this blog, so you're hopefully already familiar with it, but if not, you can think of it as a real-time, serverless event streaming platform that just happens to be compatible with Apache Kafka. When it comes to messaging, I've personally found that managed services are quite often the best choice. As a developer, I don't have a whole lot of time to deploy and maintain a messaging platform, so having a service that takes care of that and lets me interact with topics/streams via an open-source API (such as the Kafka SDK) is a win in my book.
To help you visualize things, here's a high-level overview of the bridge architecture. It'll look pretty similar to the MQTT-AQ bridge we talked about in the last post.
The cool part about this particular bridge is that it takes even less work to implement than the MQTT-AQ bridge. In fact, other than configuration (via a JSON file), it's only 7 lines of code. This time, we'll rely on an existing Node module instead of writing the low-level code ourselves to broker messages between protocols. But we'll look at the internals of that module to see how it does what it does, so that we can do it ourselves if we need to.
Like the last post, we'll assume that we already have an MQTT server set up to handle incoming messages. Again, if you need to set up your own, you can refer to my post about running RabbitMQ in an "always free" instance in the Oracle Cloud, my post about running Mosquitto in the Oracle Cloud, or choose your favorite and get it up and running.
For this bridge, we'll take advantage of the MQTT-Kafka bridge project by nodefluent.
Before I could install the mqtt-to-kafka-bridge module, I first had to tell the linker where to find OpenSSL on my machine (since I'm using a Mac). To do this, I set the following two variables in my shell:
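On a Mac with a Homebrew-installed OpenSSL, that means pointing the compiler and linker at Homebrew's OpenSSL location (the path below is the typical Homebrew prefix; yours may differ):

```shell
# Tell the node-rdkafka native build where Homebrew's OpenSSL lives
export CPPFLAGS=-I/usr/local/opt/openssl/include
export LDFLAGS=-L/usr/local/opt/openssl/lib
```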
See the install notes if you're on a Mac: https://www.npmjs.com/package/node-rdkafka
Create a stream. Search for 'Streaming' in the search bar and click on the result in 'Services'.
On the stream list page, click on 'Create Stream'.
Name the stream, choose 'Select Existing Stream Pool', choose 'Default Pool', and specify the message retention period and number of partitions.
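If you prefer the command line, the same stream can be created with the OCI CLI. This is a sketch; the stream name is an example, and you'll need to substitute your own compartment OCID:

```shell
# Create a 1-partition stream with 24-hour retention in the default pool
# for the given compartment (OCID below is a placeholder)
oci streaming admin stream create \
  --name bridge-stream \
  --partitions 1 \
  --retention-in-hours 24 \
  --compartment-id ocid1.compartment.oc1..example
```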
Once the stream is created, copy the stream pool OCID from the stream details page. We'll need it later on to connect with the Kafka SDK.
To create the configuration for our bridge, we'll need a dedicated service user and an auth token generated for that user.
Create a dedicated user for the streaming service. Search for 'Users' in the search bar and click on the result in 'Services'.
Click 'Create User' and populate the dialog to create an IAM user for the streaming service.
After the new user is created, go to the user details page and generate an auth token:
Copy the token and keep it handy - you cannot retrieve it after you leave the 'Generate Token' dialog. Now we'll need to create a group, add the new user to that group, and create a group policy:
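The policy statement might look something like this (the group name is whatever you chose when creating the group; `use streams` lets the group produce to and consume from streams, while `manage streams` would also grant administrative operations):

```
Allow group StreamingBridgeUsers to use streams in tenancy
```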
Now we have a user with an auth token, in a group that can work with streams!
At this point, we can get down to the business of creating the bridge. The MQTT-Kafka bridge module requires us to create a config file to define our connection information. Create a file called config.js and populate it as such. We'll add more config in just a second.
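As a starting point, the skeleton might look like this (the key names follow the mqtt-to-kafka-bridge sample config; check the project's README for the authoritative schema):

```javascript
"use strict";

// config.js -- skeleton we'll fill in below
const config = {
  mqtt: {},    // MQTT broker connection settings (added next)
  kafka: {},   // OSS Kafka-compatible endpoint settings
  routing: {}, // maps incoming MQTT topics to outgoing OSS topics
};

module.exports = config;
```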
Now we add a block to configure our credentials for MQTT. Substitute your credentials here, obviously!
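Mine looked something like this (host and credentials are placeholders for your own broker's details):

```javascript
// mqtt block for config.js -- connection settings for the incoming broker;
// every value here is a placeholder to substitute with your own
const mqtt = {
  host: "localhost",
  port: 1883,
  username: "mqttuser",
  password: "changeme",
};
```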
Next, we'll define our OSS configuration in the kafka block. When we're using the Kafka-compatible endpoint for OSS, we need to define our username in the following format: [tenancyName]/[username]/[stream pool OCID]. Your endpoint might also be different, depending on your region. See the docs if you get stuck at any point!
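Since the bridge module sits on node-rdkafka under the hood, these are librdkafka-style settings. The endpoint, tenancy, and OCID below are illustrative placeholders; substitute your own region's bootstrap server and the auth token you generated earlier:

```javascript
// kafka block for config.js -- OSS Kafka-compatible endpoint settings
// (librdkafka-style keys); all values below are placeholders
const kafka = {
  noptions: {
    "metadata.broker.list": "cell-1.streaming.us-ashburn-1.oci.oraclecloud.com:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "PLAIN",
    // username format: [tenancyName]/[username]/[stream pool OCID]
    "sasl.username": "mytenancy/oss-bridge-user/ocid1.streampool.oc1..example",
    // the auth token generated for the streaming user
    "sasl.password": "my-auth-token",
  },
};
```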
Add a routing block to define which MQTT queue(s) get routed to which OSS topics.
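The routing block is just a map from an incoming MQTT topic (the key) to the OSS stream it should be produced to (the value). The topic and stream names here are placeholders:

```javascript
// routing block for config.js -- MQTT topic (left) to OSS stream (right);
// names are placeholders for your own topic and stream
const routing = {
  "sensors/temperature": "bridge-stream",
};
```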
That's the credentials portion of the config. There are more configuration options to set, which you can find in the sample on GitHub. Now we can create the server. If you remember from above, that's a whole 7 lines of code. Create index.js and populate it:
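As a sketch, it looks something like the following. Note that the exported class name and start method here are assumptions on my part; verify both against the module's README:

```javascript
"use strict";

// index.js -- the whole bridge. NOTE: the exported name ("Bridge") and the
// run() method are assumptions; check the mqtt-to-kafka-bridge README.
const { Bridge } = require("mqtt-to-kafka-bridge");
const config = require("./config.js");

const bridge = new Bridge(config);
bridge.run().then(() => console.log("Bridge is running."));
```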
That's it. That's the whole bridge.
Let's test it out! Run the server with npm start and observe the console output.
We're ready to produce a message to MQTT and see if it gets published to our topic in OSS! Using your favorite CLI tool, publish a new message to the topic that you configured the bridge to listen on.
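For example, with the Mosquitto CLI it might look like this (host, credentials, and topic are placeholders; use the topic your routing block listens on):

```shell
# Publish a test message to the bridged MQTT topic
mosquitto_pub \
  -h localhost -p 1883 \
  -u mqttuser -P changeme \
  -t "sensors/temperature" \
  -m '{"temperature": 72.5}'
```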
You'll notice the bridge processing the incoming message from MQTT.
Now check your stream in the Oracle Cloud Console:
Success! We have implemented an MQTT-OSS bridge!
We took a fun journey across a bridge between MQTT and Oracle Streaming Service (OSS). Bridges like this let our applications communicate across incompatible messaging protocols, which keeps them flexible and easy to work with from our favorite languages and frameworks.
Photo by Umer Sayyam on Unsplash