This plugin provides a set of native statements. They are always available because they are loaded automatically.
npm install @ezs/core
Several statements create sub-pipelines, either from an instruction file or from nested instructions. They are all used the same way (with the same parameters), so some of them may look similar, but they work differently:
- [delegate]: 1 sub-pipeline for all items
- [swing]: 1 sub-pipeline for all items filtered by a condition
- [spawn]: 1 sub-pipeline per item
- [loop]: 1 sub-pipeline per item
- [expand]: 1 sub-pipeline for N items (N = size), only the selected field is sent into the pipeline
- [combine]: 1 sub-pipeline for all items, only the selected field is compared with the result of the sub-pipeline
- [singleton]: 1 sub-pipeline for the first item
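The difference between "one sub-pipeline for all items" and "one sub-pipeline per item" can be pictured with a small plain-JavaScript sketch. It does not use ezs: `makeSubPipeline`, `delegateLike` and `spawnLike` are hypothetical names, and the sub-pipeline is reduced to a stateful function that numbers the items it receives.

```javascript
// Hypothetical stand-in for a sub-pipeline: it numbers the items it receives.
// Each call to makeSubPipeline() creates an independent instance with its own state.
function makeSubPipeline() {
  let seen = 0;
  return (item) => ({ ...item, rank: ++seen });
}

// delegate-like behaviour: a single sub-pipeline instance handles every item.
function delegateLike(items) {
  const sub = makeSubPipeline();
  return items.map((item) => sub(item));
}

// spawn-like behaviour: a fresh sub-pipeline instance is created for each item.
function spawnLike(items) {
  return items.map((item) => makeSubPipeline()(item));
}

const items = [{ nom: 'un' }, { nom: 'deux' }, { nom: 'trois' }];
console.log(delegateLike(items).map((o) => o.rank)); // [ 1, 2, 3 ]
console.log(spawnLike(items).map((o) => o.rank)); // [ 1, 1, 1 ]
```

With a shared instance the state carries over from one item to the next; with one instance per item it always starts from scratch.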
- assign
- combine
- concat
- debug
- dedupe
- delegate
- delegate
- dispatch
- dump
- env
- exchange
- expand
- extract
- fork
- group
- identify
- ignore
- keep
- loop
- map
- metrics
- overturn
- pack
- parallel
- pop
- remove
- replace
- shift
- shuffle
- singleton
- spawn
- swing
- throttle
- time
- tracer
- transit
- truncate
- ungroup
- unpack
- validate
- See: exchange
Assigns a value to a field of the current object. If the field already exists, its value is overwritten; otherwise the field is created.
Input:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
},
{
"nom": "trois",
"valeur": 3
},
{
"nom": "quatre",
"valeur": 4
}]
Script:
[assign]
path = valeur
value = get("valeur").multiply(2)
Output:
[{
"nom": "un",
"valeur": 2
},
{
"nom": "deux",
"valeur": 4
},
{
"nom": "trois",
"valeur": 6
},
{
"nom": "quatre",
"valeur": 8
}]
The path can be the simple name of a field at the root of the item being processed, or a path in dot notation, using a syntax close to that of Lodash's get function.
Returns Object
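To make the two forms of path concrete, here is a minimal plain-JavaScript sketch. `setByPath` is a hypothetical helper written for this illustration; it is neither ezs nor Lodash code, and it only handles dot-separated keys.

```javascript
// Hypothetical helper: set a value at a dot-notation path, creating
// intermediate objects when they are missing. Not ezs or Lodash code.
function setByPath(obj, path, value) {
  const keys = path.split('.');
  const last = keys.pop();
  let target = obj;
  for (const key of keys) {
    if (typeof target[key] !== 'object' || target[key] === null) {
      target[key] = {};
    }
    target = target[key];
  }
  target[last] = value;
  return obj;
}

// Simple field name at the root: the existing value is overwritten.
console.log(setByPath({ nom: 'un', valeur: 1 }, 'valeur', 2));
// { nom: 'un', valeur: 2 }

// Dot notation: the nested field is created because it does not exist yet.
console.log(setByPath({ nom: 'un' }, 'meta.source', 'demo'));
// { nom: 'un', meta: { source: 'demo' } }
```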
Takes an Object and substitutes a field with the corresponding value found in an external pipeline.
The internal pipeline must produce a stream of special objects of the form { id, value }.
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[combine]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
- path String? the path to substitute
- default String? value to use if there is no substitution (otherwise the value stays unchanged)
- primer String data to send to the external pipeline (optional, default n/a)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
- cacheName String? enable cache, with a dedicated name
Returns Object
Takes all String chunks, concatenates them, and emits a single String.
[
"a",
"b",
"c"
]
Script:
[concat]
beginWith = <
joinWith = |
endWith = >
Output:
[
"<a|b|c>"
]
- beginWith String? value added at the beginning
- joinWith String? value used to join two chunks
- endWith String? value added at the end
Returns String
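The effect of the three parameters can be reproduced with a one-line plain-JavaScript sketch. `concatAll` is a hypothetical name, and the empty-string defaults are an assumption made for the sketch, not documented ezs behaviour.

```javascript
// Sketch of the concat semantics: prefix, joined chunks, suffix.
// The empty-string defaults are an assumption, not taken from ezs.
function concatAll(chunks, { beginWith = '', joinWith = '', endWith = '' } = {}) {
  return beginWith + chunks.join(joinWith) + endWith;
}

console.log(concatAll(['a', 'b', 'c'], { beginWith: '<', joinWith: '|', endWith: '>' }));
// <a|b|c>
```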
Takes an Object, prints it (with its number), and emits the same object.
With ezs debug enabled, every object is stringified before being printed, and all other ezs debug traces are printed as well.
With ezs debug disabled, every object is inspected (indented and colorized) and printed on stderr (error level) or stdout (log level).
If the ezs parameter is set, the objects are not logged (it is a global action).
- level String console level: log, error or silent (optional, default error)
- text String text before the dump (optional, default valueOf)
- path String? path of the field to print
- ezs Boolean? enable or disable ezs global debug traces
Returns Object
Takes an Object and checks that the object identifier has not already been used.
- data
- feed
- path String path containing the object identifier (optional, default uri)
- ignore Boolean just ignore duplicate objects (optional, default false)
Returns Object
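The identifier check itself can be sketched in plain JavaScript with a Set of already-seen identifiers. `flagDuplicates` is a hypothetical name; the sketch only reports which objects are duplicates and makes no claim about what ezs does with them when ignore is false.

```javascript
// Flag each object whose identifier (read from `path`, "uri" by default)
// has already been seen earlier in the stream.
function flagDuplicates(items, path = 'uri') {
  const seen = new Set();
  return items.map((item) => {
    const id = item[path];
    const duplicate = seen.has(id);
    seen.add(id);
    return { id, duplicate };
  });
}

const input = [{ uri: 'a' }, { uri: 'b' }, { uri: 'a' }];
console.log(flagDuplicates(input).map((r) => r.duplicate)); // [ false, false, true ]
```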
Breaks the stream if the control file cannot be checked.
- fusible String? file to check
Returns Object
Delegates processing to an external pipeline.
Note: works like spawn, but each chunk shares the same external pipeline.
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
Returns Object
Dispatches processing to an external pipeline on one or more servers.
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
Returns Object
Takes all Objects and generates a JSON array.
[
{ "a": 1 },
{ "a": 2 },
{ "a": 3 },
{ "a": 4 },
{ "a": 5 }
]
Script:
[dump]
indent = true
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}
]
- indent boolean indent JSON (optional, default false)
Returns String
Creates an environment variable that is global to the whole script.
It is generally used at the beginning of the script (after [use]).
To use the variable, call the env() function.
Input:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
}]
Script:
[use]
plugin = basics
[env]
path = nom
value = NOM GÉNÉRIQUE
[JSONParse]
[assign]
path = nom
value = env("nom")
[dump]
indent = true
Output:
[{
"nom": "NOM GÉNÉRIQUE",
"valeur": 1
},
{
"nom": "NOM GÉNÉRIQUE",
"valeur": 2
}]
Returns Object
Replaces a whole object with another one (in the JSON sense).
Input:
[{
"nom": "un",
"valeur": 1
},
{
"nom": "deux",
"valeur": 2
}]
Script:
```ini
[use]
plugin = basics
[JSONParse]
[exchange]
value = get("nom")
[dump]
```
Output:
["un","deux"]
Here, the object {"nom":"un","valeur":1} has been replaced by the "object" "un" (in the JSON sense, a string, just like a number, is an object).
Note: assign cannot replace the whole object, only one of its properties.
Input:
[{
"a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
},
{
"a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
}]
Script:
[exchange]
value = omit('c')
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Here, an object with three properties has been replaced by the same object without the c property (see Lodash's omit function).
- value String? the replacement value for the current object
Returns Object
Takes an Object and substitutes a field with the corresponding value found in an external pipeline.
The internal pipeline receives a special object { id, value }, where id is the item identifier and value is the value found at the item path.
The internal pipeline can replace value with another value.
[
{ year: 2000, dept: 54 },
{ year: 2001, dept: 55 },
{ year: 2003, dept: 54 },
]
Script:
[use]
plugin = analytics
[expand]
path = dept
file = ./departement.ini
Output:
[
{ year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
{ year: 2001, dept: { id: 55, value: 'Meuse' } },
{ year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
]
- path String? the path to substitute
- size Number how many chunks to send to the external pipeline (optional, default 1)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
- cacheName String? enable cache, with a dedicated name
- token String? add token values in the sub-pipeline (optional)
Returns Object
Extracts the values of certain fields from the current object and sends these values directly to the output stream.
Note: extract cannot provide undefined or null values.
Input:
[{
"nom": "un",
"valeur": 1,
"important": false
},
{
"nom": "deux",
"valeur": 2,
"important": true
}]
Script:
[use]
plugin = basics
[JSONParse]
[extract]
path = valeur
path = nom
[dump]
Output:
[[1,"un"],[2,"deux"]]
- path String? path of a field to extract
Returns Object
Forks the current pipeline.
Note: each chunk is sent to the same external pipeline.
- standalone Boolean the current pipeline will be able to end without waiting for the end of the external pipeline (optional, default false)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
- target String choose the key to set with the forked request identifier (optional, default x-request-id)
Returns Object
Takes all chunks and emits them grouped by length.
See also ungroup.
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Script:
[group]
length = 3
Output:
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
- length Number? size of each partition
Returns String
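The grouping shown above amounts to slicing the stream into partitions of at most length items. A minimal plain-JavaScript sketch (`groupByLength` is a hypothetical name, and the stream is modelled as an array):

```javascript
// Split a list of chunks into consecutive partitions of at most `length` items.
// The last partition may be shorter than the others.
function groupByLength(chunks, length) {
  const groups = [];
  for (let i = 0; i < chunks.length; i += length) {
    groups.push(chunks.slice(i, i + length));
  }
  return groups;
}

console.log(groupByLength(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3));
// [ [ 'a', 'b', 'c' ], [ 'd', 'e', 'f' ], [ 'g', 'h' ] ]
```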
Takes an Object, then computes and adds an identifier.
- data
- feed
- scheme String scheme to use (uid or sha) (optional, default uid)
- path String path containing the object identifier (optional, default uri)
Returns String
Takes all the chunks and ignores the first N chunks.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[ignore]
length = 3
Output:
[{
"a": 4
},
{
"a": 5
}]
- length Number? length of the feed to ignore
Returns any
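As a rough plain-JavaScript picture of this behaviour, a generator can count the chunks and start emitting only once length chunks have gone by. `ignoreFirst` is a hypothetical name used for this sketch only.

```javascript
// Skip the first `length` chunks, then emit every following chunk unchanged.
function* ignoreFirst(chunks, length) {
  let index = 0;
  for (const chunk of chunks) {
    if (index >= length) {
      yield chunk;
    }
    index += 1;
  }
}

const input = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }];
console.log([...ignoreFirst(input, 3)]); // [ { a: 4 }, { a: 5 } ]
```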
Emits the input Object, keeping only specific fields.
Input file:
[{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
},
{
"a": "abcdefg",
"b": "1234567",
"c": "XXXXXXX"
}]
Script:
[keep]
path = a
path = b
Output:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
- path String? path of field to keep
Returns Object
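For root-level fields, the result of the example above can be reproduced with a short plain-JavaScript sketch. `keepFields` is a hypothetical helper; unlike the real statement it does not try to interpret nested paths.

```javascript
// Build a new object that contains only the listed root-level fields.
// Fields that are absent from the input are simply skipped.
function keepFields(obj, paths) {
  const result = {};
  for (const path of paths) {
    if (path in obj) {
      result[path] = obj[path];
    }
  }
  return result;
}

console.log(keepFields({ a: 'abcdefg', b: '1234567', c: 'XXXXXXX' }, ['a', 'b']));
// { a: 'abcdefg', b: '1234567' }
```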
Loops on an external pipeline until the test is true.
Note: works like delegate, but each chunk uses its own external pipeline.
- test String? the condition that ends the loop when it is true
- reverse Boolean to reverse the test (optional, default false)
- maxDepth Number to limit the number of loops (optional, default 100000)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
- fusible String? can be set with the ezs server fusible, see env('request.fusible')
Returns Object
From an array field, delegates the processing of each item to an external pipeline.
Note: works like delegate, but each chunk uses its own external pipeline.
- path String? the path to substitute
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
Returns Object
- See: ../server/knownPipeline.js
Takes an Object and emits the same object.
This statement is only used if:
- EZS_METRICS is enabled
- ezs is running in server mode
WARNING: avoid setting bucket to "input" or "output", as these labels are used by ezs. If you do, you risk distorting the associated metrics.
- pathName String to identify the script (optional, default auto)
- bucket String to identify the moment of measurement (optional, default unknow)
Returns Object
Takes an Object and substitutes a field twice with the corresponding value found in an external pipeline.
The internal pipeline receives a special object { id, value, token }:
- id is the item identifier
- value is the item path value
- token is an array containing the stream id and a number (0 for the first time, 1 for the second time)
The internal pipeline can overturn value with another.
It works like [expand], but the second call starts only when all the values of the stream have been sent once.
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges'},
]
Script #1:
[overturn]
path = dept
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'Meuse' },
{ year: 2001, dept: 'Moselle' },
{ year: 2003, dept: 'Vosges' },
]
Script #2:
[overturn]
path = dept
[overturn/drop]
path = token.1
if = 0
[overturn/assign]
path = value
value = get('value').split('').reverse().join('')
Output:
[
{ year: 2000, dept: 'esueM' },
{ year: 2001, dept: 'ellesoM' },
{ year: 2003, dept: 'segsoV' },
]
- path String? the path to overturn
- size Number how many chunks to send to the external pipeline (optional, default 1)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
Returns Object
Takes all Objects and emits an encoded String.
Returns String
Takes an Object and delegates processing to X internal pipelines.
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
Returns Object
- See: shift
Returns the last Object and closes the feed.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[pop]
Output:
[{
"a": 5
}]
Returns Object
Takes an Object and removes it from the feed if test is true.
Input file:
[{
a: "a"
},
{
a: 2
},
{
a: "b"
},
{
a: 4
},
{
a: "c"
}]
Script:
[remove]
test = get('a').isInteger()
reverse = true
Output:
[
{
a: 2
},
{
a: 4
}
]
Returns Object
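The interplay between test and reverse in the example above can be checked with a small plain-JavaScript sketch. `removeIf` is a hypothetical name, and the test is passed as a function rather than as an expression string.

```javascript
// Remove the items for which `test` is true.
// With `reverse` set, remove the items for which `test` is false instead.
function removeIf(items, test, reverse = false) {
  return items.filter((item) => (reverse ? test(item) : !test(item)));
}

const items = [{ a: 'a' }, { a: 2 }, { a: 'b' }, { a: 4 }, { a: 'c' }];
const isInteger = (item) => Number.isInteger(item.a);

console.log(removeIf(items, isInteger, true)); // [ { a: 2 }, { a: 4 } ]
console.log(removeIf(items, isInteger)); // [ { a: 'a' }, { a: 'b' }, { a: 'c' } ]
```

With reverse = true, the integers are the items that survive, which matches the output shown in the example.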
Takes an Object and replaces it with a new object containing only some fields.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[replace]
path = b.c
value = 'X'
Output:
[{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
},
{
"b": { "c": "X" }
}]
Returns Object
Returns the first Object and closes the feed.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[shift]
Output:
[{
"a": 1
}]
Returns Object
Takes an Object and shuffles the data of the whole object, or only of the fields specified by path.
Input file:
[{
"a": "abcdefg",
"b": "1234567"
},
{
"a": "abcdefg",
"b": "1234567"
}]
Script:
[shuffle]
path = a
Output:
[{
"a": "cadbefg",
"b": "1234567"
},
{
"a": "dcaegbf",
"b": "1234567"
}]
- path String? path of field to shuffle
Returns Object
Takes only the first Object and delegates its processing to an external pipeline.
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
Returns Object
Delegates processing to an external pipeline and emits each chunk of the result.
Note: works like delegate, but each chunk uses its own external pipeline.
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
- cache String? use a specific ezs statement to run commands (advanced)
Returns Object
Delegates processing to an external pipeline under specific conditions.
Note: works like spawn, but each chunk shares the same external pipeline.
- test String? the condition under which the chunk is sent to the external pipeline
- reverse String reverse the test (optional, default false)
- file String? the external pipeline is described in a file
- script String? the external pipeline is described in a string of characters
- commands String? the external pipeline is described in an object
- command String? the external pipeline is described in a URL-like command
- logger String? a dedicated pipeline described in a file to trap or log errors
Returns Object
Takes an Object and returns the same object.
[
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
]
Script:
[use]
plugin = analytics
[throttle]
bySecond = 2
Output:
[
{ id: 'x', value: 2 },
{ id: 't', value: 2 },
]
- bySecond Number number of objects per second (optional, default 1)
Returns Object
Measure the execution time of a script, on each chunk of input.
- script string?
Input
[1]
Program
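// Assumed imports: the "from" package builds a readable stream from an array.
import from from 'from';
import ezs from '@ezs/core';

const script = `
[transit]
`;
from([1])
.pipe(ezs('time', { script }))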
Output
[{
data: 1,
time: 15 // milliseconds
}]
Returns object
Takes an Object, prints a character, and emits the same object.
Useful to see the progress in the stream.
- print String character to print at each object (optional, default .)
- last String character to print at last call (optional, default .)
- first String character to print at first call (optional, default .)
Returns Object
Takes an Object and emits the same object again.
Input file:
[{
"a": 1
},
{
"a": 2
}]
Script:
[transit]
Output:
[{
"a": 1
},
{
"a": 2
}]
Returns Object
Takes all the chunks, and closes the feed when the total length is equal to the parameter.
Input file:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
},
{
"a": 4
},
{
"a": 5
}]
Script:
[truncate]
length = 3
Output:
[{
"a": 1
},
{
"a": 2
},
{
"a": 3
}]
- length Number? length of the feed
Returns any
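"Closing the feed" means that reading stops as soon as the requested number of chunks has been emitted, even if the source could produce more. The plain-JavaScript sketch below illustrates the idea with generators; `truncate` and `naturals` are hypothetical names, and the endless source is there only to show that the loop really stops early.

```javascript
// Emit at most `length` chunks, then stop reading the source.
function* truncate(chunks, length) {
  if (length <= 0) return;
  let count = 0;
  for (const chunk of chunks) {
    yield chunk;
    count += 1;
    if (count >= length) return; // the feed is closed here
  }
}

// Endless source of objects { a: 1 }, { a: 2 }, ...
function* naturals() {
  let n = 1;
  while (true) {
    yield { a: n++ };
  }
}

console.log([...truncate(naturals(), 3)]); // [ { a: 1 }, { a: 2 }, { a: 3 } ]
```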
Takes all chunks and emits each of their items one by one.
See also group.
[
[ "a", "b", "c" ],
[ "d", "e", "f" ],
[ "g", "h" ]
]
Script:
[ungroup]
Output:
[
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h"
]
Returns Array<any>
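Seen from plain JavaScript, this is a one-level flattening of the stream. A minimal sketch, with `ungroup` as a hypothetical generator name:

```javascript
// Emit every item of every array chunk, in order.
function* ungroup(chunks) {
  for (const chunk of chunks) {
    yield* chunk;
  }
}

const groups = [['a', 'b', 'c'], ['d', 'e', 'f'], ['g', 'h']];
console.log([...ungroup(groups)].join(' ')); // a b c d e f g h
```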
Takes Strings or Buffers and emits the Objects built by JSON.parse on each line.
Returns object
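Since unpack parses one JSON document per line, a round trip with pack can be sketched in plain JavaScript as follows. The sketch assumes that the encoded form produced by pack is one JSON document per line; that is an assumption drawn from the description of unpack, not a confirmed detail of the implementation. The function names are hypothetical stand-ins.

```javascript
// Assumption: the packed form is one JSON document per line.
function pack(objects) {
  return objects.map((obj) => `${JSON.stringify(obj)}\n`).join('');
}

// Parse each non-empty line with JSON.parse.
function unpack(text) {
  return text
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));
}

const packed = pack([{ a: 1 }, { a: 2 }]);
console.log(unpack(packed)); // [ { a: 1 }, { a: 2 } ]
```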
From an Object, emits the same object if all rules pass.
See
Input file:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
},
{
"a": false
},
]
Script:
[validate]
path = a
rule = required|number
path = b
rule = required|string
Output:
[{
"a": 1,
"b": "titi"
},
{
"a": 2,
"b": "toto"
}]
Returns Object
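The pipe-separated rule syntax can be illustrated with a tiny plain-JavaScript sketch. It is not the validation library used by ezs: the `checks` table and the `passes` function are hypothetical, only the three rule names of the example are covered, and the rule set (a as a required number, b as a required string) is an assumption made for the sketch. Objects that fail are simply filtered out here.

```javascript
// Hypothetical checks for the three rule names used in the example above.
const checks = {
  required: (v) => v !== undefined && v !== null && v !== '',
  number: (v) => typeof v === 'number',
  string: (v) => typeof v === 'string',
};

// An object passes when every rule of every path is satisfied.
function passes(obj, rules) {
  return Object.entries(rules).every(([path, rule]) =>
    rule.split('|').every((name) => checks[name](obj[path])));
}

const rules = { a: 'required|number', b: 'required|string' };
const input = [{ a: 1, b: 'titi' }, { a: 2, b: 'toto' }, { a: false }];

console.log(input.filter((obj) => passes(obj, rules)));
// [ { a: 1, b: 'titi' }, { a: 2, b: 'toto' } ]
```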