The Kaa event subsystem enables messages delivery across the endpoints (EP). Events can be thought of as commands or structured chunks of data. For example, an event from a smartphone application can toggle the lights in the room.

Structure of the data carried by events is defined by an Event Class (EC) data schema configured on the Kaa server and built into Kaa SDK. ECs are grouped into Event Class Families (ECF) by the topic. Kaa allows delivering events among EPs that belong to the same or different applications. As of Kaa r0.6, endpoints must be associated with the same user in order to be able to send events to each other. Please review the events system design reference for more background.

From this guide you will learn how to use Kaa events functionality for enabling communication across the endpoints.

Basic architecture

The following diagram illustrates basic entities and data flows in scope of the event management:

  • Events are generated based on the event class schema created by the developer for the event class family
  • The event class family can be used by one or multiple applications, thus the event can be shared between applications
  • Several applications belonging to the same user share events between each other  according to the application mapping and through the Kaa server

Configuring Kaa

This section provides guidance on how to configure ECFs in Kaa.

Creating ECFs

ECFs are shared between applications and there is no default ECFs created automatically. A tenant admin can create multiple ECFs on the Kaa server using the Admin UI or REST API.

In this guide, we will use the following ECF that allows you to remotely control a thermostat using your cell phone.

[  
    {  
        "namespace":"org.kaaproject.kaa.schema.sample.thermo",
        "type":"record",
        "classType":"event",
        "name":"ThermostatInfoRequest",
        "fields":[  

        ]
    },
    {  
        "namespace":"org.kaaproject.kaa.schema.sample.thermo",
        "type":"record",
        "classType":"object",
        "name":"ThermostatInfo",
        "fields":[  
            {  
                "name":"currentTemperature",
                "type":"int"
            },
            {  
                "name":"targetTemperature",
                "type":"int"
            }
        ]
    },
    {  
        "namespace":"org.kaaproject.kaa.schema.sample.thermo",
        "type":"record",
        "classType":"event",
        "name":"ThermostatInfoResponse",
        "fields":[  
            {  
                "name":"thermostatInfo",
                "type":"org.kaaproject.kaa.schema.sample.thermo.ThermostatInfo"
            }
        ]
    },
    {  
        "namespace":"org.kaaproject.kaa.schema.sample.thermo",
        "type":"record",
        "classType":"event",
        "name":"ChangeTemperatureCommand",
        "fields":[  
            {  
                "name":"temperature",
                "type":"int"
            }
        ]
    }
]

 

Application mapping

Our sample ECF contains three events: ThermostatInfoRequestThermostatInfoResponse and ChangeTemperatureCommand. We will create the following two applications in this guide: Controller and Thermostat. It is logical that Controller should be able to send the ThermostatInfoRequest event (act as a source) and receive the ThermostatInfoResponse event (act as a sink), while Thermostat should be able to send the ThermostatInfoResponse event and receive the ThermostatInfoRequest event. Based on such logic of our application, we will create the following application mapping for our sample ECF:

ApplicationThermostatInfoRequestThermostatInfoResponseChangeTemperatureCommand
Controllersourcesinksource
Thermostatsinksourcesink

A tenant admin can set up application mapping using the Admin UI or REST API.

Generating SDK

During the SDK generation, the Control server generates the event object model and extends APIs to support methods for sending events and registering event listeners. An application SDK can support multiple ECFs. However, it cannot simultaneously support multiple versions of the same ECF.

Coding

This section provides code samples which illustrate practical usage of events in Kaa. The event subsystem API varies depending on the target SDK platform, but the general approach is the same.

Attach endpoint to user

To enable sending/receiving events to/from endpoints, at first the client should attach the endpoint to the user as shown in the following screenshot.

import org.kaaproject.kaa.client.KaaClient;
import org.kaaproject.kaa.client.KaaDesktop;
import org.kaaproject.kaa.client.event.registration.UserAuthResultListener;
 
kaaClient.attachUser("userExternalId", "userAccessToken", new UserAttachCallback()
{
    @Override
    public void onAttachResult(UserAttachResponse response) {
        System.out.println("Attach response" + response.getResult());
    }
});
#include <memory>
#include <iostream>

#include <kaa/Kaa.hpp>
#include <kaa/event/registration/SimpleUserAttachCallback.hpp>
 
