Frakture Worker Bots
An overview of FraktureBots and their inner workings and standard implementations.
There are several prefixes/postfixes that should be kept in mind when building out standard Frakture (and bot specific) methods:
-
get*
implies getting something from another system (usually in the form of a file or stream) - a CRM, FTP, etc. Examples of this aregetPeopleFile
,getTransactionFile
, etc. The Postfix should indicate the resulting object (stream or file). -
list*
is a version of get that returns an array of objects, without writing them to a file. Examples of this arelistPeople
,listMessages
,listTransactions
, etc. -
load*
implies loading a table in the frakture warehouse or mongo database (almost always the warehouse) using data from an external system - it usually makes an implicit call to one of theget*
orlist*
functions. (This does not include the warehouse methodsloadTable
andloadTableFromQuery
). Examples areloadPeople
,loadMessages
, etc. -
load*File
is a version of theload
that expects theget*
method to have been called already, and usually has an option forfilename
. These methods are used internally toload*
methods, and should almost never be called explicitly. -
build*
methods are used to load tables within Frakture based on other tables that are already in Frakture (this makes them distinct fromload
methods).
- Non-global tables should always have the bot prefix (defaults to the bot_id, but can be overridden), and the object name should always be singular (ie bot_id_person).
- Get the bot prefix using
this.getTablePrefix()
. - Access the global table prefix using
this.getGlobalTablePrefix(options,(e,global_table_prefix) => {...})
There are a few standard conventions to follow in option and output structures.
Most DBBot methods working with a specific table will have an option table
, while many will have a sql
option if they are interacting with a query.
If a Bot method results in a table being loaded, the result of the method should include a field table
and the number of rows loaded as records
. These are already included in the result of loadTable
, loadTableFromQuery
- so if your bot method uses these methods, it's probably that you use that result as the final result.
Bot methods that work on a file should have an option field filename
which is the path to the file being used or modified.
Bot methods that produce a file should have a result field filename
which is the final file. Most FileManipulatorBot
bot methods follow this paradigm.
The callback interface of Bot methods is:
(e,r,m) => { ... }
Where e
is the error (if one occurred), r
is the result (if the method completed successfully), and m
is the "modification" to send to the job framework.
Most methods will only use the (e,r) syntax - those that have a result after a single run should stick to this method.
If you want to save state to the job framework between runs and signal the framework to call the method again later (if, for instance, the bot will start a job and then wait for it), specify m as follows:
{
status: 'pending', // signals framework to try running this later
start_after_timestamp, // when the job should run, typically written as js.getRelativeDate("+5m") or similar
options: {
// modifications to the job options that will be passed to the method on the next run
}
}
Bot methods that take an object and callback should always have the signature:
function(options,callback) {
}
Callbacks passed in should always use e
to refer to the first (error) parameter, and m
to refer to the third (modification) parameter if present.
The result parameter should be representative of what is returned, with the following common parameter names as examples:
t
should be used to represent the result of a function that loads a table and has a table
result field:
dbbot.loadTable({...}, (e,t) => ...)
f
should be used to represent the result of a function that writes a file and has a filename
result field:
filebot.objectSreamToFile({...}, (e,f) => ...)
s
should be used to represent the result of a function that creates a stream
result field.
DBBot
is the overarching implementation for interacting with a warehouse.
A Bot can get access to it's warehouse bot using the following call:
this.getWarehouseBot({}, (e,dbbot) => {
if(e) return callback(e);
...
dbbot.loadQuery(...);
})
The most commonly used methods are: loadTable: Used to load a file into a table (a temporary table if not specified). It has the following interface:
dbbot.loadTable({
filename: `...`, // csv, txt, xls supported. a required field
table: `table_name`, // not required
upsert: (true|false),
ignore_dupes: (true|false)
}, (e,t) => {
...
});
loadTableFromQuery: Used to load the result of a sql query into a table (a temporary table if not specified):
dbbot.loadTableFromQuery({
sql: `...`, // required
table: `table_name`, // not required
upsert: (true|false),
ignore_dupes: (true|false)
}, (e,t) => {
...
});
Bot loadTable
and loadTableFromQuery
have the same result structure:
{
table, // name of the table loaded into
records // the number of records that were loaded into the table
}
runQuery: Used to run a query - the results are provided as the result in the callback. Specifying target_format
will cause the results to be written to a file instead
dbbot.runQuery({
sql: `...`, // required,
target_format: undefined | 'csv' | 'txt' // if specified, the results will be written to file
}, (e,r) => {
...
});
If target_format
is specified, the result will be an object with field filename
, containing the name of the results file. Otherwise, r
will be an array of results.
FileManipulatorBot
provides methods for manipulating files on the local file system. Most of it's methods take a filename
(possibly multiple) as a parameter and will result in an object that itself has a filename
containing the result of the method.
Access FileManipulatorBot in a bot method using: this.getFilebot()
(synchronously) or this.getFilebot({}, (e,fbot) => {...})
(asynchronously).
objectStreamToFile: Used to write a stream (or array) to a file.
this.getFilebot().objectStreamToFile({
stream: [...], // array or object stream (required)
target_format: 'csv' | 'txt' // not required
}, (e,f) => {
})
The result r
will have the format:
{
filename: ' ... ' // the name of the resulting file
}
transform: Used to create a new file based on the contents of another file, based on the transform
method provided to the method.
this.getFilebot().transform({
filename: '...', // required,
transform: (o,enc,cb) => {
...
// cb _must_ be called
}, // transform method, called per row. required
target_format: undefined | 'csv' | 'txt' // specifies the format of the resulting file
}, (e,f) => {
})
The result r
will have the following structure:
{
filename, // the result of the file, if there were records
records, // the number of records in the resulting file
no_data // will be `true` if there were no files to write
}
The method provided to transform has the following interface:
(o,enc,cb) => {
}
where o
is the record currently being transformed, enc
is the "encoding" (meaningless in most contexts), and cb
is used to signal the end of the transformation. cb(e,r)
takes up to two parameters:
cb()
signals to the transform that there was no result - this record will be ignored, and no row will be written to file
cb(e)
signals to transform that there was an error - the final callback will be called with the error provided as the first parameter
cb(null, {...})
will write the value provided as the second parameter to the result file
fraktureAutoInject: Recreates the behavior of async.autoInject
, making use of the Frakture checkpoint/modification infrastructure, isolating the calls so that they can call checkpoint
internally without breaking the overall structure of the job. Instead of the original autoInject
format, functions passed to this method must use the function(...) {}
syntax, instead of the array () => {}
syntax, and have two final arguments that are opts,cb
. The first is the options that have previously been set using checkpoint
or modify
(it's an empty object at first). The second is the callback used to indicate the function is done, and to specify the error, the result, or the modification.
writeStreamToTempFile: Creates a temporary file and writes the contents of a text stream to that file. Used as:
this.writeStreamToTempFile({
stream: request({...}),
target_format: 'csv|txt'
}, (e,f) => {
if(e) return callback(e);
debug(f.filename);
....
})
Many systems support "exporting" data - this usually (not always) requires starting a job, waiting for that job to complete, and then pulling the results. While a bot could continue running while it waits for the data, this would consume resources that should be available by the system. So, instead, it should use the checkpoint/modifying system instead.
The body of these functions should look something like:
function export(options,callback) {
if(options.external_job_id) return this.getJobStatus(options, (e,r) => {
if(e) return callback(e);
if(r.job_still_running) return callback(null, null, {status:'pending', start_after_timestamp: js.getRelativeDate("+1m")});
this.getJobResults(r, callback);
}
this.startJob(options,(e,r) => {
if(e) return callback(e);
const {external_job_id} = r;
callback(null, null, {status: 'pending', start_after_timestamp:js.getRelativeDate("+1m")});
});
}
Where the external_job_id
is the unique identifier of the job within the external system. Note - this can make use of fraktureAutoInject
- like so:
this.fraktureAutoInject({
job: function(opts,cb) {
this.startJob(options, (e,r) => cb(e,r))
},
results: function(job,opts,cb) {
this.getJobStatus(job, (e,status) => {
if(e) return cb(e);
if(status.is_still_running) return cb(null,null, {status:'pending' ...});
this.getJobResults(job, cb);
})
}
}, (e,r,m) => {
if(e||m) return callback(e,null,m);
callback(null, r.results);
});
Channel Bots typically have a number of submodules representing different datatypes within an object. These types will usually map to a Frakture standard data type (although some systems have addition types). A bot will represent these submodules in it's metadata - for example:
{
metadata: {
auth_fields: {
...
}
},
People: require('./People),
Transactions: require('./Transactions),
...
}
The above shows how to include the People and Transaction submodules for a Bot (assuming the files exist).
Message Submodules should inherit from MessageBase
:
const util = require('util');
const MessageBase = require('frakture-workerbots')('MessageBase');
function Bot(bot) { MessageBase.call(this,bot); }
util.inherits(Bot, MessageBase);
And implement the following methods:
listMessages: Called by loadMessages
:
Bot.prototype.listMessages = function(options, callback) {
...
};
Which should result in an object of the form:
{
messages: [...], // array of message objects
records: , // number of records
}
Each item in messages should be an object that maps to the Frakture standard (and can have additional, object structured data). It will ultimately be stored in mongo, and then mapped from mongo to the message
table in the warehouse by the loadMessages
call.
Important elements on the message objects are:
{
remote_id, // the unique identifier for the message in the source system
}
People Submodules should inherit from PersonBase
:
const util = require('util');
const PersonBase = require('frakture-workerbots')('PersonBase');
function Bot(bot) { PersonBase.call(this,bot); }
util.inherits(Bot,Personbase);
And should implement the following methods (based on support within for the system):
getPeopleFile: It is called by loadPeople
, and should have the following interface:
Bot.prototype.getPeopleFile = function(options, callback) {
...
};
Where options will be provided a sub set of the following options:
{
start, // date or relative date ("2020-01-01", or "-1y", etc)
end, // date or relative date ("2020-01-01", or "-1y", or "now")
custom_fields: // optional list of custom fields to narrow the result to
}
The method should ideally result in an object of the form:
{
filename, // the resulting filename
records, // the number of records
no_data // true if there was no filename, or if the number of records is 0
}
Where the records follow the Frakture standard:
{
remote_person_id,
last_modified,
date_created
...
}
And the records are filtered to only those that were modified (or created) within the start/date range. However, not all systems support this. If the underlying system only supports date_created instead of last_modified, use that instead.
pushPerson: For those systems that support creating or updating person records.
Once implemented, this method should not be called directly, but managed using queuedPushPerson
, which manages the records using a queue
table.
request - used to interact with web services, websites, apis, etc
async - used to manage control flow using callback based asynchronous functions