MigratoryData Client API for Java
Developer's Guide and Reference Manual
|
This class implements all the necessary operations for connecting to a cluster of one or more MigratoryData servers, subscribing to subjects, getting real-time messages for the subscribed subjects, and publishing real-time messages. More...
Public Member Functions | |
MigratoryDataClient () | |
Create a MigratoryDataClient object. | |
void | setLogging (MigratoryDataLogLevel logLevel, File logFile, int logRotateLimit) throws IOException |
Configure the logging parameters. More... | |
void | setListener (MigratoryDataListener listener) |
Attach a MigratoryDataListener for handling real-time messages and status notifications. More... | |
MigratoryDataListener | getListener () |
Get the MigratoryDataListener object defined for handling real-time messages and status notifications. More... | |
void | setServers (String[] servers) throws UnknownHostException |
Specify a cluster of one or more MigratoryData servers to which the client will connect to. More... | |
void | connect () |
Connect to a MigratoryData cluster. More... | |
void | subscribe (List< String > subjects) |
Subscribe to one or more subjects. More... | |
void | subscribeWithConflation (List< String > subjects, int conflationMillis) |
Subscribe to one or more subjects with conflation. More... | |
void | subscribeWithHistory (List< String > subjects, int numberOfHistoricalMessages) |
Subscribe to one or more subjects after getting historical messages for those subjects. More... | |
void | unsubscribe (List< String > subjects) |
Unsubscribe from one or more subjects. More... | |
void | setEncryption (boolean b) |
Configure whether to use SSL/TLS encryption when connecting to a MigratoryData server. More... | |
void | setEntitlementToken (String token) |
Assign an authorization token to the client. More... | |
Collection< String > | getSubjects () |
Return the list of subscribed subjects. More... | |
void | setServersDownBeforeNotify (int n) |
Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN. More... | |
void | notifyAfterReconnectRetries (int retries) |
Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN. More... | |
void | disconnect () |
Disconnect from the connected MigratoryData server and dispose the resources used by the connection. More... | |
void | publish (MigratoryDataMessage message) throws Exception |
Publish a message. More... | |
void | setQuickReconnectMaxRetries (int retries) |
Define the maximum number of retries for the Quick Reconnect failover phase. More... | |
void | setQuickReconnectInitialDelay (int seconds) |
Define the number of seconds to wait before attempting to reconnect to the cluster after a connection failure is detected. More... | |
void | setReconnectPolicy (String policy) |
Define the reconnect policy to be used after the Quick Reconnect phase. More... | |
void | setReconnectTimeInterval (int seconds) |
Define the time interval used for the reconnect schedule after the Quick Reconnect phase. More... | |
void | setReconnectMaxDelay (int seconds) |
Define the maximum reconnect delay for the MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF policy. More... | |
void | setInteractiveEventFilters (Set< InteractiveEventType > interactiveEventTypes, List< String > subjectPrefixes) |
Define a filter for receiving entitlement and subscription notifications for a group of subjects. More... | |
void | setExternalToken (String externalToken) |
Assign an external token to a client. More... | |
void | setTransport (String type) |
This class implements all the necessary operations for connecting to a cluster of one or more MigratoryData servers, subscribing to subjects, getting real-time messages for the subscribed subjects, and publishing real-time messages.
void MigratoryDataClient.setLogging | ( | MigratoryDataLogLevel | logLevel, |
File | logFile, | ||
int | logRotateLimit | ||
) | throws IOException |
Configure the logging parameters.
It is advisable to configure this first if you want to log as much as possible. The default log level is MigratoryDataLogLevel.INFO.
logLevel | The particular MigratoryDataLogLevel configured as the logging threshold |
logFile | The file used to output the logs. For Android applications, set this parameter to null . |
logRotateLimit | Define the maximum file size in bytes of the logging file to be used before creating a new logging file (log rotation). To disable log rotation, set this parameter on 0 . |
IOException | If there is an IO error while trying to configure the logging file. |
void MigratoryDataClient.setListener | ( | MigratoryDataListener | listener | ) |
Attach a MigratoryDataListener for handling real-time messages and status notifications.
listener | An instance of a class which implements the MigratoryDataListener interface |
MigratoryDataListener MigratoryDataClient.getListener | ( | ) |
Get the MigratoryDataListener object defined for handling real-time messages and status notifications.
void MigratoryDataClient.setServers | ( | String [] | servers | ) | throws UnknownHostException |
Specify a cluster of one or more MigratoryData servers to which the client will connect to.
If you specify two or more MigratoryData servers, then all these MigratoryData servers should provide the same level of data redundancy, by making available for subscription the same set of subjects. This is required for achieving (weighted) load balancing, failover, and guaranteed message delivery of the system. In this way, the MigratoryData servers of the servers
list form a cluster.
For example, to connect to a cluster formed of two MigratoryData servers installed at the addresses p1.example.com
and p2.example.com
, and configured to accept clients on the standard HTTP port 80
, the following code can be used:
client.setServers(new String[] {"p1.example.com:80", "p2.example.com:80"});
To achieve load-balancing, the API connects the client to a MigratoryData server chosen randomly from the servers
list. In this way, the load is balanced among all the members of the cluster.
Moreover, the API supports weighted load-balancing. This feature is especially useful if the MigratoryData servers in the cluster are installed on machines with different capacities. You can assign to each member of the cluster a weight ranging from
0
to100
. This weight assignment is a hint provided to the API to select with a higher probability a MigratoryData server with a higher weight either initially when the client connects to the cluster or later during a failover reconnection.Supposing the address
p1.example.com
corresponds to a machine that is twice more powerful than the machine having the addressp2.example.com
, then you can assign top1.example.com
a weight100
and top2.example.com
a weight50
by prefixing each address with the assigned weight as follows:
client.setServers(new String[] {"100 p1.example.com:80", "50 p2.example.com:80"});
The API assigns a default weight
100
to the addresses not prefixed with a specific weight.
To achieve failover, if the connection between the client and a MigratoryData server is broken, then the API will automatically detect the failure and will select another MigratoryData server from the servers
list. If the client fails to connect to the new selected server, a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN will be triggered (unless you modify the number of failed attempts with MigratoryDataClient.setServersDownBeforeNotify()), and a new MigratoryData server in the cluster will be selected again and again until the client will be able to connect to one of the MigratoryData servers in the cluster. When successfully connected, the API will trigger a status notification MigratoryDataListener.NOTIFY_SERVER_UP.
Furthermore, if guaranteed message delivery is enabled, then the potential messages published for a subscribed subject during the failover period, will be automatically retrieved from the cache of the MigratoryData server to which the client reconnects to and a status notification MigratoryDataListener.NOTIFY_DATA_SYNC will be triggered for that subject.
If, for example, the failover period is abnormally long, and the client is not able to retrieve, after a failover reconnection, the messages published during the failover period for one of its subscribed subjects, then the API will retrieve only the most recent message available for that subject and will trigger a MigratoryDataListener.NOTIFY_DATA_RESYNC status notification for that subject, the client behaving as a new client which connects to the cluster at the moment of the failover reconnection.
For a complete discussion related to load balancing, failover, and guaranteed message delivery features see the MigratoryData Architecture Guide (PDF, HTML).
servers | An array of strings where each string represents the network address (IP address or DNS domain name and its corresponding port) of a MigratoryData server, optionally prefixed by a weight ranging from 0 to 100 . If the weight prefix is not provided to an address, then the API will automatically assign to that address a default weight 100 . |
UnknownHostException | If the address of a MigratoryData server could not be determined |
void MigratoryDataClient.connect | ( | ) |
Connect to a MigratoryData cluster.
This API call can be used to connect to one of the MigratoryData servers specified with MigratoryDataClient.setServers().
Please note that a connection is automatically made during the first subscription using the API call MigratoryDataClient.subscribe() or during the first publication using the API call MigratoryDataClient.publish().
Therefore, if the creation a MigratoryDataClient object is immediately followed by a subscribe or publish operation, then the use of this API call is not necessary. Otherwise, use this API call to connect to a MigratoryData cluster.
void MigratoryDataClient.subscribe | ( | List< String > | subjects | ) |
Subscribe to one or more subjects.
Subscribe for real-time messages having as subjects the strings provided in the subjects
parameter.
As an example, supposing messages are market data updates having as subjects stock names. Then, to subscribe for the messages having as subjects /stocks/NYSE/IBM
and /stocks/Nasdaq/MSFT
the following code will be used:
List<String> subjects = new ArrayList<String>(); subjects.add("/stocks/NYSE/IBM"); subjects.add("/stocks/Nasdaq/MSFT"); client.subscribe(subjects);
The subjects are strings having a particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.
subjects | An array of strings representing subjects. |
void MigratoryDataClient.subscribeWithConflation | ( | List< String > | subjects, |
int | conflationMillis | ||
) |
Subscribe to one or more subjects with conflation.
Subscribe for real-time messages having as subjects the strings provided in the subjects
parameter.
If the optional parameter conflationMillis
is used, then for each subject in the subjects
list given in argument, its messages will be aggregated in the MigratoryData server and published every conflationMillis
milliseconds as aggregated data (containing only the latest value for that subject and its latest field values). The value of conflationMillis
should be a multiple of 100
milliseconds, otherwise the MigratoryData server will round it to the nearest value multiple of 100
milliseconds (e.g. 76
will be rounded to 0
, 130
will be rounded to 100
, 789
will be rounded to 700
, ...). If the value of conflationMillis
is 0
(or is rounded to 0
), then no conflation will apply, and data publication will be message-by-message with no message aggregation.
As an example, supposing the messages are market data updates having as subjects stock names. Then, to subscribe for the messages having as subjects /stocks/NYSE/IBM
and /stocks/Nasdaq/MSFT
using 1-second conflation the following code will be used:
List<String> subjects = new ArrayList<String>(); subjects.add("/stocks/NYSE/IBM"); subjects.add("/stocks/Nasdaq/MSFT"); client.subscribeWithConflation(subjects, 1000);
The subjects are strings having a particular particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.
subjects | An array of strings representing subjects. |
conflationMillis | An optional argument defining the number of milliseconds used to aggregate ("conflate") the messages for each subject in the subjects list; default value is 0 meaning that no conflation will apply, and data publication will be message-by-message with no message aggregation. |
void MigratoryDataClient.subscribeWithHistory | ( | List< String > | subjects, |
int | numberOfHistoricalMessages | ||
) |
Subscribe to one or more subjects after getting historical messages for those subjects.
Attempt to get the number of historical messages as defined by the argument numberOfHistoricalMessages
, for each subject in the argument subjects
, then subscribe for real-time messages having as subjects the strings provided in the subjects
parameter.
When Guranteed Message Delivery is enabled, each MigratoryData server in the cluster maintains an in-memory cache with historical messages for each subject. The cache of each subject is available in all servers of the cluster. The maximum number of messages held in cache is defined by the parameter MaxCachedMessagesPerSubject
of the MigratoryData server which defaults to 1,000 messages. The historical messages are continuously removed from the cache, but it is guaranteed that they are available in the cache at least the number of seconds defined by the parameter CacheExpireTime
which defaults to 180 seconds.
If the value of numberOfHistoricalMessages
is higher than the number of historical messages available in the cache, then the client will receive only the messages available in the cache. As a consequence, if you use a value higher than the value of the parameter MaxCachedMessagesPerSubject
of the MigratoryData server (which defaults to 1000), then you will get the entire cache before subscribing for real-time messages for the subjects specified with the API call.
client.subscribeWithHistory(Arrays.asList("/stocks/NYSE/IBM", "/stocks/Nasdaq/MSFT"), 10);
The subjects are strings having a particular syntax. See the Chapter "Concepts" in the MigratoryData Architecture Guide (PDF, HTML) to learn about the syntax of the subjects.
subjects | An array of strings representing subjects. |
numberOfHistoricalMessages | The number of historical messages to be retrieved from the cache of the MigratoryData server. A value 0 means that no historical messages has to be retrieved and, in this case, this API method is equivalent to the API method MigratoryDataClient.subscribe(). A value larger that the value of the parameter MaxCachedMessagesPerSubject means the entire cache is retrieved. |
void MigratoryDataClient.unsubscribe | ( | List< String > | subjects | ) |
Unsubscribe from one or more subjects.
Unsubscribe from the subscribed subjects provided in the subjects
parameter.
subjects | An array of strings representing subjects. |
void MigratoryDataClient.setEncryption | ( | boolean | b | ) |
Configure whether to use SSL/TLS encryption when connecting to a MigratoryData server.
When using encryption you have to connect to the ports of the MigratoryData servers that are configured to listen for encrypted connections. See the parameter ListenEncrypted
in the MigratoryData Configuration Guide (PDF, HTML).
b | Determine whether the client connects to the MigratoryData server using an encrypted SSL/TLS connection |
void MigratoryDataClient.setEntitlementToken | ( | String | token | ) |
Assign an authorization token to the client.
To define which users of your application have access to which subjects, you will first have to set the parameter Entitlement
on true
in the configuration file of the MigratoryData server — see the parameter Entitlement
in the MigratoryData Configuration Guide (PDF, HTML).
Then, you will have to use the entitlement-related part of the MigratoryData Extension API to allow or deny certain users to subscribe / publish to certain subjects.
token | A string representing an authorization token. |
Collection<String> MigratoryDataClient.getSubjects | ( | ) |
Return the list of subscribed subjects.
void MigratoryDataClient.setServersDownBeforeNotify | ( | int | n | ) |
Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN.
n | The number of the failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataListener.NOTIFY_SERVER_DOWN; default value is 1 . |
void MigratoryDataClient.notifyAfterReconnectRetries | ( | int | retries | ) |
Define the number of failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN.
retries | The number of the failed attempts to connect to one or more MigratoryData servers before triggering a status notification MigratoryDataClient.NOTIFY_SERVER_DOWN; default value is 1 . |
void MigratoryDataClient.disconnect | ( | ) |
Disconnect from the connected MigratoryData server and dispose the resources used by the connection.
This method should be called when the connection is no longer necessary.
void MigratoryDataClient.publish | ( | MigratoryDataMessage | message | ) | throws Exception |
Publish a message.
If the message includes a closure data, then a status notification will be provided via MigratoryDataListener.onStatus() to inform whether the message publication has been successful or failed.
message | A MigratoryDataMessage message |
void MigratoryDataClient.setQuickReconnectMaxRetries | ( | int | retries | ) |
Define the maximum number of retries for the Quick Reconnect failover phase.
retries | The maximum number of quick reconnect retries; default value is 3 . |
void MigratoryDataClient.setQuickReconnectInitialDelay | ( | int | seconds | ) |
Define the number of seconds to wait before attempting to reconnect to the cluster after a connection failure is detected.
Connection failure is detected immediately for almost all users. For a few users which are subject to temporary, atypical network conditions, connection failure is detected after 30-40 seconds.
When a connection failure is detected, the API will attempt to reconnect to the servers of the MigratoryData cluster as follows: First, it will attempt to reconnect up to a number of times as defined by MigratoryDataClient.setQuickReconnectMaxRetries() using small delays between retries (Quick Reconnection Phase). If the connection cannot be established after the Quick Reconnection Phase, then the API will attempt to reconnect less frequently according to the policy defined by MigratoryDataClient.setReconnectPolicy().
The delays between retries are computed according to the following algorithm where the values of the variables involved are defined by the API methods having substantially the same names:
Quick Reconnect Phase (retries <= quickReconnectMaxRetries) -----------------------------------------------------------
(retries starts with 1 and increment by 1 at each quick reconnect)
reconnectDelay = quickReconnectInitialDelay * retries - random(0, quickReconnectInitialDelay)
After Quick Reconnect Phase (retries > quickReconnectMaxRetries) ----------------------------------------------------------------
(reset retries to start with 1 and increment by 1 at each reconnect)
If reconnectPolicy is CONSTANT_WINDOW_BACKOFF, then
reconnectDelay = reconnectTimeInterval
else if reconnectPolicy is TRUNCATED_EXPONENTIAL_BACKOFF, then
reconnectDelay = min(reconnectTimeInterval * (2 ^ retries) - random(0, reconnectTimeInterval * retries), reconnectMaxDelay)
For a few users which are subject to temporary, atypical network conditions, if reconnectDelay
computed with the algorithm above is less than 10 seconds, then it is rounded to 10 seconds.
seconds | The number of seconds to wait before attempting to reconnect to the cluster; default value is 5 seconds. |
void MigratoryDataClient.setReconnectPolicy | ( | String | policy | ) |
Define the reconnect policy to be used after the Quick Reconnect phase.
See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and the reconnect schedule for the policy defined by this method.
policy | The reconnect policy to be used after the Quick Reconnect phase. The possible values are MigratoryDataListener.CONSTANT_WINDOW_BACKOFF and MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF; the default value is MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF. |
void MigratoryDataClient.setReconnectTimeInterval | ( | int | seconds | ) |
Define the time interval used for the reconnect schedule after the Quick Reconnect phase.
See MigratoryDataClient.setQuickReconnectInitialDelay() to learn about the Quick Reconnect phase and how the value defined by this API method is used for the reconnect schedule.
seconds | A time interval expressed in seconds used for reconnect schedule; default is 20 seconds. |
void MigratoryDataClient.setReconnectMaxDelay | ( | int | seconds | ) |
Define the maximum reconnect delay for the MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF policy.
See MigratoryDataClient.setQuickReconnectInitialDelay() to learn how the value defined by this API method is used.
seconds | The maximum reconnect delay when the policy MigratoryDataListener.TRUNCATED_EXPONENTIAL_BACKOFF is used; default value is 360 seconds. |
void MigratoryDataClient.setInteractiveEventFilters | ( | Set< InteractiveEventType > | interactiveEventTypes, |
List< String > | subjectPrefixes | ||
) |
Define a filter for receiving entitlement and subscription notifications for a group of subjects.
The Coupling extension of the MigratoryData server is required in order to be able to use this function.
interactiveEventTypes | Define the list of event types for which you will receive notifications. Currently, there are two types: entitlement notifications and subscription notifications. |
subjectPrefixes | One or more strings representing the prefixes (the first subject segment) of the subjects for which you desire to receive notifications. For example, if you define "stocks" in this list, you will receive entitlement and subscriptions notifications for all subjects starting with the prefix "stocks", for example "/stocks/IBM", "/stocks/MFTS", etc. |
void MigratoryDataClient.setExternalToken | ( | String | externalToken | ) |
Assign an external token to a client.
An external token which is provided by a client using this method is typically used by a MigratoryData plugin to enable that client to communicate with an external service.
For example the MigratoryData plugin for Firebase needs an FCM token in order to be able to push notifications via the Firebase service to a mobile client. The mobile client can provide the FCM token to the plugin using this method.
externalToken | A string representing an external token |
void MigratoryDataClient.setTransport | ( | String | type | ) |
Define the transport type used by the client to communicate with the MigratoryData cluster.
type | the possible values are: MigratoryDataListener.TRANSPORT_HTTP and MigratoryDataListener.TRANSPORT_WEBSOCKET; the default value is the first one |