1. Address spaces and addresses
1.1. Address space
An address space is a group of addresses that can be accessed through a single connection (per protocol). This means that clients connected to the endpoints of an address space can send messages to or receive messages from any authorized address within that address space. An address space can support multiple protocols, as defined by the address space type.
AMQ Online has two types of address spaces:
1.2. Address
An address is part of an address space and represents a destination for sending and receiving messages. An address has a type, which defines the semantics of sending messages to and receiving messages from that address.
The types of addresses available in AMQ Online depend on the address space type.
1.3. Standard address space
The standard address space is the default address space in AMQ Online. It consists of an AMQP router network in combination with attachable storage units. Clients connect to a message router, which forwards messages to or from one or more message brokers. This address space type is appropriate when you have many connections and addresses. However, the standard address space has the following limitations:
-
No transaction support
-
No message ordering
-
No selectors on queues
-
No browsing on queues
-
No message groups
Clients connect and send and receive messages in this address space using the AMQP or MQTT protocols. Note that MQTT does not support qos2 or retained messages.
1.3.1. Standard address types
The standard address space supports five different address types:
-
queue
-
topic
-
anycast
-
multicast
-
subscription
Queue
The queue address type is a store-and-forward queue. This address type is appropriate for implementing a distributed work queue, handling traffic bursts, and other use cases when you want to decouple the producer and consumer. A queue can be sharded across multiple storage units. Message ordering might be lost for queues in the standard address space.
Regarding dead-letter queues (DLQs), you can determine if any messages are stored in a DLQ by logging in to the AMQ Console and viewing the Addresses page. To resolve this situation, you must connect to a client and consume from a DLQ address.
Topic
The topic address type supports the publish-subscribe messaging pattern where there are 1..N producers and 1..M consumers. Each message published to a topic address is forwarded to all subscribers for that address. A subscriber can also be durable, in which case messages are kept until the subscriber has acknowledged them.
Note
|
If you create a subscription on a topic, any senders to that topic must specify the topic capability.
|
Hierarchical topics and wildcards
A client receiving from a topic address can specify a wildcard address with the topic address as the root. The wildcard behavior follows the MQTT syntax:
-
/
is a separator -
+
matches one level -
#
matches one or more levels
So, for example:
-
a/#/b
matchesa/foo/b
,a/bar/b
, anda/foo/bar/b
-
a/+/b
matchesa/foo/b
anda/bar/b
, but would not matcha/foo/bar
In the standard address space, the first level must always be a defined topic address; that is, #
and +
are not valid as the first characters of a subscribing address.
Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic). For more information about the specific workaround for your client, see the applicable client example section in Connecting applications to AMQ Online.
Anycast
The anycast address type is a scalable direct address for sending messages to one consumer. Messages sent to an anycast address are not stored, but are instead forwarded directly to the consumer. This method makes this address type ideal for request-reply (RPC) uses or even work distribution. This is the cheapest address type as it does not require any persistence.
Multicast
The multicast address type is a scalable direct address for sending messages to multiple consumers. Messages sent to a multicast address are forwarded to all consumers receiving messages on that address. Because message acknowledgments from consumers are not propagated to producers, only pre-settled messages can be sent to multicast addresses.
Subscription
The subscription address type allows a subscription to be created for a topic that holds messages published to the topic even if the subscriber is not attached. The subscription is accessed by the consumer using <topic-address>::<subscription-address>. For example, for a subscription mysub
on a topic mytopic
the consumer consumes from the address mytopic::mysub
.
1.4. Brokered address space
The brokered address space is designed to support broker-specific features, at the cost of limited scale in terms of the number of connections and addresses. This address space supports JMS transactions, message groups, and selectors on queues and topics.
Clients can connect as well as send and receive messages in this address space using the following protocols:
-
AMQP
-
CORE
-
OpenWire
-
MQTT
-
STOMP
1.4.1. Brokered address types
The brokered address space supports two address types:
-
queue
-
topic
Queue
The queue address type is a store-and-forward queue. This address type is appropriate for implementing a distributed work queue, handling traffic bursts, and other use cases where you want to decouple the producer and consumer. A queue in the brokered address space supports selectors, message groups, transactions, and other JMS features. Message order can be lost with released messages.
Topic
The topic address type supports the publish-subscribe messaging pattern in which there are 1..N producers and 1..M consumers. Each message published to a topic address is forwarded to all subscribers for that address. A subscriber can also be durable, in which case messages are kept until the subscriber has acknowledged them.
Hierarchical topics and wildcards
A client receiving from a topic address can specify a wildcard address with the topic address as the root. The wildcard behavior follows the MQTT syntax:
-
/
is a separator -
+
matches one level -
#
matches one or more levels
So, for example:
-
a/#/b
matchesa/foo/b
,a/bar/b
,a/foo/bar/b
-
a/+/b
matchesa/foo/b
anda/bar/b
, but would not matcha/foo/bar
Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic). For more information about the specific workaround for your client, see the applicable client example section in Connecting applications to AMQ Online.
2. Connecting applications to AMQ Online
You can connect your application to AMQ Online using one of the following client examples.
To connect to the messaging service from outside the {KubePlatform} cluster, TLS must be used with SNI set to specify the fully qualified host name for the address space. The port used is 443.
The messaging protocols supported depends on the type of address space used. For more information about address space types, see Address space.
2.1. Client examples
2.1.1. AMQ Python example
You can use the following AMQ Python example to connect your application to AMQ Online. This example assumes you have created an address of type queue
named myqueue
.
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
class HelloWorld(MessagingHandler):
def __init__(self, server, address):
super(HelloWorld, self).__init__()
self.server = server
self.address = address
def on_start(self, event):
conn = event.container.connect(self.server)
event.container.create_receiver(conn, self.address)
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
event.sender.send(Message(body="Hello World!"))
event.sender.close()
def on_message(self, event):
print(event.message.body)
event.connection.close()
Container(HelloWorld("amqps://_messaging-route-hostname_:443", "myqueue")).run()
Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).
The workaround for this issue involves setting the capability "topic"
in the source.
-
In the
simple_recv.py
file, modify thefrom proton.reactor import Container
to add theReceiverOption
:
class CapabilityOptions(ReceiverOption):
def apply(self, receiver):
receiver.source.capabilities.put_object(symbol("topic"))
-
Modify the following line to add
options=CapabilityOptions()
:
def on_start(self, event):
event.container.create_receiver(conn, self.address, options=CapabilityOptions())
2.1.2. AMQ JMS example
You can use the following AMQ JMS example to connect your application to AMQ Online. This example assumes you have created an address of type queue
named myqueue
.
package org.apache.qpid.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
public class HelloWorld {
public static void main(String[] args) throws Exception {
try {
// The configuration for the Qpid InitialContextFactory has been supplied in
// a jndi.properties file in the classpath, which results in it being picked
// up automatically by the InitialContext constructor.
Context context = new InitialContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Destination queue = (Destination) context.lookup("myQueueLookup");
Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
connection.setExceptionListener(new MyExceptionListener());
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(queue);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage message = session.createTextMessage("Hello world!");
messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);
if (receivedMessage != null) {
System.out.println(receivedMessage.getText());
} else {
System.out.println("No message received within the given timeout!");
}
connection.close();
} catch (Exception exp) {
System.out.println("Caught exception, exiting.");
exp.printStackTrace(System.out);
System.exit(1);
}
}
private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
with jndi.properties:
connectionfactory.myFactoryLookup = amqps://messaging-route-hostname:443?transport.trustAll=true&transport.verifyHost=false
queue.myQueueLookup = myqueue
2.1.3. AMQ JavaScript example
You can use the following AMQ JavaScript example to connect your application to AMQ Online. This example assumes you have created an address of type queue
named myqueue
.
var container = require('rhea');
container.on('connection_open', function (context) {
context.connection.open_receiver('myqueue');
context.connection.open_sender('myqueue');
});
container.on('message', function (context) {
console.log(context.message.body);
context.connection.close();
});
container.on('sendable', function (context) {
context.sender.send({body:'Hello World!'});
context.sender.detach();
});
container.connect({username: 'username', password: 'password', port:443, host:'messaging-route-hostname', transport:'tls', rejectUnauthorized:false});
AMQ JavaScript example using WebSockets
var container = require('rhea');
var WebSocket = require('ws');
container.on('connection_open', function (context) {
context.connection.open_receiver('myqueue');
context.connection.open_sender('myqueue');
});
container.on('message', function (context) {
console.log(context.message.body);
context.connection.close();
});
container.on('sendable', function (context) {
context.sender.send({body:'Hello World!'});
context.sender.detach();
});
var ws = container.websocket_connect(WebSocket);
container.connect({username: 'username', password: 'password', connection_details: ws("wss://messaging-route-hostname:443", ["binary"], {rejectUnauthorized: false})});
2.1.4. AMQ C++ example
The C++ client has equivalent simple_recv
and simple_send
examples with the same options as Python. However, the C++ library does not perform the same level of processing on the URL; in particular it will not accept amqps://
to imply using TLS, so the example needs to be modified as follows:
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/ssl.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>
#include <iostream>
#include "fake_cpp11.hpp"
class hello_world : public proton::messaging_handler {
private:
proton::url url;
public:
hello_world(const std::string& u) : url(u) {}
void on_container_start(proton::container& c) OVERRIDE {
proton::connection_options co;
co.ssl_client_options(proton::ssl_client_options());
c.client_connection_options(co);
c.connect(url);
}
void on_connection_open(proton::connection& c) OVERRIDE {
c.open_receiver(url.path());
c.open_sender(url.path());
}
void on_sendable(proton::sender &s) OVERRIDE {
proton::message m("Hello World!");
s.send(m);
s.close();
}
void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
std::cout << m.body() << std::endl;
d.connection().close();
}
};
int main(int argc, char **argv) {
try {
std::string url = argc > 1 ? argv[1] : "messaging-route-hostname:443/myqueue";
hello_world hw(url);
proton::default_container(hw).run();
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).
The workaround involves setting the capability "topic"
in the source.
-
In the
topic_receive.cpp
file, edit the code so that it is similar to what is shown in this example:
void on_container_start(proton::container& cont) override {
proton::connection conn = cont.connect(conn_url_);
proton::receiver_options opts {};
proton::source_options sopts {};
sopts.capabilities(std::vector<proton::symbol> { "topic" });
opts.source(sopts);
conn.open_receiver(address_, opts);
}
2.1.5. AMQ .NET example
You can use the following AMQ .NET example to connect your application to AMQ Online. This example assumes you have created an address of type queue
named myqueue
.
using System;
using Amqp;
namespace Test
{
public class Program
{
public static void Main(string[] args)
{
String url = (args.Length > 0) ? args[0] : "amqps://messaging-route-hostname:443";
String address = (args.Length > 1) ? args[1] : "myqueue";
Connection.DisableServerCertValidation = true;
Connection connection = new Connection(new Address(url));
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "test-sender", address);
Message messageSent = new Message("Test Message");
sender.Send(messageSent);
ReceiverLink receiver = new ReceiverLink(session, "test-receiver", address);
Message messageReceived = receiver.Receive(TimeSpan.FromSeconds(2));
Console.WriteLine(messageReceived.Body);
receiver.Accept(messageReceived);
sender.Close();
receiver.Close();
session.Close();
connection.Close();
}
}
}