using namespace kaa;
 
class SimpleUserAttachCallback : public IUserAttachCallback {
public:
    virtual void onAttachSuccess()
    {
		std::cout << "Endpoint is attached to a user" << std::endl;
	}

    virtual void onAttachFailed(UserAttachErrorCode errorCode, const std::string& reason)
	{
		std::cout << "Failed to attach endpoint to a user: error code " << errorCode << ", reason '" << reason << "'" << std::endl;
	}
};

...
 
// Create an endpoint instance
auto kaaClient = Kaa::newClient();
 
// Start an endpoint
kaaClient->start();
 
// Try to attach an endpoint to a user
kaaClient->attachUser("userExternalId", "userAccessToken", std::make_shared<SimpleUserAttachCallback>());
#include <kaa/kaa_user.h>
#include <kaa/platform/ext_user_callback.h>

kaa_client_t *kaa_client = /* ... */;
 
kaa_error_t on_attached(void *context, const char *user_external_id, const char *endpoint_access_token)
{
    return KAA_ERR_NONE;
}
kaa_error_t on_detached(void *context, const char *endpoint_access_token)
{
    return KAA_ERR_NONE;
}
kaa_error_t on_attach_success(void *context)
{
    return KAA_ERR_NONE;
}
kaa_error_t on_attach_failed(void *context, user_verifier_error_code_t error_code, const char *reason)
{
    return KAA_ERR_NONE;
}
kaa_attachment_status_listeners_t attachement_listeners = 
{
        NULL,
        &on_attached,
        &on_detached,
        &on_attach_success,
        &on_attach_failed
};
/* Assume Kaa SDK is already initialized */
kaa_error_t error_code = kaa_user_manager_set_attachment_listeners(kaa_client_get_context(kaa_client)->user_manager
			                                                     , &attachement_listeners);
/* Check error code */
error_code = kaa_user_manager_default_attach_to_user(kaa_client_get_context(kaa_client)->user_manager
												   , "userExternalId"
									   		       , "userAccessToken");
/* Check error code */
#import <Kaa/Kaa.h>
 
@interface ViewController () <UserAttachDelegate>
 
...
 
- (void)prepareUserForMessaging {
	[self.kaaClient attachUserWithId:@"userExternalId" accessToken:@"userAccessToken" delegate:self];
}
- (void)onAttachResult:(UserAttachResponse *)response {
    NSLog(@"Attach response: %i", response.result);
}

Assisted attach

Specific endpoint may not be able to attach itself independently. E.g. in case if endpoint doesn't have an user token. Another endpoint that already attached can assist in attachment process of the new endpoint. Below are examples of assisted attachment.

 

 

#include <stdint.h>
#include <kaa/kaa_error.h>
#include <kaa/platform/kaa_client.h>
#include <kaa/kaa_user.h>
#include <kaa/platform/ext_user_callback.h>

kaa_client_t *kaa_client = /* ... */;

kaa_attachment_status_listeners_t assisted_attachment_listeners =  /* ... */;

/* Assume Kaa SDK is already initialized */

kaa_error_t error_code = kaa_user_manager_default_attach_to_user(kaa_client_get_context(kaa_client)->user_manager
                                                                , "userExternalId"
                                                                , "userAccessToken");

/* Check error code */

error_code = kaa_user_manager_attach_endpoint(kaa_client_get_context(kaa_client)->user_manager, "externalAccessToken", &assisted_attachment_listeners);

/* Check error code */

 

 

 

 

 

 

 

Get ECF factory and create ECF object

To access the Kaa event functionality, the client should implement the two following blocks of code.

Get ECF factory from Kaa

import org.kaaproject.kaa.client.event.EventFamilyFactory;
 
EventFamilyFactory eventFamilyFactory = kaaClient.getEventFamilyFactory();
#include <kaa/event/gen/EventFamilyFactory.hpp>

...
EventFamilyFactory& eventFamilyFactory = kaaClient->getEventFamilyFactory();
#import <Kaa/Kaa.h>
 
...
 
EventFamilyFactory *eventFamilyFactory = [kaaClient getEventFamilyFactory];

Get specific ECF object from ECF factory

import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily;
 
ThermoEventClassFamily tecf = eventFamilyFactory.getThermoEventClassFamily();
#include <kaa/event/gen/ThermoEventClassFamily.hpp>

