...
This sample aggregates incoming messages into batches and publishes each batch to RabbitMQ (the 'fix-in' direct exchange, queue 'poc-from-fixedge'):
Code Block | ||
---|---|---|
| ||
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
// Route: collect messages sent to the "toBatchSample" FIXEdge client into batches and publish each batch to RabbitMQ.
rule.
from("fixedge:toBatchSample").
// Processor referenced by name; presumably extracts the message payload from the FIXEdge wrapper - confirm against the built-in processors.
process("unwrapFixedgePayload").
// constant(true) is the correlation expression: every message gets the same key, so all messages are grouped into one batch.
aggregate(new GroupedBodyAggregationStrategy()).constant(true).
// A batch is completed once it holds 20 messages...
completionSize(20).
// ...or when the completion timeout of 5000 (milliseconds in Camel's Aggregate EIP) expires, whichever happens first.
completionTimeout(5000).
// Processor referenced by name; presumably converts the grouped list of bodies into a single string - confirm.
process("listToString").
to("rabbitmq://localhost/fix-in?exchangeType=direct&queue=poc-from-fixedge&autoDelete=false&username=rabbit&password=rabbitpassword");
/*
== FIXEdge.properties ==
TransportLayer.JVMOptionsFile = /etc/fixedge/camel-ta/JVM_Options.jvmopts
TransportLayer.TransportAdapters = TransportLayer.CamelTA
TransportLayer.CamelTA.DllName = /usr/lib64/fixedge/${FE_VERSION}/libUniversalTADll.so
TransportLayer.CamelTA.Description = Camel Routes Adapter
TransportLayer.CamelTA.JavaClass = com.epam.feta.impl.CamelTA
TransportLayer.CamelTA.ClientNames = toBatchSample
TransportLayer.CamelTA.Client.toBatchSample.Version = FIX44
TransportLayer.CamelTA.RoutesFile = /etc/fixedge/camel-ta/routes.aggregate.groovy
== BL_Config.xml ==
<?xml version="1.0" encoding="UTF-8"?>
<FIXEdge>
<BusinessLayer>
<DllHandlers>
<Handler Name="JSONMapping" DllName="libJSONMappingHandler.so" Description="JSONMapping Handler" />
</DllHandlers>
<Rule>
<Source Name="FE-SC"/>
<Action>
<HandlerAction Name="JSONMapping" Direction="FIX-to-JSON" IncludeCustomTags="true"/>
<Send Name="toBatchSample"/>
</Action>
</Rule>
</BusinessLayer>
</FIXEdge>
*/ |
Example 2
This sample expects a JSON-encoded FIX message, as defined in the FIX Trading Community standard "Encoding FIX using JSON".
...
json2csv - JSON will be transformed to a CSV file in the '/var/lib/fixedge/work' folder, with a header row as defined below.
dailyMoveFilesToUpload - every day at 12:00 AM (midnight), moves files from the '/var/lib/fixedge/work' folder to the '/var/lib/fixedge/upload' folder.
uploadToFtpAndMoveToDoneRoute - detect files in the '/var/lib/fixedge/upload' folder, and upload them to the FTP server.
Code Block | ||
---|---|---|
| ||
import org.apache.camel.dataformat.csv.CsvDataFormat;
import com.epam.feta.camel.processor.JsonFlattener;
// CSV data format used by the json2csv route to marshal each flattened message into a CSV row.
CsvDataFormat customCsv = new CsvDataFormat()
// Column names of the CSV file.
String[] csvHeader = ["MsgType", "TransactTime", "ClOrdID", "OrigClOrdID", "OrdType", "Side", "Symbol", "OrderQty", "Price", "ExecID", "ExecType", "OrdStatus", "LastQty", "LastPx", "CumQty", "AvgPx"] as String[]
// Header row as plain text; the json2csv route writes it to the file itself when the file does not exist yet.
String headerLine = String.join(",", csvHeader) + "\n"
customCsv.setHeader(csvHeader)
// The marshaller should not emit the header with each record, since the route writes headerLine separately.
customCsv.setSkipHeaderRecord(true)
// Route "json2csv": append each incoming JSON-encoded message to from-camel.csv as a CSV row.
rule.
from("fixedge:toCsvSample").
routeId("json2csv").
// Parse the JSON text so that the "Header" and "Body" sections can be read by key.
unmarshal().json().
process(exchange -> {
// Flatten the "Header" and "Body" sections separately and merge the results into one map.
Map headerData = exchange.in.body.get("Header")
Map bodyData = exchange.in.body.get("Body")
Map flattenedData = JsonFlattener.flatten(headerData) + JsonFlattener.flatten(bodyData)
// Keep the row data in an exchange property, because the body is replaced by the header line next.
exchange.setProperty("flattenedData", flattenedData)
exchange.in.body = headerLine
}).
// fileExist=Ignore: the header line is written only if the file does not exist yet; an existing file is left as is.
to("file:/var/lib/fixedge/work?fileName=from-camel.csv&fileExist=Ignore").
process(exchange -> {
// Restore the flattened row data as the message body for CSV marshalling.
exchange.in.body = exchange.getProperty("flattenedData")
}).
marshal(customCsv).
// fileExist=Append: add the marshalled row to the end of the file.
to("file:/var/lib/fixedge/work?fileName=from-camel.csv&fileExist=Append");
// Route "dailyMoveFilesToUpload": once a day, move the files collected in the work folder to the upload folder.
rule.
// Quartz cron fields are: second minute hour day-of-month month day-of-week ('+' stands for a space inside the URI).
// Trigger daily at 12:00AM (hour field 0; an hour field of 12 would fire at noon).
// For debugging you can use scheduler.cron=0+*/5+*+*+*+? to trigger every 5 minutes
// delete=true removes each file from the work folder after it has been processed; at most 100 files are picked up per poll.
from("file:/var/lib/fixedge/work?delete=true&maxMessagesPerPoll=100&scheduler=quartz&scheduler.cron=0+0+0+*+*+?").
routeId("dailyMoveFilesToUpload").
to("file:/var/lib/fixedge/upload");
// Route "uploadToFtpAndMoveToDoneRoute": upload each file found in the upload folder to the FTP server, then write it to the done folder.
rule.
from("file:/var/lib/fixedge/upload").
routeId("uploadToFtpAndMoveToDoneRoute").
// get credentials at https://dlptest.com/ftp-test/
// The username and password values in the URI below are placeholders; replace them with real credentials.
to("ftp://ftp.dlptest.com?binary=true&passiveMode=true&username=xxxx&password=yyyy").
to("file:/var/lib/fixedge/done");
/*
== FIXEdge.properties ==
TransportLayer.JVMOptionsFile = /etc/fixedge/camel-ta/JVM_Options.jvmopts
TransportLayer.TransportAdapters = TransportLayer.CamelTA
TransportLayer.CamelTA.DllName = /usr/lib64/fixedge/${FE_VERSION}/libUniversalTADll.so
TransportLayer.CamelTA.Description = Camel Routes Adapter
TransportLayer.CamelTA.JavaClass = com.epam.feta.impl.CamelTA
TransportLayer.CamelTA.ClientNames = toCsvSample
TransportLayer.CamelTA.Client.toCsvSample.Version = FIX44
TransportLayer.CamelTA.Client.toCsvSample.SmartXMLProcessing = true
TransportLayer.CamelTA.RoutesFile = /etc/fixedge/camel-ta/routes.csv.groovy
== BL_Config.xml ==
<?xml version="1.0" encoding="UTF-8"?>
<FIXEdge>
<BusinessLayer>
<DllHandlers>
<Handler Name="JSONMapping" DllName="libJSONMappingHandler.so" Description="JSONMapping Handler" />
</DllHandlers>
<Rule>
<Source Name="FE-SC"/>
<Action>
<HandlerAction Name="JSONMapping" Direction="FIX-to-JSON" IncludeCustomTags="true"/>
<Send Name="toCsvSample"/>
</Action>
</Rule>
</BusinessLayer>
</FIXEdge>
*/ |
Built-in Processors
To support some typical scenarios in FIXEdge, these custom processors were implemented:
...