Publish Messages to a Topic
The ITopic/Topic interface facilitates publishing messages to a topic. It also provides event registrations for message delivery failure, message receipt, and topic deletion.
It provides the Publish method, which publishes a message to a specific topic in the cache. When publishing, the publisher can set the delivery option and expiration time for a message, and can register for message delivery failure and topic deletion notifications.
Here, we describe how to publish messages, publish messages asynchronously, publish bulk messages and publish ordered messages.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client-side features, please refer to the page on Client Side API Prerequisites.
- For API details, refer to: ICache, CacheItem, ITopic, Publish, GetTopic, PublishAsync, ExpirationTime, MessageDeliveryFailure, OnTopicDeleted, DeliveryOption, Message, PublishBulk, MessageFailedEventArgs, TopicDeleteEventArgs.
- For API details, refer to: Cache, CacheItem, Topic, getTopic, publish, Message, setExpirationTime, TopicListener, addMessageDeliveryFailureListener, addTopicDeletedListener, DeliveryOption, All, publishAsync, publishBulk, TopicDeleteEventArgs, MessageFailedEventArgs, MessageEventArgs.
- For API details, refer to: Cache, CacheItem, Topic, getTopic, publish, getMessagingService, TimeSpan, setExpirationTime, TopicListener, addMessageDeliveryFailureListener, addTopicDeletedListener, DeliveryOption, publishBulk, MessageReceivedListener.
- For API details, refer to: Cache, CacheItem, get_topic, get_messaging_service, TimeSpan, set_expiration_time, add_message_delivery_failure_listener, add_topic_deleted_listener, publish_async, publish_bulk, MessageEventArgs, get_topic_name, MessageFailedEventArgs, get_message_failure_reason, TopicDeleteEventArgs.
Publish Messages
The following code sample performs these steps:
- Get the dedicated topic for order-related messages.
- Register the MessageDeliveryFailure event for the topic.
- Register the OnTopicDeleted event for the topic.
- Create a message for the topic and set its expiration time.
- Publish the message with a delivery option.
try
{
// Pre-Condition: Cache is already connected
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
orderTopic.Publish(orderMessage, DeliveryOption.All, true);
}
else
{
// No topic exists
}
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS)
{
// Message ID already exists, specify a new ID
}
else if (ex.ErrorCode == NCacheErrorCodes.TOPIC_DISPOSED)
{
// Specified topic has been disposed
}
else if (ex.ErrorCode == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED)
{
// Message publishing on pattern based topic is not allowed
// Get non-pattern based topic
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any other generic exception like ArgumentNullException or ArgumentException
// Topic name is null/empty
}
try {
// Precondition: Cache is already connected
// Already existing topic
String topicName = "orderTopic";
// Get topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true);
} else {
// No topic exists
}
} catch (OperationFailedException exception) {
if (exception.getErrorCode() == NCacheErrorCodes.MESSAGE_ID_ALREADY_EXISTS) {
// Message ID already exists. Specify a new ID
} else if (exception.getErrorCode() == NCacheErrorCodes.TOPIC_DISPOSED) {
// Specified topic has been disposed
} else if (exception.getErrorCode() == NCacheErrorCodes.PATTERN_BASED_PUBLISHING_NOT_ALLOWED) {
// Message publishing on pattern based topic is not allowed
// Get non-pattern based topic
} else {
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
} catch (Exception exception) {
// Any generic exception like IllegalArgumentException or NullPointerException
}
try {
// Precondition: Cache is already connected
// Already existing topic
val topicName = "orderTopic"
// Get topic
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null) {
// Create object to be sent in the message
val order = fetchOrderFromDB(1100)
// Create new message
val orderMessage = new Message(order)
val expiryTime = new TimeSpan(12, 12, 12)
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime)
// Register message delivery failure
val topicListener = new TopicEventListener()
orderTopic.addMessageDeliveryFailureListener(topicListener)
orderTopic.addTopicDeletedListener(topicListener)
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, DeliveryOption.All, true)
}
else {
// No topic exists
}
}
catch {
case exception: Exception => {
// Handle any errors
}
}
// This is an async method
try {
// Precondition: Cache is already connected
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let orderTopic = await ncache.getMessagingService().getTopic(topicName);
if (!(orderTopic == null)) {
// Create object to be sent in message
let order = await this.fetchOrdersFromDb(this.orderId);
// Create a new message with the object order
let orderMessage = new ncache.Message(order);
// Set the expiration time of the message
let expiryTime = new ncache.TimeSpan(12, 12, 12);
orderMessage.setExpirationTime(expiryTime);
let topicListener = new ncache.MyTopicListener();
// Register message delivery failure
orderTopic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
orderTopic.publish(orderMessage, ncache.DeliveryOption.All, true);
} else {
// No topic exists
}
} catch (error) {
// Handle any errors
}
try:
    # Precondition: Cache is already connected
    # Topic "orderTopic" exists in the cache
    topic_name = "orderTopic"
    # Get the topic
    order_topic = cache.get_messaging_service().get_topic(topic_name)
    if order_topic is not None:
        # Create object to be sent in message
        order = fetch_order_from_db("1001")
        # Create a new message with the object order
        order_message = ncache.Message(order)
        # Set the expiration time of the message
        expiry_time = ncache.TimeSpan(12, 12, 12)
        order_message.set_expiration_time(expiry_time)
        # Register message delivery failure
        order_topic.add_message_delivery_failure_listener(topic_listener)
        # Register topic deletion notification
        order_topic.add_topic_deleted_listener(topic_listener)
        # Publish the order with delivery option set as All
        # and register message delivery failure
        order_topic.publish(order_message, ncache.DeliveryOption.ALL, True)
    else:
        # No topic exists
        print("Topic not found")
except Exception as exp:
    # Handle errors
    print("Failed to publish message: " + str(exp))
Note
To ensure the operation is fail-safe, handle any potential exceptions within your application, as explained in Handling Failures.
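The delivery option passed to Publish in the samples above controls how many subscribers receive a message. The sketch below is a plain-Python stand-in, not the NCache client: `InMemoryTopic`, `subscribe`, and the string options `"ALL"` and `"ANY"` are hypothetical names used only for illustration, under the assumption that All delivers to every registered subscriber while Any delivers to a single one.

```python
import random
from typing import Callable, List


# Hypothetical stand-in for a topic; this is NOT the NCache client API.
class InMemoryTopic:
    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: List[Callable[[str], None]] = []

    def subscribe(self, callback: Callable[[str], None]) -> None:
        self._subscribers.append(callback)

    def publish(self, payload: str, delivery_option: str = "ALL") -> int:
        """Deliver payload and return the number of subscribers reached."""
        if not self._subscribers:
            return 0
        if delivery_option == "ALL":
            targets = list(self._subscribers)
        elif delivery_option == "ANY":
            targets = [random.choice(self._subscribers)]
        else:
            raise ValueError(f"unknown delivery option: {delivery_option}")
        for callback in targets:
            callback(payload)
        return len(targets)


received_a: List[str] = []
received_b: List[str] = []
topic = InMemoryTopic("orderTopic")
topic.subscribe(received_a.append)
topic.subscribe(received_b.append)

reached_all = topic.publish("order-10248", "ALL")  # reaches both subscribers
reached_any = topic.publish("order-10249", "ANY")  # reaches exactly one subscriber
```

With two subscribers registered, the first publish reaches both of them and the second reaches only one, chosen at random in this sketch.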
Publish Asynchronously
Messages can be published to a topic asynchronously using PublishAsync, so the application does not have to wait for the operation to complete before moving on. Control returns to the caller immediately for further processing.
The following example lets you publish a message asynchronously.
// Topic "orderTopic" exists in cache
string topicName = "orderTopic";
// Get the topic
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Register message delivery failure
orderTopic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
orderTopic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
Task task = orderTopic.PublishAsync(orderMessage, DeliveryOption.All, true);
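// IsFaulted is only meaningful once the task has completed;
// await the task or attach a continuation before inspecting it
if (task.IsFaulted)
{
// Task Failed
}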
}
else
{
// No topic exists
}
// Topic orderTopic already exists in the cache
String topicName = "orderTopic";
// Get the topic
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
// Create object to be sent in the message
Order order = fetchOrdersFromDB(1100);
// Create new message
Message orderMessage = new Message(order);
TimeSpan expiryTime = new TimeSpan(12, 12, 12);
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime);
// Register message delivery failure
MyTopicListener topicListener = new MyTopicListener();
orderTopic.addMessageDeliveryFailureListener(topicListener);
orderTopic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
TimeScheduler.Task task = (TimeScheduler.Task) orderTopic.publishAsync(orderMessage, DeliveryOption.All, true);
if (task.IsCancelled()) {
// task cancelled
}
} else {
// No topic exists
}
// Topic orderTopic already exists in the cache
val topicName = "orderTopic"
// Get the topic
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null) {
// Create object to be sent in the message
val order = fetchOrderFromDB(1100)
// Create new message
val orderMessage = new Message(order)
val expiryTime = new TimeSpan(12, 12, 12)
// Set expiration time of the message
orderMessage.setExpirationTime(expiryTime)
// Register message delivery failure
val topicListener = new TopicEventListener()
orderTopic.addMessageDeliveryFailureListener(topicListener)
orderTopic.addTopicDeletedListener(topicListener)
// Publish the order with delivery option set as All
// and register message delivery failure
val task = orderTopic.publishAsync(orderMessage, DeliveryOption.All, true)
if (task.isCompleted) {
// task completed
}
}
else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
    # Create object to be sent in message
    order = fetch_order_from_db("1001")
    # Create a new message with the object order
    order_message = ncache.Message(order)
    # Set the expiration time of the message
    expiry_time = ncache.TimeSpan(12, 12, 12)
    order_message.set_expiration_time(expiry_time)
    # Register message delivery failure
    order_topic.add_message_delivery_failure_listener(topic_listener)
    # Register topic deletion notification
    order_topic.add_topic_deleted_listener(topic_listener)
    # Publish the order with delivery option set as All
    # and register message delivery failure
    task = order_topic.publish_async(order_message, ncache.DeliveryOption.ALL, True)
    # Use this task object as per your business logic
else:
    # No topic exists
    print("Topic not found")
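Checking the state of a task immediately after starting it rarely says much, because the publish may still be in flight. The following is a minimal sketch of the fire-and-continue pattern using only Python's standard `concurrent.futures` module. `slow_publish` is a hypothetical stand-in for a real publish call and is not part of NCache; the point is that the outcome is inspected in a completion callback rather than right after submission.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def slow_publish(payload: str) -> str:
    """Hypothetical stand-in for a publish call that takes a while."""
    time.sleep(0.05)
    if not payload:
        raise ValueError("empty message")
    return payload


outcomes: List[str] = []


def on_done(future: Future) -> None:
    # Runs once the publish has finished, successfully or not.
    error = future.exception()
    outcomes.append("failed" if error else "published")


with ThreadPoolExecutor(max_workers=2) as pool:
    ok_future = pool.submit(slow_publish, "order-10248")
    bad_future = pool.submit(slow_publish, "")
    ok_future.add_done_callback(on_done)
    bad_future.add_done_callback(on_done)
    # The caller is free to do other work here while both publishes run.
    other_work_done = True
# Leaving the "with" block waits for both futures to complete.
```

After the executor shuts down, `outcomes` holds one entry per publish, so failures can be logged or retried without blocking the caller while the operation runs.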
Publish Bulk Messages
Multiple messages can be published in a single call using the PublishBulk method. This improves performance and memory usage because the messages are combined and published in a single call.
The code below takes an already created topic, orderTopic, and shows how to publish messages to it in bulk.
// Topic "orderTopic" exists in cache
ITopic topic = cache.MessagingService.GetTopic("orderTopic");
if (topic != null)
{
// Create a list for storing the messages in bulk
List<Tuple<Message, DeliveryOption>> messageList = new List<Tuple<Message, DeliveryOption>>();
Order[] orders = FetchOrdersFromDB();
for (int i = 0; i < 100; i++)
{
Message message = new Message(orders[i]);
message.ExpirationTime = TimeSpan.FromSeconds(10000);
messageList.Add(new Tuple<Message, DeliveryOption>(message, DeliveryOption.All));
}
// Register message delivery failure
topic.MessageDeliveryFailure += OnFailureMessageReceived;
//Register topic deletion notification
topic.OnTopicDeleted = TopicDeleted;
// Publish the order with delivery option set as all
// and register message delivery failure
// In case of failed publishing of messages, exceptions
// will be returned
IDictionary<Message, Exception> keys = topic.PublishBulk(messageList, true);
}
// topic already exists
String topicName = "orderTopic";
String customerID = "DUMON";
Topic topic = cache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create a map for storing the messages in bulk
Map<Message, DeliveryOption> messageMap = new HashMap<>();
Order[] orders = fetchOrdersFromDb(customerID);
for (int i = 0; i < 100; i++) {
Message message = new Message(orders[i]);
message.setExpirationTime(TimeSpan.FromSeconds(10000));
messageMap.put(message, DeliveryOption.All);
}
MyTopicListener topicListener = new MyTopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
Map<Message, Exception> keys = topic.publishBulk(messageMap, true);
} else {
// topic is null
}
// topic already exists
val topicName = "orderTopic"
val customerID = "DUMON"
val topic = cache.getMessagingService.getTopic(topicName)
if (topic != null) { // create dictionary for storing bulk
var messageMap: Map[Message, DeliveryOption] = Map()
val orders = fetchOrderFromDB(customerID)
for (i <- 0 until 100) {
val message = new Message(orders(i))
message.setExpirationTime(TimeSpan.FromSeconds(10000))
messageMap = messageMap + (message -> DeliveryOption.All)
}
val topicListener = new TopicEventListener()
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener)
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener)
val keys = topic.publishBulk(messageMap, true)
}
else {
// topic is null
}
// This is an async method
// Topic "orderTopic" exists in the cache
let topicName = "orderTopic";
// Get the topic
let topic = await ncache.getMessagingService().getTopic(topicName);
if (topic != null) {
// Create Map for storing messages in bulk
let messageMap = new Map();
let orders = this.fetchOrdersByCustomerId(this.customerId);
var i;
for (i = 0; i < 100; i++) {
let message = new ncache.Message(orders[i]);
message.setExpirationTime(ncache.TimeSpan.FromSeconds(10000));
messageMap.set(message, ncache.DeliveryOption.All);
}
let topicListener = new ncache.TopicListener();
// Register message delivery failure
topic.addMessageDeliveryFailureListener(topicListener);
// Register topic deletion notification
topic.addTopicDeletedListener(topicListener);
// Publish the order with delivery option set as All
// and register message delivery failure
// In case of failed publishing of messages, exception will be returned
let keys = topic.publishBulk(messageMap, true);
} else {
// No topic exists
}
# Topic "orderTopic" exists in the cache
topic_name = "orderTopic"
# Get the topic
topic = cache.get_messaging_service().get_topic(topic_name)
if topic is not None:
    # Create Dict for storing messages in bulk
    messages_map = {}
    orders = fetch_orders_by_customer_id("ALFKI")
    for i in range(0, 100):
        message = ncache.Message(orders[i])
        message.set_expiration_time(ncache.TimeSpan.from_seconds(10000))
        messages_map[message] = ncache.DeliveryOption.ALL
    # Register message delivery failure
    topic.add_message_delivery_failure_listener(topic_listener)
    # Register topic deletion notification
    topic.add_topic_deleted_listener(topic_listener)
    # Publish the order with delivery option set as All
    # and register message delivery failure
    # In case of failed publishing of messages, exception will be returned
    keys = topic.publish_bulk(messages_map, True)
else:
    # No topic exists
    print("Topic not found")
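The bulk call returns the messages that could not be published, each paired with its exception. A sketch of how such a result might be processed follows. `fake_publish_bulk` is a hypothetical stand-in that fails for payloads starting with "bad"; it is not the NCache `publish_bulk` method, and plain strings are used in place of message objects.

```python
from typing import Dict, List, Tuple


def fake_publish_bulk(payloads: List[str]) -> Dict[str, Exception]:
    """Hypothetical stand-in: returns only the payloads that failed."""
    failures: Dict[str, Exception] = {}
    for payload in payloads:
        if payload.startswith("bad"):
            failures[payload] = ValueError(f"could not publish {payload}")
    return failures


def split_results(
    payloads: List[str], failures: Dict[str, Exception]
) -> Tuple[List[str], List[str]]:
    """Separate published payloads from failed ones for logging or retry."""
    published = [p for p in payloads if p not in failures]
    to_retry = [p for p in payloads if p in failures]
    return published, to_retry


batch = ["order-1", "bad-order-2", "order-3"]
failed = fake_publish_bulk(batch)
published, to_retry = split_results(batch, failed)
for payload in to_retry:
    # Each failed payload carries the exception explaining why it failed.
    print(f"retry {payload}: {failed[payload]}")
```

An empty result means every message in the batch was published; otherwise only the failed entries need attention.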
Publish Ordered Messages
Note
This feature is only available in NCache 5.2 and later.
Messages can be published with a sequence name so that they are published in a specific order. To publish ordered messages, the same string sequence name is attached to each message in the chain, which ensures that all messages belonging to that sequence name are published on the same server node.
In the example given below, a sequence name is attached to the messages, which are then published using the Publish method.
// Specify the topic name that already exists
string topicName = "orderTopic";
// Get the topic with the specified name
ITopic orderTopic = cache.MessagingService.GetTopic(topicName);
if (orderTopic != null)
{
for (int i = 0; i < 30; i++)
{
// Create the object to be sent in message
Order order = FetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the messages
string sequenceName = "OrderMessages";
// Set the expiration time of the message
orderMessage.ExpirationTime = TimeSpan.FromSeconds(5000);
// Publish message with the sequence name
orderTopic.Publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
}
else
{
// No topic found
}
// Specify the topic name that already exists
String topicName = "orderTopic";
// Get the topic with the specified name
Topic orderTopic = cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
for (int i = 0; i < 30; i++) {
// Create the object to be sent in the message
Order order = fetchOrdersFromDB(10248);
// Create the new message with the object order
var orderMessage = new Message(order);
// Specify a unique sequence name for the message
String sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true);
}
} else {
// No topic found
}
// Specify the topic name that already exists
val topicName = "orderTopic"
// Get the topic with the specified name
val orderTopic = cache.getMessagingService.getTopic(topicName)
if (orderTopic != null)
for (i <- 0 until 30) { // Create the object to be sent in the message
val order = fetchOrderFromDB(10248)
// Create the new message with the object order
val orderMessage = new Message(order)
// Specify a unique sequence name for the message
val sequenceName = "OrderMessage"
// Set the expiration time of the message
orderMessage.setExpirationTime(TimeSpan.FromSeconds(5000))
// Publish message with the sequence name
orderTopic.publish(orderMessage, DeliveryOption.All, sequenceName, true)
}
else {
// No topic found
}
// This is an async method
// Specify the topic name that already exists
let topicName = "orderTopic";
// Get the topic with the specified name
let orderTopic = await this.cache.getMessagingService().getTopic(topicName);
if (orderTopic != null) {
var i;
for (i = 0; i < 30; i++) {
// Create the object to be sent in the message
let order = await this.fetchOrderFromDB(10248);
// Create the new message with the object order
var orderMessage = new ncache.Message(order);
// Specify a unique sequence name for the message
let sequenceName = "OrderMessage";
// Set the expiration time of the message
orderMessage.setExpirationTime(ncache.TimeSpan.FromSeconds(5000));
// Publish message with the sequence name
orderTopic.publish(
orderMessage,
ncache.DeliveryOption.All,
sequenceName,
true
);
}
} else {
// No topic found
}
# Specify the topic name that already exists
topic_name = "orderTopic"
# Get the topic with the specified name
order_topic = cache.get_messaging_service().get_topic(topic_name)
if order_topic is not None:
    for i in range(0, 30):
        # Create the object to be sent in the message
        order = fetch_order_from_db(10248)
        # Create the new message with the object order
        order_message = ncache.Message(order)
        # Specify a unique sequence name for the message
        sequence_name = "OrderMessage"
        # Set the expiration time of the message
        order_message.set_expiration_time(ncache.TimeSpan.from_seconds(5000))
        # Publish message with the sequence name
        order_topic.publish(order_message, ncache.DeliveryOption.ALL, True, sequence_name)
else:
    # No topic found
    print("Topic not found")
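One way to picture why a shared sequence name preserves order: if the name alone decides which server node handles a message, every message in the sequence lands on the same node and is queued there in publish order. The sketch below uses a stable hash of the name to pick a node. This is an illustrative assumption, not NCache's actual routing logic, and `node_for_sequence` and `publish_ordered` are hypothetical helpers.

```python
import hashlib
from collections import defaultdict
from typing import DefaultDict, List


def node_for_sequence(sequence_name: str, node_count: int) -> int:
    """Map a sequence name to a node index with a stable hash."""
    digest = hashlib.sha256(sequence_name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % node_count


# One in-memory queue per node index (stand-in for server nodes).
node_queues: DefaultDict[int, List[str]] = defaultdict(list)


def publish_ordered(payload: str, sequence_name: str, node_count: int = 3) -> int:
    """Append the payload to the queue of the node chosen by the sequence name."""
    node = node_for_sequence(sequence_name, node_count)
    node_queues[node].append(payload)  # appended in publish order
    return node


nodes_used = {publish_ordered(f"order-{i}", "OrderMessages") for i in range(30)}
```

All 30 messages end up in a single queue, in the order they were published, because the same sequence name always maps to the same node index.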
Register Callbacks
private void OnFailureMessageReceived(object sender, MessageFailedEventArgs args)
{
// Failure reason can be obtained from args.MessageFailureReason
}
private void TopicDeleted(object sender, TopicDeleteEventArgs args)
{
// Deleted topic is args.TopicName
}
@Override
public void onTopicDeleted(Object sender, TopicDeleteEventArgs args) {
// Deleted topic is args.getTopicName();
}
@Override
public void onMessageDeliveryFailure(Object sender, MessageFailedEventArgs args) {
// Failure reason is args.getMessageFailureReason();
}
@Override
public void onMessageReceived(Object sender, MessageEventArgs args) {
// Perform operations
}
class MessageListener extends MessageReceivedListener {
override def onMessageReceived(sender: Any, args: MessageEventArgs): Unit = {
// Perform operations
}
}
class TopicEventListener extends TopicListener {
override def onTopicDeleted(sender: Any, args: TopicDeleteEventArgs): Unit = {
// Deleted topic is args.getTopicName
}
override def onMessageDeliveryFailure(sender: Any, args: MessageFailedEventArgs): Unit = {
// Failure reason is args.getMessageFailureReason
}
}
onTopicDeleted(sender, args) {
// Deleted topic is args.getTopicName()
}
onMessageDeliveryFailure(sender, args) {
// Failure reason is args.getMessageFailureReason()
}
onMessageReceived(sender, args) {
// Perform operations
}
def on_topic_deleted(sender: object, args: ncache.TopicDeleteEventArgs):
    # Perform operations
    print("Deleted topic is " + args.get_topic_name())
def on_message_delivery_failure(sender: object, args: ncache.MessageFailedEventArgs):
    # Perform operations
    print("Failure reason is " + str(args.get_message_failure_reason()))
def on_message_received(sender: object, args: ncache.MessageEventArgs):
    # Perform operations
    print("Message received from topic " + args.get_topic_name())
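The callbacks above all share the (sender, args) shape. The small stand-in below shows how a topic might store registered listeners and invoke them when an event occurs. `StubTopic`, `TopicDeletedArgs`, and `delete` are hypothetical and are not NCache classes or methods; only the callback shape mirrors the samples.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class TopicDeletedArgs:
    """Hypothetical event payload carrying the deleted topic's name."""
    topic_name: str


class StubTopic:
    """Hypothetical stand-in that stores listeners and fires them on delete."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._deleted_listeners: List[Callable[[object, TopicDeletedArgs], None]] = []

    def add_topic_deleted_listener(
        self, listener: Callable[[object, TopicDeletedArgs], None]
    ) -> None:
        self._deleted_listeners.append(listener)

    def delete(self) -> None:
        # Notify every registered listener, passing the topic as the sender.
        args = TopicDeletedArgs(self.name)
        for listener in self._deleted_listeners:
            listener(self, args)


log: List[str] = []


def on_topic_deleted(sender: object, args: TopicDeletedArgs) -> None:
    log.append("Deleted topic is " + args.topic_name)


stub = StubTopic("orderTopic")
stub.add_topic_deleted_listener(on_topic_deleted)
stub.delete()
```

Registering the callback before the event occurs is what matters here: a listener added after `delete` has run would never be invoked in this sketch.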
Additional Resources
NCache provides a sample application for Pub/Sub on GitHub.
See Also
Pub/Sub Topics
Subscribe to a Topic
Pub/Sub Events