...
ThermoEventClassFamily& tecf = eventFamilyFactory.getThermoEventClassFamily();
#import<Kaa/Kaa.h>
 
...
 
ThermostatEventClassFamily *tecf = [self.eventFamilyFactory getThermostatEventClassFamily];

Send events

To send one or more events, the client should proceed as described in this section.

Get endpoint addresses

Execute the asynchronous findEventListeners method to request a list of the endpoints supporting all specified EC FQNs (FQN stands for fully qualified name).

 

import org.kaaproject.kaa.client.event.FindEventListenersCallback;
 
List<String> FQNs = new LinkedList<>();
FQNs.add(ThermostatInfoRequest.class.getName());
FQNs.add(ChangeTemperatureCommand.class.getName());

kaaClient.findEventListeners(FQNs, new FindEventListenersCallback() {
    @Override
    public void onEventListenersReceived(List<String> eventListeners) {
        // Some code
    }    
	@Override
    public void onRequestFailed() {
        // Some code
    }
});
#include <list>
#include <memory>
#include <string>
#include <vector>
 
#include <kaa/event/IFetchEventListeners.hpp>

class SimpleFetchEventListeners : public IFetchEventListeners {
public:
    virtual void onEventListenersReceived(const std::vector<std::string>& eventListeners)
	{
        // Some code
	}

    virtual void onRequestFailed()
	{
        // Some code
	}
};

...
std::list<std::string> FQNs = {"org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest"
                              ,"org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand"};

kaaClient->findEventListeners(FQNs, std::make_shared<SimpleFetchEventListeners>());
#include <kaa/event.h>
#include <kaa/platform/ext_event_listeners_callback.h>
 
const char *fqns[] = { "org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest"
					 , "org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand" };
 
kaa_error_t event_listeners_callback(void *context, const kaa_endpoint_id listeners[], size_t listeners_count)
{
	/* Process response */
    return KAA_ERR_NONE;
}

kaa_error_t event_listeners_request_failed(void *context)
{
	/* Process failure */
    return KAA_ERR_NONE;
}
 
kaa_event_listeners_callback_t callback = { NULL
										  , &event_listeners_callback
										  , &event_listeners_request_failed };
 
kaa_error_t error_code = kaa_event_manager_find_event_listeners(kaa_client_get_context(kaa_client)->event_manager
														      , fqns
 															  , 2
															  , &callback);
 
/* Check error code */
#import <Kaa/Kaa.h>
 
@interface ViewController () <FindEventListenersDelegate>
 
...
 
	NSArray *listenerFQNs = @[[ThermostatInfoRequest FQN], [ChangeDegreeRequest FQN]];
	[self.kaaClient findListenersForEventFQNs:listenerFQNs delegate:self];
 
- (void)onEventListenersReceived:(NSArray *)eventListeners {
	// Some code
}

- (void)onRequestFailed {
	// Some code
}

Send one event to all endpoints

To send an event to all endpoints which were previously located by the findEventListeners method, execute the sendEventToAll method upon the specific ECF object.

import org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest;
 
tecf.sendEventToAll(new ThermostatInfoRequest());
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp>

...
nsThermoEventClassFamily::ThermostatInfoRequest thermoRequest;
tecf.sendEventToAll(thermoRequest);
#include <kaa/gen/kaa_thermo_event_class_family.h>
 
/* Create and send an event */
kaa_thermo_event_class_family_thermostat_info_request_t* thermo_request = kaa_thermo_event_class_family_thermostat_info_request_create();
 
kaa_error_t error_code = kaa_event_manager_send_kaa_thermo_event_class_family_thermostat_info_request(kaa_client_get_context(kaa_client)->event_manager
																									, thermo_request
																									, NULL);
 
/* Check error code */
 
thermo_request->destroy(thermo_request);
#import <Kaa/Kaa.h>
 
...
 
ThermostatInfoRequest *request = [[ThermostatInfoRequest alloc] init];
[self.tecf sendThermostatInfoRequestToAll:request];

Send one event to one endpoint

To send an event to a single endpoint which was previously located by the findEventListeners method, execute the sendEvent method upon the specific ECF object and this endpoint.

import org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand;
 
