Queues

PGMQ Extension


pgmq is a lightweight message queue built on Postgres.

Features

  • Lightweight - No background worker or external dependencies, just Postgres functions packaged in an extension
  • "exactly once" delivery of messages to a consumer within a visibility timeout
  • API parity with AWS SQS and RSMQ
  • Messages stay in the queue until explicitly removed
  • Messages can be archived, instead of deleted, for long-term retention and replayability

Enable the extension


_10
create extension pgmq;

Usage

Queue management

create

Create a new queue.


_10
pgmq.create(queue_name text)
_10
returns void

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:


_10
select from pgmq.create('my_queue');
_10
create
_10
--------

create_unlogged

Creates an unlogged table. This is useful when write throughput is more important than durability. See Postgres documentation for unlogged tables for more information.


_10
pgmq.create_unlogged(queue_name text)
_10
returns void

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:


_10
select pgmq.create_unlogged('my_unlogged');
_10
create_unlogged
_10
-----------------


detach_archive

Drop the queue's archive table as a member of the PGMQ extension. Useful for preventing the queue's archive table from being drop when drop extension pgmq is executed. This does not prevent the further archives() from appending to the archive table.


_10
pgmq.detach_archive(queue_name text)

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:


_10
select * from pgmq.detach_archive('my_queue');
_10
detach_archive
_10
----------------


drop_queue

Deletes a queue and its archive table.


_10
pgmq.drop_queue(queue_name text)
_10
returns boolean

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:


_10
select * from pgmq.drop_queue('my_unlogged');
_10
drop_queue
_10
------------
_10
t

Sending messages

send

Send a single message to a queue.


_10
pgmq.send(
_10
queue_name text,
_10
msg jsonb,
_10
delay integer default 0
_10
)
_10
returns setof bigint

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msgjsonbThe message to send to the queue
delayintegerTime in seconds before the message becomes visible. Defaults to 0.

Example:


_10
select * from pgmq.send('my_queue', '{"hello": "world"}');
_10
send
_10
------
_10
4


send_batch

Send 1 or more messages to a queue.


_10
pgmq.send_batch(
_10
queue_name text,
_10
msgs jsonb[],
_10
delay integer default 0
_10
)
_10
returns setof bigint

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msgsjsonb[]Array of messages to send to the queue
delayintegerTime in seconds before the messages becomes visible. Defaults to 0.

_11
select * from pgmq.send_batch(
_11
'my_queue',
_11
array[
_11
'{"hello": "world_0"}'::jsonb,
_11
'{"hello": "world_1"}'::jsonb
_11
]
_11
);
_11
send_batch
_11
------------
_11
1
_11
2


Reading messages

read

Read 1 or more messages from a queue. The VT specifies the delay in seconds between reading and the message becoming invisible to other consumers.


_10
pgmq.read(
_10
queue_name text,
_10
vt integer,
_10
qty integer
_10
)
_10
_10
returns setof pgmq.message_record

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
vtintegerTime in seconds that the message become invisible after reading
qtyintegerThe number of messages to read from the queue. Defaults to 1

Example:


_10
select * from pgmq.read('my_queue', 10, 2);
_10
msg_id | read_ct | enqueued_at | vt | message
_10
--------+---------+-------------------------------+-------------------------------+----------------------
_10
1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"}
_10
2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
_10
(2 rows)


read_with_poll

Same as read(). Also provides convenient long-poll functionality. When there are no messages in the queue, the function call will wait for max_poll_seconds in duration before returning. If messages reach the queue during that duration, they will be read and returned immediately.


_10
pgmq.read_with_poll(
_10
queue_name text,
_10
vt integer,
_10
qty integer,
_10
max_poll_seconds integer default 5,
_10
poll_interval_ms integer default 100
_10
)
_10
returns setof pgmq.message_record

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
vtintegerTime in seconds that the message become invisible after reading.
qtyintegerThe number of messages to read from the queue. Defaults to 1.
max_poll_secondsintegerTime in seconds to wait for new messages to reach the queue. Defaults to 5.
poll_interval_msintegerMilliseconds between the internal poll operations. Defaults to 100.

Example:


_10
select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100);
_10
msg_id | read_ct | enqueued_at | vt | message
_10
--------+---------+-------------------------------+-------------------------------+--------------------
_10
1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}


pop

Reads a single message from a queue and deletes it upon read.

Note: utilization of pop() results in at-most-once delivery semantics if the consuming application does not guarantee processing of the message.


_10
pgmq.pop(queue_name text)
_10
returns setof pgmq.message_record

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:


_10
pgmq=# select * from pgmq.pop('my_queue');
_10
msg_id | read_ct | enqueued_at | vt | message
_10
--------+---------+-------------------------------+-------------------------------+--------------------
_10
1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}


Deleting/Archiving messages

delete (single)

Deletes a single message from a queue.


_10
pgmq.delete (queue_name text, msg_id: bigint)
_10
returns boolean

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msg_idbigintMessage ID of the message to delete

Example:


_10
select pgmq.delete('my_queue', 5);
_10
delete
_10
--------
_10
t


delete (batch)

Delete one or many messages from a queue.


_10
pgmq.delete (queue_name text, msg_ids: bigint[])
_10
returns setof bigint

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msg_idsbigint[]Array of message IDs to delete

Examples:

Delete two messages that exist.


_10
select * from pgmq.delete('my_queue', array[2, 3]);
_10
delete
_10
--------
_10
2
_10
3

