Classes

namespace migratorydata
class MigratoryDataClient
#include <client-cpp/MigratoryDataClient.h>

This class implements all the necessary operations for connecting to a cluster of one or more MigratoryData servers, subscribing to subjects, getting real-time messages for the subscribed subjects, and publishing real-time messages.

Public Functions

MigratoryDataClient()

Create a MigratoryDataClient object.

void connect()

Use this method to connect this client to one of the MigratoryData servers you specified with MigratoryDataClient.setServers(), and subscribe to the subjects you specified with MigratoryDataClient.subscribe(), if any.

void disconnect()

Disconnect from the connected MigratoryData server and dispose of the resources used by the connection.

This method should be called when the connection is no longer necessary.

void setLogListener(MigratoryDataLogListener *logListener, MigratoryDataLogLevel logLevel)

Attach a listener for handling the log messages produced by the library.

Call this method before any other API method if you want to capture as many log messages as possible.

Parameters
  • logListener: an implementation of the MigratoryDataLogListener interface

  • logLevel: a particular MigratoryDataLogLevel configured as the logging threshold

void setListener(MigratoryDataListener *listener)

Attach a listener for handling the received real-time messages as well as the status notifications.

Parameters
  • listener: an implementation of the MigratoryDataListener interface

void setServers(std::vector<std::string> &servers)

Specify a cluster of one or more MigratoryData servers to which the client will connect.

For example, to connect to a cluster formed of two MigratoryData servers installed at the addresses p1.example.com and p2.example.com, and configured to accept clients on the standard HTTP port 80, the following code can be used:

vector<string> servers;
servers.push_back("p1.example.com:80");
servers.push_back("p2.example.com:80");
client->setServers(servers);

To achieve load-balancing, the API connects the client to a MigratoryData server chosen randomly from the servers list. In this way, the load is balanced among all the members of the cluster.

Moreover, the API supports weighted load-balancing. This feature is especially useful if the MigratoryData servers of the cluster are installed on machines with different capacities. You can assign to each member of the cluster a weight ranging from 0 to 100. This weight assignment is a hint provided to the API to select with a higher probability a MigratoryData server with a higher weight either initially when the client connects to the cluster or later during a failover reconnection.

Supposing the address p1.example.com corresponds to a machine that is twice as powerful as the machine having the address p2.example.com, you can assign to p1.example.com a weight of 100 and to p2.example.com a weight of 50 by prefixing each address with the assigned weight as follows:

vector<string> servers;
servers.push_back("100 p1.example.com:80");
servers.push_back("50 p2.example.com:80");
client->setServers(servers);

The API assigns a default weight 100 to the addresses not prefixed with a specific weight.
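The weight prefix and the weighted selection described above can be illustrated with a minimal sketch. This is not the library's implementation: the names WeightedServer, parseServerEntry, pickByRoll, and pickRandom are hypothetical, and the sketch assumes the weight is separated from the address by a single space, as in the examples above.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <random>
#include <string>
#include <vector>

// Hypothetical helper type: one parsed entry of the servers list.
struct WeightedServer {
    int weight;           // 0..100; 100 when no prefix is given
    std::string address;  // e.g. "p1.example.com:80"
};

// Split an entry such as "50 p2.example.com:80" into weight and address.
// Entries without a numeric prefix get the default weight 100.
inline WeightedServer parseServerEntry(const std::string &entry) {
    std::size_t space = entry.find(' ');
    if (space != std::string::npos && space > 0) {
        bool numeric = true;
        for (std::size_t i = 0; i < space; ++i) {
            if (!std::isdigit(static_cast<unsigned char>(entry[i]))) {
                numeric = false;
                break;
            }
        }
        if (numeric) {
            return {std::stoi(entry.substr(0, space)), entry.substr(space + 1)};
        }
    }
    return {100, entry};
}

// Pick one server with probability proportional to its weight.
// `roll` must lie in [0, total weight); passing it in keeps the
// function deterministic and easy to test.
inline const WeightedServer &pickByRoll(const std::vector<WeightedServer> &servers, int roll) {
    assert(!servers.empty());
    int cumulative = 0;
    for (const WeightedServer &s : servers) {
        cumulative += s.weight;
        if (roll < cumulative) {
            return s;
        }
    }
    return servers.back();
}

// Convenience wrapper that draws the roll from a random engine.
inline const WeightedServer &pickRandom(const std::vector<WeightedServer> &servers, std::mt19937 &rng) {
    int total = 0;
    for (const WeightedServer &s : servers) {
        total += s.weight;
    }
    assert(total > 0);
    std::uniform_int_distribution<int> dist(0, total - 1);
    return pickByRoll(servers, dist(rng));
}
```

With the weights 100 and 50 from the example above, rolls 0 to 99 select p1.example.com and rolls 100 to 149 select p2.example.com, so p1.example.com is chosen about two times out of three.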

To achieve failover, if the connection between the client and a MigratoryData server is broken, then the API will automatically detect the failure and will select another MigratoryData server from the servers list. If the client fails to connect to the newly selected server, a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN will be triggered (unless this is modified using MigratoryDataClient.notifyAfterFailedConnectionAttempts()), and a new MigratoryData server of the cluster will be selected again and again until the client is able to connect to one of the MigratoryData servers of the cluster. When successfully connected, the API will trigger MigratoryDataClient.NOTIFY_SERVER_UP.

Furthermore, if the Guaranteed Message Delivery feature is enabled for the MigratoryData cluster, then the messages potentially published for a subscribed subject during the failover period will be automatically recovered from the cache of the MigratoryData server to which the client reconnects, and a status notification MigratoryDataClient.NOTIFY_DATA_SYNC will be triggered for that subject.

If, for example, the failover period is abnormally long, and the client is not able to recover all the messages made available during the failover period for one of its subscribed subjects, then the API will retrieve only the most recent retained message available for that subject and will trigger a MigratoryDataClient.NOTIFY_DATA_RESYNC status notification for that subject, the client behaving as a new client.

For a complete discussion about load balancing, failover, and guaranteed message delivery features see the Architecture Guide.

Parameters
  • servers: an array of strings where each string represents the network address (IP address or DNS domain name and its corresponding port) of a MigratoryData server, optionally prefixed by a weight ranging from 0 to 100; if the weight prefix is not provided to an address, then the API will automatically assign to that address a default weight 100

void subscribe(std::vector<std::string> &subjects)

Subscribe to one or more subjects.

The MigratoryData subjects are strings having a particular syntax. See the Chapter “Concepts” of the Architecture Guide to learn about the syntax of the subjects.

As an example, suppose the messages are market data updates whose subjects are stock names. Then, to subscribe to the messages having the subjects /stocks/NYSE/IBM and /stocks/Nasdaq/MSFT, use:

vector<string> subjects;
subjects.push_back("/stocks/NYSE/IBM");
subjects.push_back("/stocks/Nasdaq/MSFT");
client->subscribe(subjects);

Parameters
  • subjects: An array of strings representing subjects.

void subscribeWithHistory(std::vector<std::string> &subjects, int numberOfHistoricalMessages)

Subscribe to one or more subjects after getting historical messages for those subjects.

The MigratoryData subjects are strings having a particular syntax. See the Chapter “Concepts” of the Architecture Guide to learn about the syntax of the subjects.

Attempt to get the number of historical messages as defined by the argument numberOfHistoricalMessages, for each subject in the argument subjects, then subscribe for real-time messages having as subjects the strings provided in the subjects parameter.

When Guaranteed Message Delivery is enabled, each MigratoryData server in the cluster maintains an in-memory cache with historical messages for each subject. The cache of each subject is available in all servers of the cluster. The maximum number of messages held in the cache is defined by the parameter MaxCachedMessagesPerSubject of the MigratoryData server, which defaults to 1,000 messages. The historical messages are continuously removed from the cache, but it is guaranteed that they remain available in the cache for at least the number of seconds defined by the parameter CacheExpireTime, which defaults to 180 seconds.

If the value of numberOfHistoricalMessages is higher than the number of historical messages available in the cache, then the client will receive only the messages available in the cache. As a consequence, if you use a value higher than the value of the parameter MaxCachedMessagesPerSubject of the MigratoryData server (which defaults to 1000), then you will get the entire cache before subscribing for real-time messages for the subjects specified with the API call.

If the value of numberOfHistoricalMessages is 0, then no historical messages are retrieved from the cache and, in this case, this API method is equivalent to the API method MigratoryDataClient.subscribe().

Parameters
  • subjects: An array of strings representing subjects.

  • numberOfHistoricalMessages: the number of historical messages to be retrieved from the cache of the server

void unsubscribe(std::vector<std::string> &subjects)

Unsubscribe from one or more subjects.

Parameters
  • subjects: subjects to unsubscribe

void publish(MigratoryDataMessage &message)

Publish a message.

If the message includes closure data, then a status notification will be provided via MigratoryDataListener.onStatus() to inform whether the message publication has succeeded or failed.

Parameters

void setEntitlementToken(std::string &token)

Assign an entitlement token to the client.

To define which users of your application have access to which subjects, you will first have to configure the parameter Entitlement (see the Configuration Guide). If you set this parameter to Custom, then you can use the MigratoryData Extension SDK for Entitlement to build an extension plugin for the MigratoryData server to allow or deny certain users to subscribe to or publish on certain subjects.

Parameters
  • token: a string representing an entitlement token

void getSubjects(std::vector<std::string> &subjects)

Return the list of subscribed subjects.

Return

The list of strings representing the subscribed subjects.

void notifyAfterFailedConnectionAttempts(int n)

Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN.

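Parameters
  • n: the number of failed connection attempts before triggering a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN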

void notifyWhenReconnectRateExceedsThreshold(int n)

Define the number of reconnect attempts to one or more MigratoryData servers per 3-minute window before triggering a status notification NOTIFY_RECONNECT_RATE_EXCEEDED.

Parameters
  • n: The number of reconnect attempts to one or more MigratoryData servers per 3-minute window before triggering a status notification NOTIFY_RECONNECT_RATE_EXCEEDED; default value is 15.
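As an illustration of this kind of rate policy, the following minimal sketch counts reconnect attempts in a 3-minute window. It is not the library's implementation: the class name ReconnectRateMonitor is hypothetical, the window is assumed to be a sliding one, and the notification is assumed to fire as soon as the count reaches the threshold.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical sliding-window counter for reconnect attempts.
class ReconnectRateMonitor {
public:
    ReconnectRateMonitor(std::size_t threshold, long long windowSeconds)
        : threshold_(threshold), windowSeconds_(windowSeconds) {
        assert(threshold > 0 && windowSeconds > 0);
    }

    // Record a reconnect attempt made at `nowSeconds` (a monotonic timestamp).
    // Returns true when the number of attempts inside the window has reached
    // the threshold, i.e. when a notification would be due in this sketch.
    bool recordAttempt(long long nowSeconds) {
        // Forget attempts that have fallen out of the window.
        while (!attempts_.empty() && nowSeconds - attempts_.front() >= windowSeconds_) {
            attempts_.pop_front();
        }
        attempts_.push_back(nowSeconds);
        return attempts_.size() >= threshold_;
    }

private:
    std::size_t threshold_;
    long long windowSeconds_;
    std::deque<long long> attempts_;  // timestamps of recent attempts, oldest first
};
```

With the documented default, the monitor would be constructed as ReconnectRateMonitor(15, 180).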

void setEncryption(bool encryption)

Configure whether to use SSL/TLS encryption when connecting to a MigratoryData server.

When using encryption you should connect to the ports of the MigratoryData server that are configured with the parameter ListenEncrypted to listen for encrypted connections.

Parameters
  • encryption: indicate whether or not to use an encrypted SSL/TLS connection to communicate with the server

void setQuickReconnectInitialDelay(int seconds)

Define the number of seconds to wait before attempting to reconnect to the cluster after a connection failure is detected.

Connection Failure Detection

Connection failure is detected immediately for almost all users. For a few users who are subject to temporary, atypical network conditions, connection failure is detected after 30-40 seconds.

Reconnection Phases and Policies

When a connection failure is detected, the API will attempt to reconnect to the servers of the MigratoryData cluster as follows: First, it will attempt to reconnect up to a number of times as defined by MigratoryDataClient.setQuickReconnectMaxRetries() using small delays between retries (Quick Reconnection Phase). If the connection cannot be established after the Quick Reconnection Phase, then the API will attempt to reconnect less frequently according to the policy defined by MigratoryDataClient.setReconnectPolicy().

The delays between retries are computed according to the following algorithm where the values of the variables involved are defined by the API methods having substantially the same names:

Quick Reconnect Phase (retries <= quickReconnectMaxRetries)
-----------------------------------------------------------

   (retries starts at 1 and increments by 1 at each quick reconnect)

   reconnectDelay = quickReconnectInitialDelay * retries - random(0, quickReconnectInitialDelay)

After Quick Reconnect Phase (retries > quickReconnectMaxRetries)
----------------------------------------------------------------

   (reset retries to start with 1 and increment by 1 at each reconnect)

   If reconnectPolicy is CONSTANT_WINDOW_BACKOFF, then

      reconnectDelay = reconnectTimeInterval

   else if reconnectPolicy is TRUNCATED_EXPONENTIAL_BACKOFF, then

      reconnectDelay = min(reconnectTimeInterval * (2 ^ retries) - random(0, reconnectTimeInterval * retries), reconnectMaxDelay)

For a few users who are subject to temporary, atypical network conditions, if the reconnectDelay computed with the algorithm above is less than 10 seconds, then it is rounded up to 10 seconds.
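The schedule above can be expressed as a minimal C++ sketch. It is not the library's code: the names ReconnectSettings, quickReconnectDelay, and laterReconnectDelay are hypothetical, random(0, x) is assumed to be uniform and is modelled as u * x with u in [0, 1], and the policy and reconnectMaxDelay values are placeholders rather than documented defaults. The 10-second rounding for atypical network conditions is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Field names mirror the variables used in the schedule above.
enum class ReconnectPolicy { ConstantWindowBackoff, TruncatedExponentialBackoff };

struct ReconnectSettings {
    double quickReconnectInitialDelay = 5.0;  // seconds (documented default)
    int quickReconnectMaxRetries = 3;         // documented default
    double reconnectTimeInterval = 20.0;      // seconds (documented default)
    double reconnectMaxDelay = 360.0;         // seconds (placeholder for this sketch)
    ReconnectPolicy policy = ReconnectPolicy::TruncatedExponentialBackoff;  // placeholder
};

// Delay before quick reconnect number `retries` (1-based).
// `u` is a fraction in [0, 1] standing in for random(0, x) = u * x.
inline double quickReconnectDelay(const ReconnectSettings &s, int retries, double u) {
    assert(retries >= 1 && retries <= s.quickReconnectMaxRetries);
    assert(u >= 0.0 && u <= 1.0);
    return s.quickReconnectInitialDelay * retries - u * s.quickReconnectInitialDelay;
}

// Delay before reconnect number `retries` (1-based, counted again from 1
// once the Quick Reconnect phase is over).
inline double laterReconnectDelay(const ReconnectSettings &s, int retries, double u) {
    assert(retries >= 1);
    assert(u >= 0.0 && u <= 1.0);
    if (s.policy == ReconnectPolicy::ConstantWindowBackoff) {
        return s.reconnectTimeInterval;
    }
    double grown = s.reconnectTimeInterval * std::ldexp(1.0, retries);  // interval * 2^retries
    double jitter = u * s.reconnectTimeInterval * retries;
    return std::min(grown - jitter, s.reconnectMaxDelay);
}
```

With the documented defaults, the three quick reconnects are delayed by a value in the ranges 0 to 5, 5 to 10, and 10 to 15 seconds respectively.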

Parameters
  • seconds: The number of seconds to wait before attempting to reconnect to the cluster; default value is 5 seconds.

void setQuickReconnectMaxRetries(int retries)

Define the maximum number of retries for the Quick Reconnect phase.

Parameters
  • retries: The maximum number of quick reconnect retries; default value is 3.

void setReconnectPolicy(std::string policy)

Define the reconnect policy to be used after the Quick Reconnect phase.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and the reconnect schedule for the policy defined by this method.

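Parameters
  • policy: the reconnect policy, either MigratoryDataClient.CONSTANT_WINDOW_BACKOFF or MigratoryDataClient.TRUNCATED_EXPONENTIAL_BACKOFF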

void setReconnectTimeInterval(int seconds)

Define the time interval used for the reconnect schedule after the Quick Reconnect phase.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and the reconnect schedule for the policy defined by this method.

Parameters
  • seconds: A time interval expressed in seconds used for reconnect schedule; default is 20 seconds.

void setReconnectMaxDelay(int seconds)

Define the maximum reconnect delay for the MigratoryDataClient.TRUNCATED_EXPONENTIAL_BACKOFF policy.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn how the value defined by this method is used.

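Parameters
  • seconds: the maximum reconnect delay, expressed in seconds, used with the MigratoryDataClient.TRUNCATED_EXPONENTIAL_BACKOFF policy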

void setTransport(std::string transport)

Define the transport type used by the client to communicate with the MigratoryData cluster.

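Parameters
  • transport: the transport type, either MigratoryDataClient.TRANSPORT_HTTP or MigratoryDataClient.TRANSPORT_WEBSOCKET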

~MigratoryDataClient()

Destructor.

Public Members

const std::string NOTIFY_SERVER_UP

A constant which indicates that the client successfully connected to a MigratoryData server.

const std::string NOTIFY_SERVER_DOWN

A constant which indicates that the client failed to connect to a MigratoryData server.

const std::string NOTIFY_DATA_SYNC

A constant which indicates that after a failover reconnection, the client successfully synchronized a subscribed subject with the latest retained message available for that subject, as well as with all messages made available during the failover period for that subject.

const std::string NOTIFY_DATA_RESYNC

A constant which indicates that after a failover reconnection, the client successfully synchronized a subscribed subject with the latest retained message available for that subject, but not with the potential messages made available during the failover period, therefore behaving as a new client.

const std::string NOTIFY_SUBSCRIBE_ALLOW

A constant which indicates that the client was authorized to subscribe to a subject.

const std::string NOTIFY_SUBSCRIBE_DENY

A constant which indicates that the client was not authorized to subscribe to a subject.

const std::string NOTIFY_PUBLISH_DENIED

A constant which indicates that the client was unable to publish a message because it is not allowed by the entitlement system.

const std::string NOTIFY_PUBLISH_OK

A constant which indicates that the client successfully published a message.

const std::string NOTIFY_PUBLISH_FAILED

A constant which indicates that the client was unable to publish a message.

const std::string NOTIFY_MESSAGE_SIZE_LIMIT_EXCEEDED

A constant which indicates that the client was unable to publish a message because the size of the message exceeds the message size limit allowed by the server (see the server parameter MAX_MESSAGE_SIZE).

const std::string NOTIFY_RECONNECT_RATE_EXCEEDED

A constant which indicates that the reconnect rate threshold per 3-minute window has been reached.

See MigratoryDataClient.notifyWhenReconnectRateExceedsThreshold() for more details about this policy.

const std::string CONSTANT_WINDOW_BACKOFF

A constant used to define the reconnect policy.

See MigratoryDataClient.setQuickReconnectInitialDelay() for more details about this policy.

const std::string TRUNCATED_EXPONENTIAL_BACKOFF

A constant used to define the reconnect policy.

See MigratoryDataClient.setQuickReconnectInitialDelay() for more details about this policy.

const std::string TRANSPORT_HTTP

A constant used to define the transport type.

See MigratoryDataClient.setTransport() for more details about the transport type.

const std::string TRANSPORT_WEBSOCKET

A constant used to define the transport type.

See MigratoryDataClient.setTransport() for more details about the transport type.

Private Members

PushClientImpl *clientImpl
namespace migratorydata
class MigratoryDataListener
#include <client-cpp/MigratoryDataListener.h>

The implementation of this interface will handle the messages received from the server for the subscribed subjects as well as various status notifications.

Use the API method MigratoryDataClient.setListener() to register your listener implementation.

Public Functions

void onMessage(const MigratoryDataMessage &message) = 0

This method handles the real-time messages received from a MigratoryData server for the subscribed subjects.

Parameters

void onStatus(const std::string &status, std::string &info) = 0

This method handles the status notifications.

The possible values of the status parameter are:

  • MigratoryDataClient.NOTIFY_DATA_SYNC indicates that, after a failover reconnection, the client successfully synchronized the subject given in the detail information of the status notification. Moreover, the client recovered all messages made available for that subject during the failover period, if any

  • MigratoryDataClient.NOTIFY_DATA_RESYNC indicates that, after a failover reconnection, the client successfully synchronized the subject given in the detail information; however, the potential messages made available for that subject during the failover period have not been recovered, the client behaving like a new client which only received the most recent retained message available for that subject

  • MigratoryDataClient.NOTIFY_PUBLISH_OK indicates that the client successfully published the message having the closure data provided in the detail information of the status notification

Parameters
  • status: The type of the status notification (see the possible values above).

  • info: The detail information of the status notification.
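A handler typically branches on the status value and interprets info accordingly. The following minimal sketch shows that dispatch with a stand-in class so that it compiles without the library headers: StatusHandlerSketch and the SKETCH_ constants are hypothetical, and their string values are placeholders because this reference does not list the actual values of the MigratoryDataClient constants. Real code would derive from MigratoryDataListener and compare against the library's constants instead.

```cpp
#include <string>
#include <vector>

// Placeholder values, for illustration only.
const std::string SKETCH_NOTIFY_DATA_SYNC = "NOTIFY_DATA_SYNC";
const std::string SKETCH_NOTIFY_DATA_RESYNC = "NOTIFY_DATA_RESYNC";
const std::string SKETCH_NOTIFY_PUBLISH_OK = "NOTIFY_PUBLISH_OK";

// Stand-in with the same onStatus() shape as MigratoryDataListener.
class StatusHandlerSketch {
public:
    void onStatus(const std::string &status, std::string &info) {
        if (status == SKETCH_NOTIFY_DATA_SYNC) {
            // info is the subject; every message of the failover period was recovered.
            syncedSubjects.push_back(info);
        } else if (status == SKETCH_NOTIFY_DATA_RESYNC) {
            // info is the subject; messages may have been missed, so any state
            // derived from earlier messages should be rebuilt.
            resyncedSubjects.push_back(info);
        } else if (status == SKETCH_NOTIFY_PUBLISH_OK) {
            // info is the closure data of the published message.
            acknowledgedClosures.push_back(info);
        } else {
            // Any other notification is only recorded in this sketch.
            otherStatuses.push_back(status);
        }
    }

    std::vector<std::string> syncedSubjects;
    std::vector<std::string> resyncedSubjects;
    std::vector<std::string> acknowledgedClosures;
    std::vector<std::string> otherStatuses;
};
```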

namespace migratorydata

Enums

enum MigratoryDataLogLevel

This enumeration lists the MigratoryData logging levels.

The available log levels ordered by verbosity are:

  • LOG_ERROR (least verbose)

  • LOG_INFO

  • LOG_DEBUG

  • LOG_TRACE (most verbose)

Values:

enumerator LOG_ERROR

The LOG_ERROR level turns on the error logs of the API.

enumerator LOG_INFO

The LOG_INFO level turns on the info, warning, and error logs of the API.

enumerator LOG_DEBUG

The LOG_DEBUG level turns on the debug, info, warning, and error logs of the API.

enumerator LOG_TRACE

The LOG_TRACE level turns on all the logs of the API.
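The threshold behaviour of these levels can be summarized with a minimal sketch. SketchLogLevel is a stand-in for MigratoryDataLogLevel so that the block compiles on its own; the numeric values and the helper passesThreshold are assumptions of this sketch, not part of the API.

```cpp
// Stand-in enum, declared from least to most verbose.
enum SketchLogLevel {
    SKETCH_LOG_ERROR = 0,
    SKETCH_LOG_INFO = 1,
    SKETCH_LOG_DEBUG = 2,
    SKETCH_LOG_TRACE = 3
};

// A log entry is delivered when it is no more verbose than the threshold.
inline bool passesThreshold(SketchLogLevel entryLevel, SketchLogLevel threshold) {
    return static_cast<int>(entryLevel) <= static_cast<int>(threshold);
}
```

For example, with a threshold of LOG_INFO, error and info entries are delivered while debug and trace entries are dropped.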

namespace migratorydata
class MigratoryDataLogListener
#include <client-cpp/MigratoryDataLogListener.h>

The implementation of this interface will handle the log messages produced by the library.

Use the API method MigratoryDataClient.setLogListener() to register your log listener implementation.

Public Functions

void onLog(std::string &log, MigratoryDataLogLevel logLevel) = 0

This method handles the logs received from the API.

Parameters
  • log: A string representing a log message.

namespace migratorydata
class MigratoryDataMessage
#include <client-cpp/MigratoryDataMessage.h>

Represent a message.

Public Functions

MigratoryDataMessage()

Default constructor.

MigratoryDataMessage(const MigratoryDataMessage &message)

Copy constructor.

Parameters

MigratoryDataMessage(const std::string &subject, const std::string &content)

Create a MigratoryDataMessage object.

Parameters
  • subject: The subject of the message

  • content: The content of the message

MigratoryDataMessage(const std::string &subject, const std::string &content, const std::string &closure)

Create a MigratoryDataMessage object.

Parameters
  • subject: The subject of the message

  • content: The content of the message

  • closure: The closure of the message (OPTIONAL)

MigratoryDataMessage(const std::string &subject, const std::string &content, const std::string &closure, QoS qos, bool retained, const std::string &replySubject)

Create a MigratoryDataMessage object.

If a reply subject is attached to a message, the message acts as a request message. The clients which receive a request message will be able to reply by sending back one or more messages having as subject the reply subject.

Note: If the reply subject is not already subscribed, it will be subscribed by the library implicitly, and it can be reused for request/reply interactions (and even for receiving multiple replies to one request). When it is not needed anymore, it should be unsubscribed explicitly.

Parameters
  • subject: The subject of the message

  • content: The content of the message

  • closure: The closure of the message (OPTIONAL)

  • qos: the QoS level of the message; the default is QoS.GUARANTEED (OPTIONAL)

  • retained: indicate whether or not the message should be/was retained by the server; the default is true (OPTIONAL)

  • replySubject: the reply subject of the message (OPTIONAL)

std::string getSubject() const

Get the subject of the message.

Return

A string representing the subject of the message

std::string getContent() const

Get the content of the message.

Return

A string representing the content of the message

std::string getClosure() const

Get the closure of the message.

Return

The closure data of the message

bool isRetained() const

Indicate whether or not the message should be/was retained by the server.

Return

true if the message should be, or was, retained by the server

std::string getReplySubject() const

Get the subject to be used to reply to this message.

A client which receives a message containing a reply subject should interpret the message as a request. It has the option to use the reply subject - extracted from the message with this method - to send a reply.

Return

The subject to be used to reply to this message.

QoS getQos() const

Get the QoS level of the message.

Return

the QoS level of the message

MessageType getMessageType() const

Get the MessageType of the message.

Return

the message type of the message

void setCompressed(bool compressed)

Set whether to compress the content of the message or not.

Note: If compression is enabled with this method but the content of the message is smaller than its compressed form, then the message will be sent uncompressed, which saves bandwidth as well as the CPU cycles the receiver would otherwise spend on decompression.

Parameters
  • compressed: if true, the content of the message will be published in ZLIB-compressed format; if false no compression format will be used; the default value is false.

bool isCompressed() const

Indicate whether or not the message should be/was compressed.

Return

true if the message should be, or was, compressed. If the received message was compressed, the library automatically decompresses its content without any additional code.

std::string toString() const

Return a string representation of the message.

~MigratoryDataMessage()

Destructor.

Protected Attributes

int seq
int epoch
MessageType messageType

Private Members

std::string subject
std::string content
std::string closure
std::string replySubject
bool retained
QoS qos
namespace migratorydata

Enums

enum MessageType

The type of a message received from the server.

Values:

enumerator SNAPSHOT = 1

the message from the server is snapshot type

enumerator UPDATE

the message from the server is update type

enumerator RECOVERED

the message from the server is recovered type

enumerator HISTORICAL

the message from the server is historical type

namespace migratorydata

Enums

enum QoS

The quality of service (QoS) levels for MigratoryData messaging.

Values:

enumerator STANDARD = 0

QoS.STANDARD should be used for noncritical messages which will not be included in the cache of the MigratoryData cluster.

In this way, after a connection recovery, a client will not receive as part of the recovery process the messages with QoS.STANDARD.

Note: This QoS level corresponds to the at-most-once delivery semantics.

enumerator GUARANTEED

QoS.GUARANTEED should be used for critical messages which will be included in the cache of the MigratoryData cluster.

In this way, after a connection recovery, a client will receive as part of the recovery process the messages with QoS.GUARANTEED.

Note: This QoS level corresponds to at-least-once delivery semantics. It can also provide exactly-once delivery semantics, provided that subscribers filter out duplicate receptions when duplicates matter to the application. Typically, a small buffer containing the identifiers of recently received messages is sufficient for this task.
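A small buffer of recent identifiers could look like the following minimal sketch. The class name RecentIdFilter is hypothetical, and the identifier is assumed to be an application-defined string (for example, one carried inside the message content), since this reference does not describe a public message identifier.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_set>

// Hypothetical bounded buffer of recently seen message identifiers.
class RecentIdFilter {
public:
    explicit RecentIdFilter(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Returns true the first time an identifier is seen and false when it is
    // a duplicate of an identifier that is still in the buffer.
    bool firstSeen(const std::string &id) {
        if (seen_.count(id) != 0) {
            return false;
        }
        if (order_.size() == capacity_) {
            // Buffer full: forget the oldest identifier.
            seen_.erase(order_.front());
            order_.pop_front();
        }
        order_.push_back(id);
        seen_.insert(id);
        return true;
    }

private:
    std::size_t capacity_;
    std::deque<std::string> order_;         // identifiers, oldest first
    std::unordered_set<std::string> seen_;  // same identifiers, for fast lookup
};
```

A subscriber would call firstSeen() for each received message and process the message only when it returns true. The capacity should be large enough to cover the messages that can be redelivered after a recovery.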