The Kaa event subsystem enables messages delivery across the endpoints (EP). Events can be thought of as commands or structured chunks of data. For example, an event from a smartphone application can toggle the lights in the room.
Structure of the data carried by events is defined by an Event Class (EC) data schema configured on the Kaa server and built into Kaa SDK. ECs are grouped into Event Class Families (ECF) by the topic. Kaa allows delivering events among EPs that belong to the same or different applications. As of Kaa r0.6, endpoints must be associated with the same user in order to be able to send events to each other. Please review the events system design reference for more background.
From this guide you will learn how to use Kaa events functionality for enabling communication across the endpoints.
The following diagram illustrates basic entities and data flows in scope of the event management:
This section provides guidance on how to configure ECFs in Kaa.
ECFs are shared between applications and there is no default ECFs created automatically. A tenant admin can create multiple ECFs on the Kaa server using the Admin UI or REST API.
In this guide, we will use the following ECF that allows you to remotely control a thermostat using your cell phone.
[ { "namespace":"org.kaaproject.kaa.schema.sample.thermo", "type":"record", "classType":"event", "name":"ThermostatInfoRequest", "fields":[ ] }, { "namespace":"org.kaaproject.kaa.schema.sample.thermo", "type":"record", "classType":"object", "name":"ThermostatInfo", "fields":[ { "name":"currentTemperature", "type":"int" }, { "name":"targetTemperature", "type":"int" } ] }, { "namespace":"org.kaaproject.kaa.schema.sample.thermo", "type":"record", "classType":"event", "name":"ThermostatInfoResponse", "fields":[ { "name":"thermostatInfo", "type":"org.kaaproject.kaa.schema.sample.thermo.ThermostatInfo" } ] }, { "namespace":"org.kaaproject.kaa.schema.sample.thermo", "type":"record", "classType":"event", "name":"ChangeTemperatureCommand", "fields":[ { "name":"temperature", "type":"int" } ] } ]
Our sample ECF contains three events: ThermostatInfoRequest
, ThermostatInfoResponse
and ChangeTemperatureCommand
. We will create the following two applications in this guide: Controller and Thermostat. It is logical that Controller should be able to send the ThermostatInfoRequest
event (act as a source) and receive the ThermostatInfoResponse
event (act as a sink), while Thermostat should be able to send the ThermostatInfoResponse
event and receive the ThermostatInfo
event. Based on such logic of our application, we will create the following application mapping for our sample ECF:Request
Application | ThermostatInfoRequest | ThermostatInfoResponse | ChangeTemperatureCommand |
---|---|---|---|
Controller | source | sink | source |
Thermostat | sink | source | sink |
A tenant admin can set up application mapping using the Admin UI or REST API.
During the SDK generation, the Control server generates the event object model and extends APIs to support methods for sending events and registering event listeners. An application SDK can support multiple ECFs. However, it cannot simultaneously support multiple versions of the same ECF.
This section provides code samples which illustrate practical usage of events in Kaa. The event subsystem API varies depending on the target SDK platform, but the general approach is the same.
To enable sending/receiving events to/from endpoints, at first the client should attach the endpoint to the user as shown in the following screenshot.
import org.kaaproject.kaa.client.KaaClient; import org.kaaproject.kaa.client.KaaDesktop; import org.kaaproject.kaa.client.event.registration.UserAuthResultListener; kaaClient.attachUser("userExternalId", "userAccessToken", new UserAttachCallback() { @Override public void onAttachResult(UserAttachResponse response) { System.out.println("Attach response" + response.getResult()); } });
#include <memory> #include <iostream> #include <kaa/Kaa.hpp> #include <kaa/event/registration/SimpleUserAttachCallback.hpp> using namespace kaa; class SimpleUserAttachCallback : public IUserAttachCallback { public: virtual void onAttachSuccess() { std::cout << "Endpoint is attached to a user" << std::endl; } virtual void onAttachFailed(UserAttachErrorCode errorCode, const std::string& reason) { std::cout << "Failed to attach endpoint to a user: error code " << errorCode << ", reason '" << reason << "'" << std::endl; } }; ... // Create an endpoint instance auto kaaClient = Kaa::newClient(); // Start an endpoint kaaClient->start(); // Try to attach an endpoint to a user kaaClient->attachUser("userExternalId", "userAccessToken", std::make_shared<SimpleUserAttachCallback>());
#include <kaa/kaa_user.h> #include <kaa/platform/ext_user_callback.h> kaa_client_t *kaa_client = /* ... */; kaa_error_t on_attached(void *context, const char *user_external_id, const char *endpoint_access_token) { return KAA_ERR_NONE; } kaa_error_t on_detached(void *context, const char *endpoint_access_token) { return KAA_ERR_NONE; } kaa_error_t on_attach_success(void *context) { return KAA_ERR_NONE; } kaa_error_t on_attach_failed(void *context, user_verifier_error_code_t error_code, const char *reason) { return KAA_ERR_NONE; } kaa_attachment_status_listeners_t attachement_listeners = { NULL, &on_attached, &on_detached, &on_attach_success, &on_attach_failed }; /* Assume Kaa SDK is already initialized */ kaa_error_t error_code = kaa_user_manager_set_attachment_listeners(kaa_client_get_context(kaa_client)->user_manager , &attachement_listeners); /* Check error code */ error_code = kaa_user_manager_default_attach_to_user(kaa_client_get_context(kaa_client)->user_manager , "userExternalId" , "userAccessToken"); /* Check error code */
#import <Kaa/Kaa.h> @interface ViewController () <UserAttachDelegate> ... - (void)prepareUserForMessaging { [self.kaaClient attachUserWithId:@"userExternalId" accessToken:@"userAccessToken" delegate:self]; } - (void)onAttachResult:(UserAttachResponse *)response { NSLog(@"Attach response: %i", response.result); }
Specific endpoint may not be able to attach itself independently. E.g. in case if endpoint doesn't have an user token. Another endpoint that already attached can assist in attachment process of the new endpoint. Below are examples of assisted attachment.
#include <stdint.h> #include <kaa/kaa_error.h> #include <kaa/platform/kaa_client.h> #include <kaa/kaa_user.h> #include <kaa/platform/ext_user_callback.h> kaa_client_t *kaa_client = /* ... */; kaa_attachment_status_listeners_t assisted_attachment_listeners = /* ... */; /* Assume Kaa SDK is already initialized */ kaa_error_t error_code = kaa_user_manager_default_attach_to_user(kaa_client_get_context(kaa_client)->user_manager , "userExternalId" , "userAccessToken"); /* Check error code */ error_code = kaa_user_manager_attach_endpoint(kaa_client_get_context(kaa_client)->user_manager, "externalAccessToken", &assisted_attachment_listeners); /* Check error code */
To access the Kaa event functionality, the client should implement the two following blocks of code.
import org.kaaproject.kaa.client.event.EventFamilyFactory; EventFamilyFactory eventFamilyFactory = kaaClient.getEventFamilyFactory();
#include <kaa/event/gen/EventFamilyFactory.hpp> ... EventFamilyFactory& eventFamilyFactory = kaaClient->getEventFamilyFactory();
#import <Kaa/Kaa.h> ... EventFamilyFactory *eventFamilyFactory = [kaaClient getEventFamilyFactory];
import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily; ThermoEventClassFamily tecf = eventFamilyFactory.getThermoEventClassFamily();
#include <kaa/event/gen/ThermoEventClassFamily.hpp> ... ThermoEventClassFamily& tecf = eventFamilyFactory.getThermoEventClassFamily();
#import<Kaa/Kaa.h> ... ThermostatEventClassFamily *tecf = [self.eventFamilyFactory getThermostatEventClassFamily];
To send one or more events, the client should proceed as described in this section.
Execute the asynchronous findEventListeners method to request a list of the endpoints supporting all specified EC FQNs (FQN stands for fully qualified name).
import org.kaaproject.kaa.client.event.FindEventListenersCallback; List<String> FQNs = new LinkedList<>(); FQNs.add(ThermostatInfoRequest.class.getName()); FQNs.add(ChangeTemperatureCommand.class.getName()); kaaClient.findEventListeners(FQNs, new FindEventListenersCallback() { @Override public void onEventListenersReceived(List<String> eventListeners) { // Some code } @Override public void onRequestFailed() { // Some code } });
#include <list> #include <memory> #include <string> #include <vector> #include <kaa/event/IFetchEventListeners.hpp> class SimpleFetchEventListeners : public IFetchEventListeners { public: virtual void onEventListenersReceived(const std::vector<std::string>& eventListeners) { // Some code } virtual void onRequestFailed() { // Some code } }; ... std::list<std::string> FQNs = {"org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest" ,"org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand"}; kaaClient->findEventListeners(FQNs, std::make_shared<SimpleFetchEventListeners>());
#include <kaa/event.h> #include <kaa/platform/ext_event_listeners_callback.h> const char *fqns[] = { "org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest" , "org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand" }; kaa_error_t event_listeners_callback(void *context, const kaa_endpoint_id listeners[], size_t listeners_count) { /* Process response */ return KAA_ERR_NONE; } kaa_error_t event_listeners_request_failed(void *context) { /* Process failure */ return KAA_ERR_NONE; } kaa_event_listeners_callback_t callback = { NULL , &event_listeners_callback , &event_listeners_request_failed }; kaa_error_t error_code = kaa_event_manager_find_event_listeners(kaa_client_get_context(kaa_client)->event_manager , fqns , 2 , &callback); /* Check error code */
#import <Kaa/Kaa.h> @interface ViewController () <FindEventListenersDelegate> ... NSArray *listenerFQNs = @[[ThermostatInfoRequest FQN], [ChangeDegreeRequest FQN]]; [self.kaaClient findListenersForEventFQNs:listenerFQNs delegate:self]; - (void)onEventListenersReceived:(NSArray *)eventListeners { // Some code } - (void)onRequestFailed { // Some code }
To send an event to all endpoints which were previously located by the findEventListeners method, execute the sendEventToAll method upon the specific ECF object.
import org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest; tecf.sendEventToAll(new ThermostatInfoRequest());
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp> ... nsThermoEventClassFamily::ThermostatInfoRequest thermoRequest; tecf.sendEventToAll(thermoRequest);
#include <kaa/gen/kaa_thermo_event_class_family.h> /* Create and send an event */ kaa_thermo_event_class_family_thermostat_info_request_t* thermo_request = kaa_thermo_event_class_family_thermostat_info_request_create(); kaa_error_t error_code = kaa_event_manager_send_kaa_thermo_event_class_family_thermostat_info_request(kaa_client_get_context(kaa_client)->event_manager , thermo_request , NULL); /* Check error code */ thermo_request->destroy(thermo_request);
#import <Kaa/Kaa.h> ... ThermostatInfoRequest *request = [[ThermostatInfoRequest alloc] init]; [self.tecf sendThermostatInfoRequestToAll:request];
To send an event to a single endpoint which was previously located by the findEventListeners method, execute the sendEvent method upon the specific ECF object and this endpoint.
import org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand; ChangeTemperatureCommand ctc = new ChangeTemperatureCommand(-30); // Assume the target variable is one of the received in the findEventListeners method tecf.sendEvent(ctc, target);
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp> ... nsThermoEventClassFamily::ChangeTemperatureCommand ctc; ctc.temperature = -30; // Assume the target variable is one of the received in the findEventListeners method tecf.sendEvent(ctc, target);
#include <kaa/geb/kaa_thermo_event_class_family.h> /* Create and send an event */ kaa_endpoint_id target_endpoint; kaa_thermo_event_class_family_change_temperature_command_t* change_command = kaa_thermo_event_class_family_change_temperature_command_create(); change_command->temperature = -30; kaa_error_t error_code = kaa_event_manager_send_kaa_thermo_event_class_family_change_temperature_command(kaa_client_get_context(kaa_client)->event_manager , change_command , target_endpoint); /* Check error code */ change_command->destroy(change_command);
#import<Kaa/Kaa.h> ... KAAUnion *degree = [KAAUnion unionWithBranch:KAA_UNION_INT_OR_NULL_BRANCH_0 data:@(-30)]; ChangeDegreeRequest *changeDegree = [[ChangeDegreeRequest alloc] initWithDegree:degree]; // Assume the target variable is one of the received in the findEventListeners method [tecf sendChangeDegreeRequest:changeDegree to:target];
To send a batch of events at once to a single or all endpoints, execute the following code.
import org.kaaproject.kaa.client.event.EventFamilyFactory; import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily; import org.kaaproject.kaa.schema.sample.thermo.ThermostatInfoRequest; import org.kaaproject.kaa.schema.sample.thermo.ChangeTemperatureCommand; // Get instance of EventFamilyFactory EventFamilyFactory eventFamilyFactory = kaaClient.getEventFamilyFactory(); ThermoEventClassFamily tecf = eventFamilyFactory.getThermoEventClassFamily(); // Register a new event block and get a unique block id TransactionId trxId = eventFamilyFactory.startEventsBlock(); // Add events to the block // Adding a broadcasted event to the block tecf.addEventToBlock(trxId, new ThermostatInfoRequest()); // Adding a targeted event to the block tecf.addEventToBlock(trxId, new ChangeTemperatureCommand(-30), "home_thermostat"); // Send an event batch eventFamilyFactory.submitEventsBlock(trxId); // Or cancel an event batch eventFamilyFactory.removeEventsBlock(trxId);
#include <kaa/event/gen/EventFamilyFactory.hpp> #include <kaa/event/gen/ThermoEventClassFamily.hpp> #include <kaa/event/gen/ThermoEventClassFamilyGen.hpp> using namespace kaa; // Get an instance of EventFamilyFactory EventFamilyFactory& eventFamilyFactory = kaaClient->getEventFamilyFactory(); ThermoEventClassFamily& tecf = eventFamilyFactory.getThermoEventClassFamily(); // Register a new event block and get a unique block id TransactionIdPtr trxId = eventFamilyFactory.startEventsBlock(); // Add events to the block // Adding a broadcasted event to the block nsThermoEventClassFamily::ThermostatInfoRequest thermoRequest; tecf.addEventToBlock(trxId, thermoRequest); // Adding a targeted event to the block nsThermoEventClassFamily::ChangeTemperatureCommand ctc; ctc.temperature = -30; tecf.addEventToBlock(trxId, ctc, "home_thermostat"); // Send an event batch eventFamilyFactory.submitEventsBlock(trxId); // Or cancel an event batch eventFamilyFactory.removeEventsBlock(trxId);
#include <kaa/kaa_event.h> #include <kaa/gen/kaa_thermo_event_class_family.h> kaa_event_block_id transaction_id; kaa_error_t error_code = kaa_event_create_transaction(kaa_context->event_manager, &transaction_id); /* Check error code */ kaa_thermo_event_class_family_thermostat_info_request_t* thermo_request = kaa_thermo_event_class_family_thermostat_info_request_create(); kaa_thermo_event_class_family_change_temperature_command_t* change_command = kaa_thermo_event_class_family_change_temperature_command_create(); change_command->temperature = 5; error_code = kaa_event_manager_add_kaa_thermo_event_class_family_thermostat_info_request_event_to_block(kaa_client_get_context(kaa_client)->event_manager , thermo_request , NULL , transaction_id); /* Check error code */ kaa_endpoint_id target_endpoint; error_code = kaa_event_manager_add_kaa_thermo_event_class_family_change_temperature_command_event_to_block(kaa_client_get_context(kaa_client)->event_manager , change_command , target_endpoint , transaction_id); /* Check error code */ error_code = kaa_event_finish_transaction(kaa_client_get_context(kaa_client)->event_manager, transaction_id); /* Check error code */ thermo_request->destroy(thermo_request); change_command->destroy(change_command);
#import <Kaa/Kaa.h> ... // Get instance of EventFamilyFactory EventFamilyFactory *eventFamilyFactory = [kaaClient getEventFamilyFactory]; ThermostatEventClassFamily *tecf = [eventFamilyFactory getThermostatEventClassFamily]; // Register a new event block and get a unique block id TransactionId *trxId = [eventFamilyFactory startEventsBlock]; // Add events to the block // Adding a broadcasted event to the block [tecf addThermostatInfoRequestToBlock:[[ThermostatInfoRequest alloc] init] withTransactionId:trxId]; // Adding a targeted event to the block ChangeDegreeRequest *request = [[ChangeDegreeRequest alloc] init]; request.degree = [KAAUnion unionWithBranch:KAA_UNION_INT_OR_NULL_BRANCH_0 data:@(-30)]; [tecf addChangeDegreeRequestToBlock:request withTransactionId:trxId target:@"home_thermostat"]; // Send an event batch [eventFamilyFactory submitEventsBlockWithTransactionId:trxId]; // Or cancel an event batch [eventFamilyFactory removeEventsBlock:trxId];
To start listening to incoming events, execute the addListener method upon the specific ECF object.
import org.kaaproject.kaa.demo.smarthouse.thermo.ThermoEventClassFamily; tecf.addListener(new ThermoEventClassFamily.Listener() { @Override public void onEvent(ChangeTemperatureCommand event, String source) { // Some code } @Override public void onEvent(ThermostatInfoResponse event, String source) { // Some code } @Override public void onEvent(ThermostatInfoRequest event, String source) { // Some code } });
#include <kaa/event/gen/ThermoEventClassFamilyGen.hpp> class SimpleThermoEventClassFamilyListener: public ThermoEventClassFamily::ThermoEventClassFamilyListener { public: virtual void onEvent(const nsThermoEventClassFamily :: ThermostatInfoRequest& event, const std::string& source) { // Some code } virtual void onEvent(const nsThermoEventClassFamily :: ThermostatInfoResponse& event, const std::string& source) { // Some code } virtual void onEvent(const nsThermoEventClassFamily :: ChangeTemperatureCommand& event, const std::string& source) { // Some code } }; ... SimpleThermoEventClassFamilyListener eventsListener; tecf.addEventFamilyListener(eventsListener);
#include <kaa/kaa_event.h> #include <kaa/gen/kaa_thermo_event_class_family.h> void on_thermo_event_class_family_change_temperature_command(void *context, kaa_thermo_event_class_family_change_temperature_command_t *event, kaa_endpoint_id_p source) { /* Process event */ event->destroy(event); } kaa_error_t error_code = kaa_event_manager_set_kaa_thermo_event_class_family_change_temperature_command_listener(kaa_client_get_context(kaa_client)->event_manager , &on_thermo_event_class_family_change_temperature_command , NULL); /* Check error code */
#import <Kaa/Kaa.h> @interface ViewController () <ThermostatEventClassFamilyDelegate> ... [self.tecf addDelegate:self]; - (void)onThermostatInfoRequest:(ThermostatInfoRequest *)event fromSource:(NSString *)source { // Some code } - (void)onThermostatInfoResponse:(ThermostatInfoResponse *)event fromSource:(NSString *)source { // Some code } - (void)onChangeDegreeRequest:(ChangeDegreeRequest *)event fromSource:(NSString *)source { // Some code }
Copyright © 2014-2015, CyberVision, Inc.