ChangeTemperatureCommand ctc = new ChangeTemperatureCommand(-30);
// Assume the target variable is one of the received in the findEventListeners method
tecf.sendEvent(ctc, target);
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp>

...
nsThermoEventClassFamily::ChangeTemperatureCommand ctc;
ctc.temperature = -30;
 
// Assume the target variable is one of the received in the findEventListeners method
tecf.sendEvent(ctc, target);
#include <kaa/geb/kaa_thermo_event_class_family.h>

/* Create and send an event */
kaa_endpoint_id target_endpoint;
kaa_thermo_event_class_family_change_temperature_command_t* change_command = kaa_thermo_event_class_family_change_temperature_command_create();
change_command->temperature = -30;

kaa_error_t error_code = kaa_event_manager_send_kaa_thermo_event_class_family_change_temperature_command(kaa_client_get_context(kaa_client)->event_manager
     																								   , change_command
																									   , target_endpoint);
/* Check error code */

change_command->destroy(change_command);
#import<Kaa/Kaa.h>
 
...
 
KAAUnion *degree = [KAAUnion unionWithBranch:KAA_UNION_INT_OR_NULL_BRANCH_0 data:@(-30)];
ChangeDegreeRequest *changeDegree = [[ChangeDegreeRequest alloc] initWithDegree:degree];
 
// Assume the target variable is one of the received in the findEventListeners method
[tecf sendChangeDegreeRequest:changeDegree to:target];

Send batch of events to endpoint(s)

To send a batch of events at once to a single or all endpoints, execute the following code.

import org.kaaproject.kaa.client.event.EventFamilyFactory;
import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily;
import org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest;
import org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand;

// Get instance of EventFamilyFactory
EventFamilyFactory eventFamilyFactory = kaaClient.getEventFamilyFactory();
ThermoEventClassFamily tecf = eventFamilyFactory.getThermoEventClassFamily();
 
// Register a new event block and get a unique block id
TransactionId trxId = eventFamilyFactory.startEventsBlock();

// Add events to the block
// Adding a broadcasted event to the block
tecf.addEventToBlock(trxId, new ThermostatInfoRequest());
// Adding a targeted event to the block
tecf.addEventToBlock(trxId, new ChangeTemperatureCommand(-30), "home_thermostat");


// Send an event batch
eventFamilyFactory.submitEventsBlock(trxId);
// Or cancel an event batch
eventFamilyFactory.removeEventsBlock(trxId);
#include <kaa/event/gen/EventFamilyFactory.hpp>
#include <kaa/event/gen/ThermoEventClassFamily.hpp>
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp>

using namespace kaa;

// Get an instance of EventFamilyFactory
EventFamilyFactory& eventFamilyFactory = kaaClient->getEventFamilyFactory();
ThermoEventClassFamily& tecf = eventFamilyFactory.getThermoEventClassFamily();

// Register a new event block and get a unique block id
TransactionIdPtr trxId = eventFamilyFactory.startEventsBlock();

// Add events to the block
// Adding a broadcasted event to the block
nsThermoEventClassFamily::ThermostatInfoRequest thermoRequest;
tecf.addEventToBlock(trxId, thermoRequest);
// Adding a targeted event to the block
nsThermoEventClassFamily::ChangeTemperatureCommand ctc;
ctc.temperature = -30;
tecf.addEventToBlock(trxId, ctc, "home_thermostat");


// Send an event batch
eventFamilyFactory.submitEventsBlock(trxId);
 
// Or cancel an event batch
eventFamilyFactory.removeEventsBlock(trxId); 
#include <kaa/kaa_event.h>
#include <kaa/gen/kaa_thermo_event_class_family.h>
 
kaa_event_block_id transaction_id;

kaa_error_t error_code = kaa_event_create_transaction(kaa_context->event_manager, &transaction_id);
/* Check error code */

kaa_thermo_event_class_family_thermostat_info_request_t* thermo_request = kaa_thermo_event_class_family_thermostat_info_request_create();
kaa_thermo_event_class_family_change_temperature_command_t* change_command = kaa_thermo_event_class_family_change_temperature_command_create();
change_command->temperature = 5;

error_code = kaa_event_manager_add_kaa_thermo_event_class_family_thermostat_info_request_event_to_block(kaa_client_get_context(kaa_client)->event_manager
                                                                                                      , thermo_request
                                                                                                      , NULL
                                                                                                      , transaction_id);
