Introduction
This document provides instructions for configuring and using the File Transport Adapter (TA) within FIXEdge. The File TA is a specialized component that processes Financial Information eXchange (FIX) protocol messages and stores them in Comma-Separated Values (CSV) files for easy access and readability. The adapter effectively serves as a bridge, providing secure and orderly storage of the FIX messages captured during financial transactions.
Key Functionalities
Message Processing and Storage: The core function of the File Transport Adapter is to receive FIX protocol messages, commonly used in trading and investment environments. After processing, these messages are converted and stored as CSV files, enabling a structured format that is both human-readable and machine-friendly.
Sorting of Messages: Optionally, this component can be configured to sort the FIX messages based on specific criteria such as timestamp, message type, or any other relevant header or data fields within the messages. This sorting functionality aids in organized data analysis and reporting.
Group Splitting: To enhance data segmentation and improve clarity, the adapter supports splitting message groups into individual rows within the output CSV files. This feature allows users to fine-tune how data is segmented in the resultant files, making it more digestible and easier to analyze distinct elements of grouped data.
Max Rows Limit: For ease of management and to prevent excessively large output files, the adapter includes an option to set a maximum row limit for each CSV file. Once the limit is reached, the adapter starts populating a new file, segmenting the data across multiple files when necessary.
Configurable Start/Stop Time: The adapter can be configured to operate between specified start and stop times, thereby enabling it to handle data during peak hours, after hours batch processing, or specific operational windows tailored to business needs. This scheduling feature ensures that system resources are optimized and that data handling is aligned with user requirements.
High Configurability: Users can tailor functionalities to fit specific needs through a simple configuration interface. This flexibility allows for changes in parameters like sorting field, group splitting, file size management, and operational timings without needing deep dives into the codebase.
Performance and Scalability: Designed with performance in mind, the File Transport Adapter ensures minimal latency in message processing and is scalable to handle high volumes of messages typical of financial market environments.
Transport Adapter Configuration
To use the File Transport Adapter, the following properties should be specified in the FIXEdge.properties file:
...
TransportLayer.TransportAdapters = TransportLayer.FileTA
TransportLayer.FileTA.Description = File Transport Adaptor
TransportLayer.FileTA.DllName = bin/FileTADll.dll
TransportLayer.FileTA.FileTASessions = CSVInstrumentsExport
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.StreamsConfigurationFile = ./conf/streams.json
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.Schedule.StartTime = 30 * * * * *
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.Schedule.StopTime = 0 * * * * *
...
Detailed description:
Add the transport adapter configuration to the list of adapters loaded by FIXEdge at startup:
TransportLayer.TransportAdapters = TransportLayer.FileTA
Set the adapter's name and the path to the dll/so file containing the adapter:
TransportLayer.FileTA.Description = File Transport Adaptor
TransportLayer.FileTA.DllName = bin/FileTADll.dll
Specify the list of sessions to be created by the transport adapter (session names are up to the user):
TransportLayer.FileTA.FileTASessions = CSVInstrumentsExport,...
For each session, the following parameters should be specified:
The location of the streams configuration file:
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.StreamsConfigurationFile = ./conf/streams.json
The start and stop time of the session in the cron format:
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.Schedule.StartTime = 30 * * * * *
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport.Schedule.StopTime = 0 * * * * *
Note: The adapter will flush all the data and generate the manifest file only when the StopTime expression triggers.
CRON has the following syntax:
┌───────────── second (0 - 59)
│ ┌───────────── minute (0 - 59)
│ │ ┌───────────── hour (0 - 23)
│ │ │ ┌───────────── day of month (1 - 31)
│ │ │ │ ┌───────────── month (1 - 12)
│ │ │ │ │ ┌───────────── day of week (1 - 7: Sunday to Saturday)
│ │ │ │ │ │
* * * * * *
Support for CRON expressions is limited in the same way as in Quartz. In particular, specifying both day-of-week and day-of-month values is prohibited (you'll need to use the '*' character in one of these fields).
The only difference from Quartz is that the '*' character is used instead of the '?' character.
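For illustration, the following schedule (the session name MySession is hypothetical) starts a session every weekday at 08:00:00 and stops it at 17:30:00; with Sunday = 1, the weekdays are 2-6:

TransportLayer.FileTA.FileTASession.MySession.Schedule.StartTime = 0 0 8 * * 2,3,4,5,6
TransportLayer.FileTA.FileTASession.MySession.Schedule.StopTime = 0 30 17 * * 2,3,4,5,6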
OutMapper session: Used to convert received FIX messages to CSV. The StreamsConfigurationFile should be configured for such sessions:
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport_M-W-F.StreamsConfigurationFile = /etc/fixedge/streams.json
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport_M-W-F.Type = OutMapper
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport_M-W-F.Schedule.StartTime = 0 02 21 * * 0,2,4
TransportLayer.FileTA.FileTASession.CSVInstrumentsExport_M-W-F.Schedule.StopTime = 0 0 21 * * 1,3,5
InFromSessionLog session: Used to replay logs. Example:
TransportLayer.FileTA.FileTASession.CSVInstrumentsReplay.Type = InFromSessionLog
TransportLayer.FileTA.FileTASession.CSVInstrumentsReplay.SessionLogDirectory = /home/fixedge/ssn_log/archive
Streams Configuration
The File Transport Adapter operates with entities named "streams". Each stream can handle messages coming from the FIXEdge business layer; filter, process, and map the received messages; and write the results to the configured "output". Optionally, messages that fail the validation rules can be sent to the configured "error output".
Below is the layout of the streams configuration file:
{ "streams": [ { "name": "stream name", "_comment": "Comments could be added by adding an underscore at the start of a property name", "output": { ... }, "errorOutput": { ... }, "exports": { ... }, "index": [ ... ], "process": { ... }, "split": 123, "map": { ... } }, ... ], "manifestFile": { "workdir": "./", "fileName" : "Manifest.csv" } } |
Streams Configuration Details
streams section [required]: This section contains an array of stream configurations. Each stream contains the following sections:
name parameter [required]: The stream name. It can be used in another stream to reference the exports exposed by this stream.
output section [required]: This section contains the output configuration. Example:
"output": { "type": "csv", "workdir": "./", "fileMask": "{date}_{SomeExportedVariable}_Data_{n}.csv", "sizeLimit": 500000, "entryLimit": 500000, "customHeaders": [ "{fileName},{entryCount}" ], "ignoreFields": ["FieldName"] } |
The type property describes the output type and specifies the list of available properties. Currently, only the csv type is supported.
errorOutput section [optional] - if specified, contains the configuration of the error output. This output is used for all mapped messages that violate the validation rules of the mapping. Configuration is the same as for the output section.
Properties available for type == csv
workdir [required] - the directory where output files should be created.
fileMask [required] - the file name template to use. This value can contain the following placeholders:
{date} - current date in YYYYMMDD format.
{n} - file number, 1 by default. If limits are configured, the value increases each time the current file reaches its limit, and a new file is created with {n} replaced by the previous value + 1 (see the example after this list).
{SomeExportedVariable} - a variable name from the list of exported variables. This value will be calculated for every processed mapped message.
sizeLimit [optional] - resulting file size limit in bytes. If specified, requires the placeholder {n} to be present in the fileMask property. For details, please see the placeholder's description.
entryLimit [optional] - maximum number of mapped messages allowed in the resulting file. If specified, requires the placeholder {n} to be present in the fileMask property. For details, please see the placeholder's description.
customHeaders [optional] - a list of strings that will be added at the beginning of each resulting file. This value can contain the following placeholders:
{fileName} - the name of the generated file.
{entryCount} - the number of mapped messages in this file.
{fileSize} - current file size in bytes.
ignoreFields [optional] - a list of fields present in a mapped message that should not be processed by the output. This is useful if some field should be exported by the stream but should not be written to a resulting file.
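For illustration, here is a possible output section (the directory, file mask, and limit below are hypothetical) that rotates files after 100,000 records; with this configuration the adapter would produce files such as 20240219_Trades_1.csv, 20240219_Trades_2.csv, and so on:

"output": {
    "type": "csv",
    "workdir": "/var/fixedge/export",
    "fileMask": "{date}_Trades_{n}.csv",
    "entryLimit": 100000,
    "customHeaders": [ "File: {fileName}, records: {entryCount}, size: {fileSize}" ]
}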
Manifest File
manifestFile section [optional]: This section contains the manifest file configuration. The manifest file is generated when the StopTime schedule expression triggers and contains data about all data files generated by this File Transport Adapter session in CSV format. Example:
Date,File Name,Number of Records
20240219,20240219_data_123.csv,6
20240219,20240219_data_456.csv,6
The section, if present, should contain the following parameters:
workdir - the directory where the manifest file should be created.
fileName - the name of the manifest file. In the file name, the {date} placeholder could be used to insert the date of the file generation in the YYYYMMDD format.
Example:
"manifestFile": { "workdir": "./", "fileName" : "{date}_manifest.csv" } |
This configuration may create a file with the following name: 20240219_manifest.csv
Mapping Details
Below is a detailed description of the elements you can use within the "map" section of your stream configuration:
Simple Tag Mapping
To create a separate record for each repeating group entry instead of one record per message, set the leading tag of the repeating group as the "split" tag. Only top-level repeating groups are supported; top-level fields are copied into each record. For example, "split": 146 adds a new CSV record for each entry of group 146. To add a tag to the mapping, add its description to the top-level "map" element of a stream.
To map a FIX tag to a CSV field and mark it as optional, use the following syntax:
"<csvFieldName>": <TagNum> |
To specify a tag as required, use the following syntax:
"<csvFieldName>": [{"source": <TagNum>}, {"required": true}] |
To map a tag conditionally, based on the value of another tag, use the "if" condition:
"<csvFieldName>": [{"source": <TagNum>}, {"required": true, "if": <ConditionTagNum>, "==": "<Value>"}] |
This will include the field in the output only if the ConditionTagNum equals the specified <Value>.
To check if a tag's value is present in a list of allowed values, use the "in" condition. This ensures that only the specified values are considered valid for the tag:
"<csvFieldName>": [{"source": <TagNum>}, {"required": true, "if": <ConditionTagNum>, "in": [<List of values>]}] |
To map a tag only when a specific condition is met, you can use the following syntax. This will map the tag only if the ConditionTagNum equals the provided <Value>:
"<csvFieldName>": [ { "source": <TagNum> }, { "if": <ConditionTagNum>, "==": "<Value>" } ] |
To specify a list of valid values for a tag, use the following. This ensures that the tag's value must be present in the provided list.
"<csvFieldName>": [{ "source": <TagNum> }, {"in": [<list of values>]}] |
To create a field that automatically increments its value for each record, use the "autoincrement" keyword:
"<csvFieldName>": [ { "source": "autoincrement" } ] |
To concatenate the value of the current tag with another tag, use the "concat" option. Replace <ConcatTagNum> with the tag number to concatenate with and <Delimiter> with the string to use as a separator:
{ "concat": <ConcatTagNum>, "delimeter": "<Delimiter>" } |
To extract and use a specific portion of a tag's value based on a regular expression, use the "regex" option. Replace <expression> with your regular expression. The matching portion of the tag's value will be used:
{ "regex": "<expression>" } |
To convert the date format of a tag's value, use the "convertDateFormat" and "toDateFormat" options. Replace <Format1> with the current format of the date and <Format2> with the desired format:
{"convertDateFormat":"<Format1>", "toDateFormat":"<Format2>"} |
To convert a date and time value to an EPOCH timestamp with microsecond precision, use the following configuration. This will convert the date and time information to a numerical timestamp:
{ "asTimestamp": "%Y%m%d%H:%M:%S.%i", "precision": "us" } |
Repeating Group Mapping
To represent FIX repeating groups in your CSV output, you can use the "split" and "leadingTag" options:
<csvGroupFieldName": { "leadingTag": <TagNum>, "convert": "json", "simplifyJsonBy": "<csvFieldName>" "fieldsJoinBy": " ", "joinBy": ";", "flattern": true, "map": { <list of fields of repeating group to be mapped> } } |
This configuration will create a new CSV record for each entry within the specified repeating group.
leadingTag: This parameter indicates the tag number that marks the beginning of a repeating group within a FIX message.
convert: This parameter allows you to specify how you want to represent the repeating group data in your CSV.
"json": This option will output the data for each repeating group entry as a JSON string.
"simplifyJsonBy": If you have repeating groups that result in one-dimensional or two-dimensional JSON structures (arrays or key-value pairs), you can use this option to simplify the output. It converts one-value tables to JSON arrays and two-value tables to key-value pairs. The "<csvFieldName>" you provide acts as the key for the key-value pairs.
"fieldsJoinBy": Sometimes you want to combine all the fields within a repeating group into a single CSV record. This option concatenates all the fields of each repeating group entry into a single string, separated by the delimiter you specify. This operation is applied before "joinBy" if both are specified.
"joinBy": Similar to "fieldsJoinBy," this option also creates a single record for the entire repeating group. However, instead of concatenating all fields for each repeating group entry, it merges the values of each field across all entries using the specified delimiter. This results in a single set of fields representing the entire group.
"flattern": This option is used when your repeating group represents a set of unique attributes. When "flatten" is set to true, the adapter will create a single set of fields for the entire repeating group and populate these fields with corresponding values if present within the group.
"map": This section is where you define how to map the individual fields within the repeating group to CSV columns. You would include the same mapping options as you would for simple tags within this "map" section.
Configuring FIX to CSV Mapping
You define the FIX to CSV mapping in the file specified by the StreamsConfigurationFile property of your FileTA session. This file uses JSON format for the mapping configuration. Each message type (or a group of message types that share the same mapping logic) should have a separate stream defined within the "streams" array.
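For example, a hypothetical streams configuration skeleton with one stream per message type could look as follows; the stream names, tags, file masks, and limits are illustrative only:

{
    "streams": [
        {
            "name": "SecurityDefinitions",
            "process": { "35": ["d"] },
            "output": { "type": "csv", "workdir": "./", "fileMask": "{date}_SecDef_{n}.csv", "entryLimit": 500000 },
            "map": { "SecurityID": 48, "Symbol": 55 }
        },
        {
            "name": "MarketData",
            "process": { "35": ["W"] },
            "output": { "type": "csv", "workdir": "./", "fileMask": "{date}_MarketData_{n}.csv", "entryLimit": 500000 },
            "map": { "SecurityID": 48, "SendingTime": 52 }
        }
    ]
}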
Deduplication Logic Configuration
To ensure that your CSV output only contains unique records based on specific fields, you can use the "index" array in your stream configuration. This array lists the fields that should be considered when determining the uniqueness of a record.
"index": [ "SecurityID" ] |
In this example, only one entry for a particular SecurityID will be added to the output file.
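Since the parameter is an array, several fields can presumably be listed to form a compound uniqueness key (this is an assumption based on the array form, and the field names below are illustrative):

"index": [ "SecurityID", "SecurityExchange" ]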
Configuring Messages to be Processed
The "process" tag allows you to define criteria to filter which FIX messages are processed by a specific stream. You can specify conditions based on FIX tags and their values. For example:
"process": { "35": ["y"] } |
In this case, only messages where Tag 35 (MsgType) has a value of "y" will be processed by this stream.
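Assuming the listed values act as an allow-list (as the example above suggests), a single stream could process both SecurityList (35=y) and SecurityDefinition (35=d) messages; this combination is given purely for illustration:

"process": { "35": ["y", "d"] }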
Available Commands
The FileTA provides several commands for managing sessions:
clear
This command removes all data from the specified FileTA session memory.
Example:
8=FIX.4.4|9=173|35=C|49=fake|56=fake|34=1|52=99990909-17:17:17|164=1|94=0|42=20240225-23:36:23|147=clear|33=4|58= |58= |58= |58=|10=011|
flush
This command creates CSV files based on the data stored in the FileTA session memory. After creating the files, it clears the session memory.
Example:
8=FIX.4.4|9=173|35=C|49=fake|56=fake|34=1|52=99990909-17:17:17|164=1|94=0|42=20240225-23:36:23|147=flush|33=4|58= |58= |58= |58=|10=011|
replay
This command replays session logs from the specified storage. It requires the following parameters:
StorageName: The name of the session storage (without the extension).
FIXParser: The FIX version used for parsing messages. For custom versions, provide the name of the parser from the AdditionalParsersList.
BatchSize: The maximum number of messages to be held in memory during replay.
Example:
8=FIX.4.4|9=173|35=C|49=fake|56=fake|34=1|52=99990909-17:17:17|164=1|94=0|42=20240225-23:36:23|147=replay|33=4|58=StorageName=TARGET-SENDER 2402201409256881|58=FIXParser=Spectrum@FIXT11:FIX50SP2SPECTRUM|58=BatchSize=10000|58= |10=011|
setBusinessDate
This command sets the business date for the FileTA adapter.
Example:
8=FIXT.1.1|9=164|35=C|49=FCS|56=FE|34=65|52=20240729-09:39:35.472|164=CSVInstrumentsExport|94=0|42=20240225-23:36:23|147=setBusinessDate|33=4|58=BusinessDate=20240731|58= |58= |58= |10=051|
Replay Mechanism
This section provides a step-by-step walkthrough of the replay mechanism, utilizing details from other sections of the document to offer a comprehensive understanding.
Prerequisites:
File Transport Adapter (FileTA) Configuration: Ensure you have correctly configured two FileTA sessions in your FIXEdge.properties file:
InFromSessionLog Session: This session handles the replay of your session logs.
Set the Type parameter to InFromSessionLog.
Define the SessionLogDirectory parameter, pointing to the location where you'll store the session log files for replay.
OutMapper Session: This session is responsible for receiving the replayed messages, applying the defined mapping logic, and ultimately generating the CSV output file.
Set the Type parameter to OutMapper.
Specify the StreamsConfigurationFile parameter with the path to your stream.json file. This file defines the mapping rules from FIX messages to CSV fields, output directory (workdir), and other configurations as detailed in the "Streams Configuration" and "Mapping Details" sections of this document.
Business Rule: A predefined business rule should be in place to automatically route the messages processed by the InFromSessionLog session to the OutMapper session.
Replay Process:
Move Session Logs to Archive:
Gather Log: Collect the five required session log files (with extensions .conf, .in, .ndx.in, .ndx.out, and .out) that you want to replay.
[user@host ~]$ ls -l <your_session_log_directory>
total 16392
-rw-r--r-- 1 user group     5869 Mar 13 11:54 <session_name>.conf
-rw-r--r-- 1 user group 14991360 Mar 13 11:54 <session_name>.in
-rw-r--r-- 1 user group  1302528 Mar 13 11:54 <session_name>.ndx.in
-rw-r--r-- 1 user group    90112 Mar 13 11:54 <session_name>.ndx.out
-rw-r--r-- 1 user group   999424 Mar 13 11:54 <session_name>.out
Transfer to SessionLogDirectory: Move these files to the directory you defined as the SessionLogDirectory in the InFromSessionLog session configuration.
Initiate Replay:
Connect: Establish a connection to your FIXEdge instance, enabling communication with the FileTA.
Set Business Date: Send the following setBusinessDate command to the InFromSessionLog session:
8=FIXT.1.1|9=164|35=C|49=FCS|56=FE|34=6|52=20240729-09:39:35.472|164=CSVReplayExport|94=0|42=20240225-23:36:23|147=setBusinessDate|33=4|58=BusinessDate=YYYYMMDD|58= |58= |58=|10=051|
Replace YYYYMMDD with the desired business date in the format YearMonthDay (e.g., 20240801). This date will be incorporated into the generated CSV file names.
Execute Replay Command: Send the replay command to the InFromSessionLog session to start processing the logs. This command requires:
StorageName: Name of the log file (without extension).
FIXParser: Parser type based on the FIX version. Ensure the correct parser (defined in the configuration’s FIXParser or AdditionalParsersList) is specified for accurate log file interpretation, e.g., FIXParser=FIX50SP2
BatchSize: Maximum number of messages held in memory for replay.
8=FIX.4.4|9=173|35=C|49=Sender|56=Receiver|34=1|52=DateTime|164=CSVInstrumentsReplay|147=replay|58=StorageName=LOG_NAME|58=FIXParser=FIX50SP2|58=BatchSize=10000|10=011|
Automated Routing and Mapping:
The preconfigured business rule will automatically detect the replayed messages and route them to the OutMapper session.
The OutMapper session will process these messages according to your defined mapping logic in the stream.json file. This includes extracting data from FIX tags, performing any required transformations or calculations, and structuring the output as CSV rows and columns.
CSV File Generation:
Flush Execution: The OutMapper session will generate the CSV file based on either its configured schedule (refer to the Schedule parameters in the OutMapper session configuration) or upon manual execution of the flush command.
Output Location: The CSV file will be created in the directory specified by the workdir element in your stream.json file.
csv_processor Utility for FIXEdge File TA
This section describes the csv_processor utility, a command-line tool for processing CSV files generated by the FIXEdge File Transport Adapter. It efficiently sorts, merges, and filters market data records, addressing specific merge requirements. The utility outputs processed data to a new CSV file or a set of files. This documentation outlines its parameters and data handling logic.
Utility Parameters
The csv_processor utility accepts the following parameters:
Parameter | Description | Default Value | Required | Data Type
---|---|---|---|---
help,h | Show help | | No | String
input,i | Input file path | | Yes | String
output-dir,o | Output directory | | Yes | String
sort-direction,d | Sort direction (asc, desc) | asc | No | String
chunk_size,c | Maximum rows per sorting chunk | 0 (unlimited) | No | Size_t
rows-limit,r | Maximum rows per output file | 0 (unlimited) | No | Size_t
manifest-file,m | Manifest file to patch summary | "" (empty) | No | String
fields-properties,p | Fields configuration file name | fields.properties | No | String
Fields Configuration File (fields.properties)
This file, typically named fields.properties, defines mappings for CSV columns used for merging and sorting.
Property | Description | Required | Example
---|---|---|---
EpocTimeField | Column name for EPOC time | Yes | EPOC Time
SecurityIDField | Column name for Security ID | Yes | SecurityID
IndexField | Column name for Index; used for deduplication | Yes | Index
UpdateTypeField | Column name for MDUpdateAction | Yes | MDUpdateAction
MergeFields | Comma-separated list of columns used for merging; each column name must carry a " - 0", " - 1", etc. suffix to identify specific sub-fields | Yes | MDEntryPx - 0, MDEntryPx - 1, ..., MDEntrySize - 0, MDEntrySize - 1, ...
Example fields.properties:
EpocTimeField=EPOC Time
SecurityIDField=SecurityID
IndexField=Index
UpdateTypeField=MDUpdateAction
MergeFields=MDEntryPx - 0,MDEntryPx - 1,MDEntryPx - 2,MDEntryPx - 4,MDEntryPx - 5,MDEntryPx - 7,MDEntryPx - 8,MDEntryPx - 9,MDEntryPx - B,MDEntryPx - b,MDEntryPx - c,MDEntrySize - 0,MDEntrySize - 1,MDEntrySize - 2,MDEntrySize - B
Merge Requirements
The utility merges market data records based on the specified columns, ensuring that:
Records with the same SecurityID and EPOC time are merged into a single row based on the arrival order from the source.
Records without an EPOC timestamp are not merged and are placed at the end of the output file.
MDUpdateAction values are handled according to defined cases:
If all records have MDUpdateAction = 0, values in the first record override values in subsequent ones. MDUpdateAction is set to 0 in the final record.
If all records have MDUpdateAction = 1, values in the first record override values in subsequent ones, unless explicitly overwritten by subsequent records. MDUpdateAction is set to 1 in the final record.
If some records have MDUpdateAction = 0 and some 1, records are merged based on the input order, with update values from the first record overriding subsequent ones. MDUpdateAction is set to 1 in the final record.
If MDUpdateAction = 2, the row prior to the row with MDUpdateAction = 2 is removed if both entries have matching SecurityID and MDEntryType and MDUpdateAction is blank, 0, or 1.
If MDUpdateAction is missing for some records but not others, records with matching keys are merged, preserving values from the first record, and MDUpdateAction is set to 1 in the final record.
If MDUpdateAction is missing for all records, records with matching keys are merged, preserving values from the first record, and MDUpdateAction is set to 0 in the final record.
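As a purely illustrative example of the first case (the values are hypothetical; the column names follow the fields.properties example above), two records sharing the same SecurityID and EPOC time, both with MDUpdateAction = 0, are merged into a single row in which the first record's values take precedence:

Input:
EPOC Time,SecurityID,MDUpdateAction,MDEntryPx - 0,MDEntrySize - 0
1708336800000000,ABC123,0,101.25,500
1708336800000000,ABC123,0,101.30,700

Merged output:
EPOC Time,SecurityID,MDUpdateAction,MDEntryPx - 0,MDEntrySize - 0
1708336800000000,ABC123,0,101.25,500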
Usage Example
Important Note: Adjust the specific names of the input and output files, and the other parameters, to match the configuration of your application.
csv_processor -i unsorted.csv -o out_dir/ -d desc -c 5000000 -r 20000000 -m manifest.csv -p fields.properties
This command processes unsorted.csv, sorts it in descending order, limits sorting chunks to 5,000,000 rows and each output file to 20,000,000 rows, uses manifest.csv for patching the summary, and uses fields.properties for the field definitions. The output is written to the out_dir directory.