AMQP Worker
This code implements an AMQP worker which listens to an input queue, validates the metadata of incoming messages, decrypts them and passes the decrypted message to a message handler function. The handler function can return a list of messages to be produced on several output queues.
Usage
To use the AMQP worker, call the amqpWorker
function, passing the following parameters:
-
ch
- an instance ofamqplib.Channel
. -
inputQueue
- the name of the input queue to listen to. -
ap$self$
- a string representation of the JSON Web Key (JWK) for the worker's own secret key. -
sourceJwkPublicKeys
- an array of string representations of the JWKs of the public keys of the message sources that are allowed to communicate with this worker. -
incomingSchemaIds
- an array of schema identifiers, one for each schema that should be used to validate incoming messages. -
propertiesValidator
- a function that validates message properties and context. It should return true if the message is valid and should be accepted, or false if it should be rejected. -
messageHandler
- a function that receives the validated, decrypted message, message properties, and context. It should return an array of parameters to be passed to the amqpProducerCreator function, which will produce the messages to be sent to output queues. -
errorHandler
(optional) - a function to handle any errors that occur during message handling.
Other functions
amqpProducerCreator
This function takes three arguments, an AMQP channel, an apuKid string, and an apuCK CryptoKey. It returns an asynchronous function that takes an object with the following properties:
-
queue
: a string representing the name of the queue where the message should be sent. -
apv
: a string representing a JSON Web Key (JWK) for the recipient's public key. -
schemaId
: a string representing the schema ID of the message. -
payload
: an ArrayBuffer representing the message payload. -
correlationId
: a string representing the correlation ID of the message. -
replyTo
: an optional string representing the name of the queue where the response should be sent. -
headers
: an optional object representing additional headers for the message.
The function encrypts the message payload using the recipient's public key and signs the encrypted message using the sender's secret key. It then sends the encrypted message to the specified queue using the provided AMQP channel.
This is an internal function, but it may be useful to produce messages for workers created by amqpWorker
that are not sent as a response to a previous message (for example, it can be used to provide an initial message to a worker).
deriveKeyId
The deriveKeyId
function takes in a CryptoKey
object corresponding to a public key and returns a string representing the key ID. This function is intended to be used in conjunction with the amqpProducerCreator
function.
The result comes from computing a SHA-384 hash of the extractable public key (in its SubjectPublicKeyInfo
representatioin) and encoding the digst as base64url.
parseJwk
This utility function takes a JSON Web Key (JWK) string as input, and returns a Promise
that resolves to an object containing CKP
(a CryptoKeyPair
) and kId
(a string).
The CryptoKeyPair
is generated by importing the JWK as an elliptic curve Diffie-Hellman (ECDH) key using the importKey
method of the Web Crypto API. The private and public keys of the CryptoKeyPair
are then returned in the CKP
field of the output object.
The kId
string is generated by computing a SHA-384 hash of the JWK's extractable public key using the deriveKeyId
function.
Security
This code uses cryptographic techniques to ensure message integrity and confidentiality. The worker's own secret key is used to derive shared secret keys with the public keys of the message sources. These shared secret keys are used to encrypt and decrypt messages. Message authentication codes (MACs) are used to ensure message integrity. These steps may be helpful in scenarios where the AMQP broker or the network transport are not entirely trusted.
License
This code is released under the ISC license. Please see the LICENSE
file for more information.