/* Check error code */

kaa_endpoint_id target_endpoint;
error_code = kaa_event_manager_add_kaa_thermo_event_class_family_change_temperature_command_event_to_block(kaa_client_get_context(kaa_client)->event_manager
                                                                                                         , change_command
                                                                                                         , target_endpoint
                                                                                                         , transaction_id);
/* Check error code */

error_code = kaa_event_finish_transaction(kaa_client_get_context(kaa_client)->event_manager, transaction_id);
/* Check error code */

thermo_request->destroy(thermo_request);
change_command->destroy(change_command);
#import <Kaa/Kaa.h>

...
// Get instance of EventFamilyFactory
EventFamilyFactory *eventFamilyFactory = [kaaClient getEventFamilyFactory];
ThermostatEventClassFamily *tecf = [eventFamilyFactory getThermostatEventClassFamily];
 
// Register a new event block and get a unique block id
TransactionId *trxId = [eventFamilyFactory startEventsBlock];

// Add events to the block
// Adding a broadcasted event to the block
[tecf addThermostatInfoRequestToBlock:[[ThermostatInfoRequest alloc] init] withTransactionId:trxId];
// Adding a targeted event to the block
ChangeDegreeRequest *request = [[ChangeDegreeRequest alloc] init];
request.degree = [KAAUnion unionWithBranch:KAA_UNION_INT_OR_NULL_BRANCH_0 data:@(-30)];
[tecf addChangeDegreeRequestToBlock:request withTransactionId:trxId target:@"home_thermostat"];


// Send an event batch
[eventFamilyFactory submitEventsBlockWithTransactionId:trxId];
// Or cancel an event batch
[eventFamilyFactory removeEventsBlock:trxId];

Receive events

To start listening to incoming events, execute the addListener method upon the specific ECF object.

import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily;
 
tecf.addListener(new ThermoEventClassFamily.Listener() {
    @Override
    public void onEvent(ChangeTemperatureCommand event, String source) {
        // Some code
    }
    @Override
    public void onEvent(ThermostatInfoResponse event, String source) {
        // Some code
    }
    @Override
    public void onEvent(ThermostatInfoRequest event, String source) {
        // Some code
    }
});
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp>

class SimpleThermoEventClassFamilyListener: public ThermoEventClassFamily::ThermoEventClassFamilyListener {
public:
    virtual void onEvent(const nsThermoEventClassFamily :: ThermostatInfoRequest& event, const std::string& source) 
    {
        // Some code
    }
    virtual void onEvent(const nsThermoEventClassFamily :: ThermostatInfoResponse& event, const std::string& source) 
    {
        // Some code
    }
    virtual void onEvent(const nsThermoEventClassFamily :: ChangeTemperatureCommand& event, const std::string& source) 
    {
        // Some code
    }
};
...
SimpleThermoEventClassFamilyListener eventsListener;
tecf.addEventFamilyListener(eventsListener);
#include <kaa/kaa_event.h>
#include <kaa/gen/kaa_thermo_event_class_family.h>

void on_thermo_event_class_family_change_temperature_command(void *context, kaa_thermo_event_class_family_change_temperature_command_t *event, kaa_endpoint_id_p source)
{
    /* Process event */
    event->destroy(event);
}
kaa_error_t error_code = kaa_event_manager_set_kaa_thermo_event_class_family_change_temperature_command_listener(kaa_client_get_context(kaa_client)->event_manager
                                                                                                  			   , &on_thermo_event_class_family_change_temperature_command
			                                                                                                   , NULL);
/* Check error code */
#import <Kaa/Kaa.h>
 
@interface ViewController () <ThermostatEventClassFamilyDelegate>
 
...
 
[self.tecf addDelegate:self];
- (void)onThermostatInfoRequest:(ThermostatInfoRequest *)event fromSource:(NSString *)source {
    // Some code
}

- (void)onThermostatInfoResponse:(ThermostatInfoResponse *)event fromSource:(NSString *)source {
    // Some code
}

- (void)onChangeDegreeRequest:(ChangeDegreeRequest *)event fromSource:(NSString *)source {
    // Some code
}

Copyright © 2014-2015, CyberVision, Inc.