MigratoryData Client API for Java
Developer's Guide and Reference Manual
MigratoryDataClient Class Reference

This class implements all the necessary operations for connecting to a cluster of one or more MigratoryData servers, subscribing to subjects, getting real-time messages for the subscribed subjects, and publishing real-time messages.

Public Member Functions

 MigratoryDataClient ()
 Create a MigratoryDataClient object.
 
void setLogging (MigratoryDataLogLevel logLevel, File logFile, int logRotateLimit) throws IOException
 Configure the logging parameters.
 
void setListener (MigratoryDataListener listener)
 Attach a MigratoryDataListener for handling real-time messages and status notifications.
 
MigratoryDataListener getListener ()
 Get the MigratoryDataListener object defined for handling real-time messages and status notifications.
 
void setServers (String[] servers) throws UnknownHostException
 Specify a cluster of one or more MigratoryData servers to which the client will connect.
 
void connect ()
 Connect to a MigratoryData cluster.
 
void subscribe (List< String > subjects)
 Subscribe to one or more subjects.
 
void subscribeWithConflation (List< String > subjects, int conflationMillis)
 Subscribe to one or more subjects with conflation.
 
void subscribeWithHistory (List< String > subjects, int numberOfHistoricalMessages)
 Subscribe to one or more subjects after getting historical messages for those subjects.
 
void unsubscribe (List< String > subjects)
 Unsubscribe from one or more subjects.
 
void setEncryption (boolean b)
 Configure whether to use SSL/TLS encryption when connecting to a MigratoryData server.
 
void setEntitlementToken (String token)
 Assign an authorization token to the client.
 
Collection< String > getSubjects ()
 Return the list of subscribed subjects.
 
void setServersDownBeforeNotify (int n)
 Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN.
 
void notifyAfterReconnectRetries (int retries)
 Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN.
 
void disconnect ()
 Disconnect from the connected MigratoryData server and dispose of the resources used by the connection.
 
void publish (MigratoryDataMessage message) throws Exception
 Publish a message.
 
void setQuickReconnectMaxRetries (int retries)
 Define the maximum number of retries for the Quick Reconnect failover phase.
 
void setQuickReconnectInitialDelay (int seconds)
 Define the number of seconds to wait before attempting to reconnect to the cluster after a connection failure is detected.
 
void setReconnectPolicy (String policy)
 Define the reconnect policy to be used after the Quick Reconnect phase.
 
void setReconnectTimeInterval (int seconds)
 Define the time interval used for the reconnect schedule after the Quick Reconnect phase.
 
void setReconnectMaxDelay (int seconds)
 Define the maximum reconnect delay for the MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF policy.
 
void setInteractiveEventFilters (Set< InteractiveEventType > interactiveEventTypes, List< String > subjectPrefixes)
 Define a filter for receiving entitlement and subscription notifications for a group of subjects.
 
void setExternalToken (String externalToken)
 Assign an external token to a client.
 
void setTransport (String type)
 Define the transport type used by the client to communicate with the MigratoryData cluster.

Detailed Description

This class implements all the necessary operations for connecting to a cluster of one or more MigratoryData servers, subscribing to subjects, getting real-time messages for the subscribed subjects, and publishing real-time messages.

Member Function Documentation

◆ setLogging()

void MigratoryDataClient.setLogging ( MigratoryDataLogLevel  logLevel,
File  logFile,
int  logRotateLimit 
) throws IOException

Configure the logging parameters.

It is advisable to call this method before any other API call so that logging covers as much activity as possible. The default log level is MigratoryDataLogLevel.INFO.

Parameters
logLevel  The particular MigratoryDataLogLevel configured as the logging threshold.
logFile  The file used to output the logs. For Android applications, set this parameter to null.
logRotateLimit  The maximum size in bytes that the log file may reach before a new log file is created (log rotation). To disable log rotation, set this parameter to 0.
Exceptions
IOException  If there is an IO error while trying to configure the logging file.

◆ setListener()

void MigratoryDataClient.setListener ( MigratoryDataListener  listener)

Attach a MigratoryDataListener for handling real-time messages and status notifications.

Parameters
listener  An instance of a class which implements the MigratoryDataListener interface

◆ getListener()

MigratoryDataListener MigratoryDataClient.getListener ( )

Get the MigratoryDataListener object defined for handling real-time messages and status notifications.

Returns
The instance of a class which implements the MigratoryDataListener interface defined with MigratoryDataClient.setListener()

◆ setServers()

void MigratoryDataClient.setServers ( String []  servers) throws UnknownHostException

Specify a cluster of one or more MigratoryData servers to which the client will connect.

If you specify two or more MigratoryData servers, then all these MigratoryData servers should provide the same level of data redundancy, by making available for subscription the same set of subjects. This is required for achieving (weighted) load balancing, failover, and guaranteed message delivery of the system. In this way, the MigratoryData servers of the servers list form a cluster.

For example, to connect to a cluster formed of two MigratoryData servers installed at the addresses p1.example.com and p2.example.com, and configured to accept clients on the standard HTTP port 80, the following code can be used:

   client.setServers(new String[] {"p1.example.com:80", "p2.example.com:80"});

To achieve load-balancing, the API connects the client to a MigratoryData server chosen randomly from the servers list. In this way, the load is balanced among all the members of the cluster.

Moreover, the API supports weighted load-balancing. This feature is especially useful if the MigratoryData servers in the cluster are installed on machines with different capacities. You can assign to each member of the cluster a weight ranging from 0 to 100. This weight assignment is a hint provided to the API to select with a higher probability a MigratoryData server with a higher weight either initially when the client connects to the cluster or later during a failover reconnection.

Suppose the address p1.example.com corresponds to a machine that is twice as powerful as the machine having the address p2.example.com. Then you can assign a weight of 100 to p1.example.com and a weight of 50 to p2.example.com by prefixing each address with the assigned weight as follows:

   client.setServers(new String[] {"100 p1.example.com:80", "50 p2.example.com:80"});

The API assigns a default weight 100 to the addresses not prefixed with a specific weight.
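
The weight prefix and the weighted choice described above can be pictured with a short sketch. It is not the API's implementation: the class WeightedServers, its methods, and the cumulative-weight selection strategy are assumptions made for illustration. Only the entry format (an optional weight followed by address:port), the weight range 0 to 100, and the default weight of 100 come from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper, not part of the MigratoryData API.
class WeightedServers {
    static final int DEFAULT_WEIGHT = 100; // default weight stated in the documentation

    final List<String> addresses = new ArrayList<>();
    final List<Integer> weights = new ArrayList<>();
    final int totalWeight;

    WeightedServers(String[] servers) {
        int sum = 0;
        for (String entry : servers) {
            // An entry is either "address:port" or "weight address:port".
            String[] parts = entry.trim().split("\\s+");
            if (parts.length < 1 || parts.length > 2) {
                throw new IllegalArgumentException("malformed entry: " + entry);
            }
            int weight = parts.length == 2 ? Integer.parseInt(parts[0]) : DEFAULT_WEIGHT;
            if (weight < 0 || weight > 100) {
                throw new IllegalArgumentException("weight must be in [0, 100]: " + entry);
            }
            addresses.add(parts[parts.length - 1]);
            weights.add(weight);
            sum += weight;
        }
        totalWeight = sum;
    }

    // Maps a ticket in [0, totalWeight) to a server: each server owns a slice
    // of the ticket range whose width equals its weight.
    String pick(int ticket) {
        int cumulative = 0;
        for (int i = 0; i < addresses.size(); i++) {
            cumulative += weights.get(i);
            if (ticket < cumulative) {
                return addresses.get(i);
            }
        }
        throw new IllegalArgumentException("ticket out of range: " + ticket);
    }

    // Draws a random ticket, so a server is chosen with probability weight / totalWeight.
    String pickRandom(Random random) {
        if (totalWeight == 0) {
            throw new IllegalStateException("all weights are 0");
        }
        return pick(random.nextInt(totalWeight));
    }

    public static void main(String[] args) {
        WeightedServers servers = new WeightedServers(
                new String[] {"100 p1.example.com:80", "50 p2.example.com:80"});
        System.out.println(servers.pickRandom(new Random()));
    }
}
```

With the weights 100 and 50 used above, this sketch would select p1.example.com about two times out of three.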

To achieve failover, if the connection between the client and a MigratoryData server is broken, the API automatically detects the failure and selects another MigratoryData server from the servers list. If the client fails to connect to the newly selected server, a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN is triggered (unless you modify the number of failed attempts with MigratoryDataClient.setServersDownBeforeNotify()), and the API keeps selecting another MigratoryData server from the cluster until the client is able to connect to one of them. When the client is successfully connected, the API triggers a status notification MigratoryDataListener.NOTIFY_SERVER_UP.

Furthermore, if guaranteed message delivery is enabled, any messages published for a subscribed subject during the failover period are automatically retrieved from the cache of the MigratoryData server to which the client reconnects, and a status notification MigratoryDataListener.NOTIFY_DATA_SYNC is triggered for that subject.

If, after a failover reconnection, the client is not able to retrieve the messages published during the failover period for one of its subscribed subjects (for example, because the failover period was abnormally long), then the API retrieves only the most recent message available for that subject and triggers a MigratoryDataListener.NOTIFY_DATA_RESYNC status notification for that subject. In this case, the client behaves as a new client that connects to the cluster at the moment of the failover reconnection.

For a complete discussion of the load balancing, failover, and guaranteed message delivery features, see the MigratoryData Architecture Guide (PDF, HTML).

Parameters
servers  An array of strings where each string represents the network address (IP address or DNS domain name and its corresponding port) of a MigratoryData server, optionally prefixed by a weight ranging from 0 to 100. If no weight prefix is provided for an address, the API automatically assigns that address a default weight of 100.
Exceptions
UnknownHostException  If the address of a MigratoryData server could not be determined

◆ connect()

void MigratoryDataClient.connect ( )

Connect to a MigratoryData cluster.

This API call can be used to connect to one of the MigratoryData servers specified with MigratoryDataClient.setServers().

Please note that a connection is automatically made during the first subscription using the API call MigratoryDataClient.subscribe() or during the first publication using the API call MigratoryDataClient.publish().

Therefore, if the creation of a MigratoryDataClient object is immediately followed by a subscribe or publish operation, then the use of this API call is not necessary. Otherwise, use this API call to connect to a MigratoryData cluster.

◆ subscribe()

void MigratoryDataClient.subscribe ( List< String >  subjects)

Subscribe to one or more subjects.

Subscribe for real-time messages having as subjects the strings provided in the subjects parameter.

As an example, suppose the messages are market data updates having stock names as subjects. Then, to subscribe for the messages having as subjects /stocks/NYSE/IBM and /stocks/Nasdaq/MSFT, the following code can be used:

    List<String> subjects = new ArrayList<String>();
    subjects.add("/stocks/NYSE/IBM");
    subjects.add("/stocks/Nasdaq/MSFT");
    client.subscribe(subjects);

The subjects are strings having a particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.

Parameters
subjects  A list of strings representing subjects.

◆ subscribeWithConflation()

void MigratoryDataClient.subscribeWithConflation ( List< String >  subjects,
int  conflationMillis 
)

Subscribe to one or more subjects with conflation.

Subscribe for real-time messages having as subjects the strings provided in the subjects parameter.

If the optional parameter conflationMillis is used, then for each subject in the subjects list given as argument, its messages will be aggregated in the MigratoryData server and published every conflationMillis milliseconds as aggregated data (containing only the latest value for that subject and its latest field values). The value of conflationMillis should be a multiple of 100 milliseconds; otherwise, the MigratoryData server will round it down to a multiple of 100 milliseconds (e.g. 76 will be rounded to 0, 130 will be rounded to 100, 789 will be rounded to 700). If the value of conflationMillis is 0 (or is rounded to 0), then no conflation will apply, and data publication will be message-by-message with no message aggregation.
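
The three rounding examples above (76 to 0, 130 to 100, 789 to 700) are all consistent with rounding down to a multiple of 100. The following sketch reproduces that arithmetic on the client side, which may be useful for predicting the effective conflation interval. The rounding itself is performed by the MigratoryData server; the class and method names here are hypothetical, and rejecting negative values is an assumption of the sketch.

```java
// Hypothetical helper, not part of the MigratoryData API.
class ConflationRounding {
    // Rounds down to a multiple of 100 ms, which matches the documented examples.
    static int effectiveConflationMillis(int conflationMillis) {
        if (conflationMillis < 0) {
            // Assumption of this sketch: negative intervals are rejected.
            throw new IllegalArgumentException("conflationMillis must be >= 0");
        }
        return (conflationMillis / 100) * 100; // integer division drops the remainder
    }

    public static void main(String[] args) {
        for (int value : new int[] {76, 130, 789, 1000}) {
            System.out.println(value + " -> " + effectiveConflationMillis(value));
        }
    }
}
```

A result of 0 corresponds to the case where no conflation applies.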

As an example, suppose the messages are market data updates having stock names as subjects. Then, to subscribe for the messages having as subjects /stocks/NYSE/IBM and /stocks/Nasdaq/MSFT using 1-second conflation, the following code can be used:

    List<String> subjects = new ArrayList<String>();
    subjects.add("/stocks/NYSE/IBM");
    subjects.add("/stocks/Nasdaq/MSFT");
    client.subscribeWithConflation(subjects, 1000);

The subjects are strings having a particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.

Parameters
subjects  A list of strings representing subjects.
conflationMillis  An optional argument defining the number of milliseconds used to aggregate ("conflate") the messages for each subject in the subjects list; the default value is 0, meaning that no conflation will apply and data publication will be message-by-message with no message aggregation.

◆ subscribeWithHistory()

void MigratoryDataClient.subscribeWithHistory ( List< String >  subjects,
int  numberOfHistoricalMessages 
)

Subscribe to one or more subjects after getting historical messages for those subjects.

Attempt to get the number of historical messages as defined by the argument numberOfHistoricalMessages, for each subject in the argument subjects, then subscribe for real-time messages having as subjects the strings provided in the subjects parameter.

When Guaranteed Message Delivery is enabled, each MigratoryData server in the cluster maintains an in-memory cache of historical messages for each subject. The cache of each subject is available on all servers of the cluster. The maximum number of messages held in the cache is defined by the parameter MaxCachedMessagesPerSubject of the MigratoryData server, which defaults to 1,000 messages. Historical messages are continuously removed from the cache, but they are guaranteed to remain available in the cache for at least the number of seconds defined by the parameter CacheExpireTime, which defaults to 180 seconds.

If the value of numberOfHistoricalMessages is higher than the number of historical messages available in the cache, then the client will receive only the messages available in the cache. As a consequence, if you use a value higher than the value of the parameter MaxCachedMessagesPerSubject of the MigratoryData server (which defaults to 1,000), then you will get the entire cache before subscribing for real-time messages for the subjects specified with the API call. For example, the following call requests up to 10 historical messages for each of the two subjects before subscribing to them:

    client.subscribeWithHistory(Arrays.asList("/stocks/NYSE/IBM", "/stocks/Nasdaq/MSFT"), 10);
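
The relationship between the requested count and what the cache can actually supply can be modelled with a small bounded buffer. This is only an illustrative model, not MigratoryData server code: the class SubjectHistory and its methods are hypothetical, messages are represented as plain strings, and the time-based expiry governed by CacheExpireTime is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a per-subject history cache.
class SubjectHistory {
    private final int maxCachedMessages; // plays the role of MaxCachedMessagesPerSubject
    private final Deque<String> cache = new ArrayDeque<>();

    SubjectHistory(int maxCachedMessages) {
        if (maxCachedMessages < 1) {
            throw new IllegalArgumentException("maxCachedMessages must be >= 1");
        }
        this.maxCachedMessages = maxCachedMessages;
    }

    // Appends a message, evicting the oldest one when the cache is full.
    void add(String message) {
        if (cache.size() == maxCachedMessages) {
            cache.removeFirst();
        }
        cache.addLast(message);
    }

    // Returns the most recent messages, oldest first. The count is capped by
    // the number of messages currently cached, so an oversized request simply
    // returns the entire cache.
    List<String> latest(int requested) {
        int count = Math.min(Math.max(requested, 0), cache.size());
        List<String> all = new ArrayList<>(cache);
        return new ArrayList<>(all.subList(all.size() - count, all.size()));
    }

    public static void main(String[] args) {
        SubjectHistory history = new SubjectHistory(3);
        for (int i = 1; i <= 5; i++) {
            history.add("m" + i);
        }
        System.out.println(history.latest(10)); // request exceeds the cache size
    }
}
```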

The subjects are strings having a particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.

Parameters
subjects  A list of strings representing subjects.
numberOfHistoricalMessages  The number of historical messages to be retrieved from the cache of the MigratoryData server. A value of 0 means that no historical messages have to be retrieved; in this case, this API method is equivalent to the API method MigratoryDataClient.subscribe(). A value larger than the value of the parameter MaxCachedMessagesPerSubject means that the entire cache is retrieved.

◆ unsubscribe()

void MigratoryDataClient.unsubscribe ( List< String >  subjects)

Unsubscribe from one or more subjects.

Unsubscribe from the subscribed subjects provided in the subjects parameter.

Parameters
subjects  A list of strings representing subjects.

◆ setEncryption()

void MigratoryDataClient.setEncryption ( boolean  b)

Configure whether to use SSL/TLS encryption when connecting to a MigratoryData server.

When using encryption you have to connect to the ports of the MigratoryData servers that are configured to listen for encrypted connections. See the parameter ListenEncrypted in the MigratoryData Configuration Guide (PDF, HTML).

Parameters
b  Determines whether the client connects to the MigratoryData server using an encrypted SSL/TLS connection

◆ setEntitlementToken()

void MigratoryDataClient.setEntitlementToken ( String  token)

Assign an authorization token to the client.

To define which users of your application have access to which subjects, you first have to set the parameter Entitlement to true in the configuration file of the MigratoryData server; see the parameter Entitlement in the MigratoryData Configuration Guide (PDF, HTML).

Then, you will have to use the entitlement-related part of the MigratoryData Extension API to allow or deny certain users to subscribe / publish to certain subjects.

Parameters
token  A string representing an authorization token.

◆ getSubjects()

Collection<String> MigratoryDataClient.getSubjects ( )

Return the list of subscribed subjects.

Returns
The list of strings representing the subscribed subjects.

◆ setServersDownBeforeNotify()

void MigratoryDataClient.setServersDownBeforeNotify ( int  n)

Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN.

Deprecated:
Use MigratoryDataClient.notifyAfterReconnectRetries() instead.
Parameters
n  The number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN; default value is 1.

◆ notifyAfterReconnectRetries()

void MigratoryDataClient.notifyAfterReconnectRetries ( int  retries)

Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN.

Parameters
retries  The number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN; default value is 1.

◆ disconnect()

void MigratoryDataClient.disconnect ( )

Disconnect from the connected MigratoryData server and dispose of the resources used by the connection.

This method should be called when the connection is no longer necessary.

◆ publish()

void MigratoryDataClient.publish ( MigratoryDataMessage  message) throws Exception

Publish a message.

If the message includes closure data, then a status notification will be provided via MigratoryDataListener.onStatus() to indicate whether the message publication succeeded or failed.

Parameters
message  A MigratoryDataMessage message

◆ setQuickReconnectMaxRetries()

void MigratoryDataClient.setQuickReconnectMaxRetries ( int  retries)

Define the maximum number of retries for the Quick Reconnect failover phase.

Parameters
retries  The maximum number of quick reconnect retries; default value is 3.

◆ setQuickReconnectInitialDelay()

void MigratoryDataClient.setQuickReconnectInitialDelay ( int  seconds)

Define the number of seconds to wait before attempting to reconnect to the cluster after a connection failure is detected.

Connection Failure Detection

Connection failure is detected immediately for almost all users. For a few users who are subject to temporary, atypical network conditions, connection failure is detected after 30-40 seconds.

Reconnection Phases and Policies

When a connection failure is detected, the API attempts to reconnect to the servers of the MigratoryData cluster as follows. First, it attempts to reconnect up to the number of times defined by MigratoryDataClient.setQuickReconnectMaxRetries(), using small delays between retries (Quick Reconnect phase). If the connection cannot be established during the Quick Reconnect phase, then the API attempts to reconnect less frequently, according to the policy defined by MigratoryDataClient.setReconnectPolicy().

The delays between retries are computed according to the following algorithm where the values of the variables involved are defined by the API methods having substantially the same names:

   Quick Reconnect Phase (retries <= quickReconnectMaxRetries)
   -----------------------------------------------------------
      (retries starts with 1 and increments by 1 at each quick reconnect)
      reconnectDelay = quickReconnectInitialDelay * retries - random(0, quickReconnectInitialDelay)
   After Quick Reconnect Phase (retries > quickReconnectMaxRetries)
   ----------------------------------------------------------------
      (reset retries to start with 1 and increment by 1 at each reconnect)
      If reconnectPolicy is CONSTANT_WINDOW_BACKOFF, then
         reconnectDelay = reconnectTimeInterval
      else if reconnectPolicy is TRUNCATED_EXPONENTIAL_BACKOFF, then
         reconnectDelay = min(reconnectTimeInterval * (2 ^ retries) - random(0, reconnectTimeInterval * retries), reconnectMaxDelay)

For a few users who are subject to temporary, atypical network conditions, if the reconnectDelay computed with the algorithm above is less than 10 seconds, then it is raised to 10 seconds.
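
The delay formulas above can be transcribed into Java to explore how a given configuration behaves. This is a sketch rather than the API's internal code: the class ReconnectSchedule and its members are hypothetical, and the random draw random(0, x) is modelled as u * x with u in [0, 1), which is an assumption because the text does not say whether the draw is an integer or whether its bounds are inclusive. The field defaults are the ones stated in this reference, and the 10-second floor for atypical network conditions is not modelled.

```java
import java.util.Random;

// Hypothetical transcription of the documented reconnect schedule.
class ReconnectSchedule {
    // Defaults quoted in this reference (all in seconds, except the retry count).
    int quickReconnectMaxRetries = 3;
    int quickReconnectInitialDelay = 5;
    int reconnectTimeInterval = 20;
    int reconnectMaxDelay = 360;
    boolean truncatedExponentialBackoff = true; // false models CONSTANT_WINDOW_BACKOFF

    // Quick Reconnect phase; retries runs from 1 to quickReconnectMaxRetries.
    // u in [0, 1) stands in for the random draw (assumption of this sketch).
    double quickDelay(int retries, double u) {
        return quickReconnectInitialDelay * retries - u * quickReconnectInitialDelay;
    }

    // After the Quick Reconnect phase; retries restarts from 1.
    double laterDelay(int retries, double u) {
        if (!truncatedExponentialBackoff) {
            return reconnectTimeInterval;
        }
        double raw = reconnectTimeInterval * Math.pow(2, retries)
                - u * reconnectTimeInterval * retries;
        return Math.min(raw, reconnectMaxDelay);
    }

    public static void main(String[] args) {
        ReconnectSchedule schedule = new ReconnectSchedule();
        Random random = new Random();
        for (int r = 1; r <= schedule.quickReconnectMaxRetries; r++) {
            System.out.println("quick retry " + r + ": "
                    + schedule.quickDelay(r, random.nextDouble()) + " s");
        }
        for (int r = 1; r <= 5; r++) {
            System.out.println("later retry " + r + ": "
                    + schedule.laterDelay(r, random.nextDouble()) + " s");
        }
    }
}
```

Passing u = 0 gives the largest delay the formulas can produce for a given retry, which is a convenient way to see where the truncation at reconnectMaxDelay starts to apply.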

Parameters
seconds  The number of seconds to wait before attempting to reconnect to the cluster; default value is 5 seconds.

◆ setReconnectPolicy()

void MigratoryDataClient.setReconnectPolicy ( String  policy)

Define the reconnect policy to be used after the Quick Reconnect phase.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and the reconnect schedule for the policy defined by this method.

Parameters
policy  The reconnect policy to be used after the Quick Reconnect phase. The possible values are MigratoryDataListener.CONSTANT_WINDOW_BACKOFF and MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF; the default value is MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF.

◆ setReconnectTimeInterval()

void MigratoryDataClient.setReconnectTimeInterval ( int  seconds)

Define the time interval used for the reconnect schedule after the Quick Reconnect phase.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and how the value defined by this API method is used for the reconnect schedule.

Parameters
seconds  A time interval, expressed in seconds, used for the reconnect schedule; default value is 20 seconds.

◆ setReconnectMaxDelay()

void MigratoryDataClient.setReconnectMaxDelay ( int  seconds)

Define the maximum reconnect delay for the MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF policy.

See MigratoryDataClient.setQuickReconnectInitialDelay() to learn how the value defined by this API method is used.

Parameters
seconds  The maximum reconnect delay when the policy MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF is used; default value is 360 seconds.

◆ setInteractiveEventFilters()

void MigratoryDataClient.setInteractiveEventFilters ( Set< InteractiveEventType >  interactiveEventTypes,
List< String >  subjectPrefixes 
)

Define a filter for receiving entitlement and subscription notifications for a group of subjects.

The Coupling extension of the MigratoryData server is required in order to be able to use this function.

Parameters
interactiveEventTypes  The set of event types for which you will receive notifications. Currently, there are two types: entitlement notifications and subscription notifications.
subjectPrefixes  One or more strings representing the prefixes (the first subject segment) of the subjects for which you want to receive notifications. For example, if you define "stocks" in this list, you will receive entitlement and subscription notifications for all subjects starting with the prefix "stocks", for example "/stocks/IBM", "/stocks/MSFT", etc.
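
To make the notion of a prefix as "the first subject segment" concrete, the sketch below extracts that segment and checks it against a list of prefixes. How the MigratoryData server actually evaluates the filter is not described here, so the exact-match rule on the first segment, as well as the class and method names, are assumptions of this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the MigratoryData API.
class SubjectPrefixFilter {
    // Returns the first segment of a subject, e.g. "stocks" for "/stocks/NYSE/IBM".
    // Subjects that do not start with '/' yield an empty string in this sketch.
    static String firstSegment(String subject) {
        if (subject == null || !subject.startsWith("/")) {
            return "";
        }
        int next = subject.indexOf('/', 1);
        return next < 0 ? subject.substring(1) : subject.substring(1, next);
    }

    // Assumption: a subject matches when its first segment equals one of the prefixes.
    static boolean matches(String subject, List<String> subjectPrefixes) {
        return subjectPrefixes.contains(firstSegment(subject));
    }

    public static void main(String[] args) {
        List<String> prefixes = Arrays.asList("stocks");
        System.out.println(matches("/stocks/IBM", prefixes));
        System.out.println(matches("/fx/EURUSD", prefixes));
    }
}
```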

◆ setExternalToken()

void MigratoryDataClient.setExternalToken ( String  externalToken)

Assign an external token to a client.

An external token which is provided by a client using this method is typically used by a MigratoryData plugin to enable that client to communicate with an external service.

For example, the MigratoryData plugin for Firebase needs an FCM token in order to be able to push notifications via the Firebase service to a mobile client. The mobile client can provide the FCM token to the plugin using this method.

Parameters
externalToken  A string representing an external token

◆ setTransport()

void MigratoryDataClient.setTransport ( String  type)

Define the transport type used by the client to communicate with the MigratoryData cluster.

Parameters
type  The possible values are MigratoryDataListener.TRANSPORT_HTTP and MigratoryDataListener.TRANSPORT_WEBSOCKET; the default value is MigratoryDataListener.TRANSPORT_HTTP.