NSQ Module

Weave Communications

Edited by

Emmanuel Schmidbauer


Table of Contents

1. Admin Guide
1. Overview
2. How it works
2.1. Acknowledge Messages
3. Dependencies
3.1. Kamailio Modules
3.2. External Libraries or Applications
4. Parameters
4.1. lookupd_address(str)
4.2. lookupd_port(int)
4.3. nsqd_address(str)
4.4. nsqd_port(int)
4.5. consumer_use_nsqd(int)
4.6. consumer_event_key(str)
4.7. consumer_event_sub_key(str)
4.8. max_in_flight(int)
4.9. consumer_workers(int)
4.10. topic_channel(str)
5. Pseudo Variables
6. Event Routes

List of Examples

1.1. Set lookupd_address parameter
1.2. Set lookupd_port parameter
1.3. Set nsqd_address parameter
1.4. Set nsqd_port parameter
1.5. Set consumer_use_nsqd parameter
1.6. Set consumer_event_key parameter
1.7. Set consumer_event_sub_key parameter
1.8. Set max_in_flight parameter
1.9. Set consumer_workers parameter
1.10. Set topic_channel parameter
1.11. Example usage of $nsqE pseudo variable
1.12. Define the event routes

Chapter 1. Admin Guide

1. Overview

The module provides an NSQ consumer for Kamailio configuration file. NSQ is a real time distributed messaging platform, more details about it can be found at nsq.io.

From a high-level perspective, the module may be used for:

  • Provide a real-time integration with you Kamailio configuration file, which can be used as alternative to interact with a database, allowing to overlay additional logic in your preferred language while utilizing a message bus.

  • Rely on a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload, without impacting Kamailio's activity.

Supported NSQ operations are:

  • Subscribe to a Topic and Channel

IMPORTANT The `nsq.json` transformation is deprecated in favor of the json module's `json.parse` transformation. The `nsq_pua_publish()` function is deprecated in favor of pua_json module's `pua_json_publish()` function.

2. How it works

The module creates an additional NSQ manager process that does the communication with NSQ for consuming messages. This one defers the message for processing to other NSQ worker process so that it doesn't block itself, nor the SIP worker processes.

2.1. Acknowledge Messages

Consumed messages have the option of being acknowledge in two ways:

  • immediately when received

  • after processing by the worker

3. Dependencies

3.1. Kamailio Modules

The following modules must be loaded before this module:

  • none.

3.2. External Libraries or Applications

The following libraries or applications must be installed

  • libev.

  • libjson.

  • libevbuffsock.

  • libnsq.

4. Parameters

4.1. lookupd_address(str)

The nsqlookupd address.

Usage: nsq related.

Default value is 127.0.0.1

Example 1.1. Set lookupd_address parameter

...
modparam("nsq", "lookupd_address", "nsqlookupd.mydomain.com")
...

4.2. lookupd_port(int)

The nsqlookupd TCP port.

Usage: nsq related.

Default value is 4161.

Example 1.2. Set lookupd_port parameter

...
modparam("nsq", "lookupd_port", 4161)
...

4.3. nsqd_address(str)

The nsqd address. You can specify connecting directly to nsqd instead of using nsqlookupd.

Usage: nsq related.

Default value is 127.0.0.1

Example 1.3. Set nsqd_address parameter

...
modparam("nsq", "nsqd_address", "nsqd.mydomain.com")
...

4.4. nsqd_port(int)

The nsqd TCP port.

Usage: nsq related.

Default value is 4150.

Example 1.4. Set nsqd_port parameter

...
modparam("nsq", "nsqd_port", 4150)
...

4.5. consumer_use_nsqd(int)

Set to 1 if you'd like to connect to nsqd instead of nsqlookupd.

Usage: nsq related.

Default value is 0.

Example 1.5. Set consumer_use_nsqd parameter

...
modparam("nsq", "consumer_use_nsqd", 1)
...

4.6. consumer_event_key(str)

The default name of the field in json payload to compose the event name 1st part

Usage: nsq related.

Default value is Event-Category.

Example 1.6. Set consumer_event_key parameter

...
modparam("nsq", "consumer_event_key", "My-JSON-Field-Name")
...

4.7. consumer_event_sub_key(str)

The default name of the field in json payload to compose the event name 2nd part

Usage: nsq related.

Default value is Event-Name.

Example 1.7. Set consumer_event_sub_key parameter

...
modparam("nsq", "consumer_event_sub_key", "My-JSON-SubField-Name")
...

4.8. max_in_flight(int)

The number of messages the consumer can receive before nsqd expects a response.

Usage: nsq related.

Default value is 1.

Example 1.8. Set max_in_flight parameter

...
modparam("nsq", "max_in_flight", 2)
...

4.9. consumer_workers(int)

Number of consumer connections to NSQ per topic_channel.

Usage: nsq related.

Default value is 4.

Example 1.9. Set consumer_workers parameter

...
modparam("nsq", "consumer_workers", 2)
...

4.10. topic_channel(str)

The NSQ Topic and Channel. Delimiter-separated by :. It be set multiple times to subscribe to multiple topics and channels. The value of consumer_workers is allocated per topic_channel.

Usage: nsq related.

Default value is Kamailio-Topic:Kamailio-Channel.

Example 1.10. Set topic_channel parameter

...
modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
...

5. Pseudo Variables

Example 1.11. Example usage of $nsqE pseudo variable

	xlog("L_INFO", "received payload $nsqE from NSQ");
	...
}

  • $nsqE Contains the payload of a consumed message

6. Event Routes

The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.

NSQ module will try to execute the event route from most significant to less significant. define the event route like event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]]

We can set the key/subkey pair on a subscription base. check the payload on subscribe.

Example 1.12. Define the event routes

...
modparam("nsq", "consumer_event_key", "Event-Category")
modparam("nsq", "consumer_event_sub_key", "Event-Name")
...

event_route[nsq:consumer-event-presence-update]
{
	# presence is the value extracted from Event-Category field in json payload
	# update is the value extracted from Event-Name field in json payload
	if ($(nsqE{json.parse,Event-Package}) == "dialog") {
		xlog("L_INFO", "received $(nsqE{json.parse,Event-Package}) update for $(nsqE{json.parse,From})");
		pua_json_publish($nsqE);
	}
	...
}

event_route[nsq:consumer-event-presence]
{
	# presence is the value extracted from Event-Category field in json payload
	xlog("L_INFO", "received $(nsqE{json.parse,Event-Package}) update for $(nsqE{json.parse,From})");
	...
}

event_route[nsq:consumer-event-event-category-event-name]
{
	# event-category is the name of the consumer_event_key parameter
	# event-name is the name of the consumer_event_sub_key parameter
	# this event route is executed if we can't find the previous
	...
}

event_route[nsq:consumer-event-event-category]
{
	# event-category is the name of the consumer_event_key parameter
	# this event route is executed if we can't find the previous
	...
}

event_route[nsq:consumer-event]
{
	# this event route is executed if we can't find the previous
}