How to use Continuous Query in Cache
Note
This feature is only available in NCache Enterprise Edition.
Assuming that you have indexed the required searchable attributes, you are now required to implement continuous query in your application. Keeping in mind the purpose of continuous queries, the first thing you need to do is to define all
the callbacks that need to be executed once the result set of your query is in any way changed. Then, we need to register the continuous query with the cache server.
If all your applications don't require tracking of any query result set, then you should not only unregister
notifications but also unregister the query from your cache.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: ICache, EventDataFilter, EventType, ExecuteReader, RegisterCQ, UnRegisterCQ, UnRegisterNotification, ContinuousQuery, RegisterNotification
, QueryDataNotificationCallback, ICacheReader, FieldCount, Read, Insert, CQEventArg, GetValue.
- To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, CacheItemVersion, insert, CQEventArg, getEventType, read, getValue, ContinuousQuery, addDataModificationListener, getFieldCount, CacheReader.
- To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details, refer to: Cache, EventDataFilter, EventType, executeReader, registerCQ, unRegisterCQ, removeDataModificationListener, CacheItem, insert, getEventType, getValue, read, getFieldCount, ContinuousQuery, addDataModificationListener.
- To learn about the standard prerequisites required to work with all NCache client side features please refer to the given page on Client Side API Prerequisites.
- Indexing for searchable objects and their attributes need to be configured first as explained in Configuring Query Indexes in Administrator's Guide.
- Cache should have some data related to configured attributes.
- For API details refer to: get_event_type, execute_reader, register_cq,
un_register_cq, remove_data_modification_listener, CacheItem, insert, get_field_count, add_data_modification_listener, ContinuousQuery, read, get_value, EventType, CQEventArg.
Step 1: Register Callback for Events
Assuming that you have indexed the required searchable attributes, you can implement Continuous Query in your application. This requires you to define all the callbacks that need to be executed once the result set of your query is modified. Then, we need to register the Continuous Query with the cache server.
A callback can be registered for multiple events.
static void QueryItemCallBack(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
// "key" has been added to cache
break;
case EventType.ItemUpdated:
// "key" has been updated in cache
// Get updated Product object
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
// Perform operations
}
break;
case EventType.ItemRemoved:
// "key" has been removed from cache
break;
}
}
public void queryItemCallback (String key, CQEventArg arg)
{
switch (arg.getEventType())
{
case ItemAdded:
// Key has been added to the cache
break;
case ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
Product updatedProduct = arg.getItem().getValue(Product.class);
// Perform operations accordingly
}
break;
case ItemRemoved:
// Key has been removed from the cache
break;
}
}
class CQListener extends QueryDataModificationListener {
override def onQueryDataModified(key: String, eventArgs: CQEventArg): Unit = {
eventArgs.getEventType match {
case EventType.ItemAdded =>
// Key has been added to the cache
case EventType.ItemUpdated =>
// 'key' has been updated in the cache
// get the updated product
if (eventArgs.getItem != null) {
val updateProduct = eventArgs.getItem.getValue(classOf[Product])
// perform operations
}
case EventType.ItemRemoved =>
// 'key' has been removed from the cache
}
}
}
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
Step 2: Register Query and Notifications
After the callbacks are registered, create a Continuous Query, which specifies the criteria for the result set of which the events will be fired. This query will be registered against the server.
Once Continuous Query has been created, the pre-defined callbacks are registered with the query. The callbacks are registered according to EventType
and EventDataFilter
.
The continuous query can now be registered on the server using RegisterCQ()
. You can use this method multiple times in your application to receive the notifications for a change in the dataset of your query.
Any modifications in cache event notifications will be triggered according to the event type. For querying cached data, ExecuteReader
executes the query and the result set generated is then read at client side, chunk by chunk.
You can trigger events by modifying cache data such that it affects the result set. The code sample updates an existing cache item such that is added to the query result set, thereby firing an ItemAdded event.
Warning
If the connection breaks between a server and client, any events fired within this duration will not be received by the client.
try
{
// Precondition: Cache is already connected
// Query for required operation
string query = "SELECT $VALUE$ FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add notification
// EventDataFilter.None returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded, EventDataFilter.None);
// Item update notification
// EventDataFilter.DataWithMetadata returns cache keys + modified item + metadata on updation
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemUpdated, EventDataFilter.DataWithMetadata);
// Item Remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
cQuery.RegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query Cached Data
ICacheReader reader = cache.SearchService.ExecuteReader(queryCommand);
// If resultset is not empty
if (reader.FieldCount > 0)
{
while (reader.Read())
{
Product result = reader.GetValue<Product>(1);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product Data in Cache to trigger callback
var updatedProduct = new Product()
{
ProductID = 1001,
ProductName = "Coffee",
Category = "Beverages" // Complies with criteria
};
string key = $"Product:(updatedProduct.ProductID)";
cache.Insert(key, updatedProduct);
// This will add the item to the result set
// as it matches query criteria
}
catch (OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.INCORRECT_FORMAT)
{
// Make sure that the query format is correct
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any generic exception like ArgumentException, ArgumentNullException
}
try
{
// Precondition: Cache is already connected
// Query for required operation
String query = "SELECT Values FROM FQN.Product WHERE Category = ?";
QueryCommand queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
com.alachisoft.ncache.client.ContinuousQuery continuousQuery = new com.alachisoft.ncache.client.ContinuousQuery(queryCommand);
EventListener listener = new EventListener();
EnumSet<com.alachisoft.ncache.runtime.events.EventType> eventType = EnumSet.of(com.alachisoft.ncache.runtime.events.EventType.ItemAdded, com.alachisoft.ncache.runtime.events.EventType.ItemRemoved, com.alachisoft.ncache.runtime.events.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None);
// Register continuous query on server
cache.getMessagingService().registerCQ(continuousQuery);
CacheReader reader = cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
Product result = reader.getValue(1, Product.class);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
Product updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
String key = "Product:" + updatedProduct.getProductID();
CacheItem cacheItem = new CacheItem(updatedProduct);
// Trigger add notifications
CacheItemVersion version = cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
}
catch (OperationFailedException ex)
{
if (ex.getErrorCode() == NCacheErrorCodes.INCORRECT_FORMAT)
{
// Make sure that the query format is correct
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any generic exception like IllegalArgumentException or NullPointerException
}
try {
// Precondition: Cache is already connected
// Query for required operation
val query = "SELECT Values FROM FQN.Product WHERE Category = ?"
val queryCommand = new QueryCommand(query)
queryCommand.setParameters(Map("Category" -> "Beverages"))
// Create continuous query
val continuousQuery = new ContinuousQuery(queryCommand)
val listener = new CQListener()
val eventType = List(EventType.ItemAdded, EventType.ItemRemoved, EventType.ItemUpdated)
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, EventDataFilter.None)
// Register continuous query on server
cache.getMessagingService.registerCQ(continuousQuery)
val reader = cache.getSearchService.executeReader(queryCommand)
if (reader.getFieldCount > 0)
{
while (reader.read)
{
val result = reader.getValue(1, classOf[Product])
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
val updatedProduct = Product()
updatedProduct.setProductId(1001)
updatedProduct.setProductName("Tea")
val key = "Product:" + updatedProduct.getProductId
val cacheItem = new CacheItem(updatedProduct)
// Trigger add notifications
val version = cache.insert(key, cacheItem)
// This will add item to the result set as it matches query criteria
}
catch {
case exception: Exception => {
// Handle any errors
}
}
// This is an async method
try
{
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
}
catch (error)
{
// Handle any errors
}
try:
# Precondition: Cache is already connected
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
except Exception as exp:
# Handle errors
Step 3: Unregister Notifications from Continuous Query
Notifications can be unregistered from Continuous Query when they are no longer required in application. You can unregister notifications for a specific event type if multiple event types have been registered using UnRegisterNotification
method.
For example, if ItemAdded
and ItemRemoved
event types were registered but your business logic no longer requires events for ItemAdded
, you specifically unregister notifications for ItemAdded
events.
// Unregister notifications for ItemAdded events only
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(QueryItemCallBack), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
continuousQuery.removeDataModificationListener(new QueryModificationListener listener, EnumSet<EventType.ADDED> eventType);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(CQListener(), List(EventType.ItemAdded))
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
Step 4: Unregister Continuous Query from Server
Once the application is no more interested in receiving notifications for changes in a query result set, the registered continuous query should be unregistered from server.
UnregisterCQ
takes as argument an object of ContinuousQuery
to unregister the callbacks which are no more fired after this call.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(continuousQuery);
// Unregister cq from server
cache.getMessagingService.unRegisterCQ(cQuery)
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
Additional Resources
NCache provides a sample application for Continuous Queries on GitHub.
See Also
SQL Reference for NCache
Event Notifications in Cache
Pub/Sub Messaging