Delete two messages, one that exists and one that does not. Message 999 does not exist.


_10
select * from pgmq.delete('my_queue', array[6, 999]);
_10
delete
_10
--------
_10
6


purge_queue

Permanently deletes all messages in a queue. Returns the number of messages that were deleted.


_10
purge_queue(queue_name text)
_10
returns bigint

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Example:

Purge the queue when it contains 8 messages;


_10
select * from pgmq.purge_queue('my_queue');
_10
purge_queue
_10
-------------
_10
8


archive (single)

Removes a single requested message from the specified queue and inserts it into the queue's archive.


_10
pgmq.archive(queue_name text, msg_id bigint)
_10
returns boolean

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msg_idbigintMessage ID of the message to archive

Returns Boolean value indicating success or failure of the operation.

Example; remove message with ID 1 from queue my_queue and archive it:


_10
select * from pgmq.archive('my_queue', 1);
_10
archive
_10
---------
_10
t


archive (batch)

Deletes a batch of requested messages from the specified queue and inserts them into the queue's archive. Returns an array of message ids that were successfully archived.


_10
pgmq.archive(queue_name text, msg_ids bigint[])
_10
RETURNS SETOF bigint

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msg_idsbigint[]Array of message IDs to archive

Examples:

Delete messages with ID 1 and 2 from queue my_queue and move to the archive.


_10
select * from pgmq.archive('my_queue', array[1, 2]);
_10
archive
_10
---------
_10
1
_10
2

Delete messages 4, which exists and 999, which does not exist.


_10
select * from pgmq.archive('my_queue', array[4, 999]);
_10
archive
_10
---------
_10
4


Utilities

set_vt

Sets the visibility timeout of a message to a specified time duration in the future. Returns the record of the message that was updated.


_10
pgmq.set_vt(
_10
queue_name text,
_10
msg_id bigint,
_10
vt_offset integer
_10
)
_10
returns pgmq.message_record

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue
msg_idbigintID of the message to set visibility time
vt_offsetintegerDuration from now, in seconds, that the message's VT should be set to

Example:

Set the visibility timeout of message 1 to 30 seconds from now.


_10
select * from pgmq.set_vt('my_queue', 11, 30);
_10
msg_id | read_ct | enqueued_at | vt | message
_10
--------+---------+-------------------------------+-------------------------------+----------------------
_10
1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}


list_queues

List all the queues that currently exist.


_10
list_queues()
_10
RETURNS TABLE(
_10
queue_name text,
_10
created_at timestamp with time zone,
_10
is_partitioned boolean,
_10
is_unlogged boolean
_10
)

Example:


_10
select * from pgmq.list_queues();
_10
queue_name | created_at | is_partitioned | is_unlogged
_10
----------------------+-------------------------------+----------------+-------------
_10
my_queue | 2023-10-28 14:13:17.092576-05 | f | f
_10
my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f
_10
my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t


metrics

Get metrics for a specific queue.


_10
pgmq.metrics(queue_name: text)
_10
returns table(
_10
queue_name text,
_10
queue_length bigint,
_10
newest_msg_age_sec integer,
_10
oldest_msg_age_sec integer,
_10
total_messages bigint,
_10
scrape_time timestamp with time zone
_10
)

Parameters:

ParameterTypeDescription
queue_nametextThe name of the queue

Returns:

AttributeTypeDescription
queue_nametextThe name of the queue
queue_lengthbigintNumber of messages currently in the queue
newest_msg_age_secinteger | nullAge of the newest message in the queue, in seconds
oldest_msg_age_secinteger | nullAge of the oldest message in the queue, in seconds
total_messagesbigintTotal number of messages that have passed through the queue over all time
scrape_timetimestamp with time zoneThe current timestamp

Example:


_10
select * from pgmq.metrics('my_queue');
_10
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
_10
------------+--------------+--------------------+--------------------+----------------+-------------------------------
_10
my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05


metrics_all

Get metrics for all existing queues.


_10
pgmq.metrics_all()
_10
RETURNS TABLE(
_10
queue_name text,
_10
queue_length bigint,
_10
newest_msg_age_sec integer,
_10
oldest_msg_age_sec integer,
_10
total_messages bigint,
_10
scrape_time timestamp with time zone
_10
)

Returns:

AttributeTypeDescription
queue_nametextThe name of the queue
queue_lengthbigintNumber of messages currently in the queue
newest_msg_age_secinteger | nullAge of the newest message in the queue, in seconds
oldest_msg_age_secinteger | nullAge of the oldest message in the queue, in seconds
total_messagesbigintTotal number of messages that have passed through the queue over all time
scrape_timetimestamp with time zoneThe current timestamp

_10
select * from pgmq.metrics_all();
_10
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
_10
----------------------+--------------+--------------------+--------------------+----------------+-------------------------------
_10
my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05
_10
my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05
_10
my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05

Types

message_record

The complete representation of a message in a queue.

Attribute NameTypeDescription
msg_idbigintUnique ID of the message
read_ctbigintNumber of times the message has been read. Increments on read().
enqueued_attimestamp with time zonetime that the message was inserted into the queue
vttimestamp with time zoneTimestamp when the message will become available for consumers to read
messagejsonbThe message payload

Example:


_10
msg_id | read_ct | enqueued_at | vt | message
_10
--------+---------+-------------------------------+-------------------------------+--------------------
_10
1 | 1 | 2023-10-28 19:06:19.941509-05 | 2023-10-28 19:06:27.419392-05 | {"hello": "world"}

Resources