DBMS_AQ
is a built-in Oracle package for managing Advanced Queuing (AQ), a messaging system integrated into Oracle Database. It provides APIs to enqueue (add) and dequeue (retrieve) messages, enabling asynchronous communication between applications or processes. Oracle Advance Queuing (AQ) provides support for:
- Asynchronous Communication: AQ allows applications to send and receive messages independently, without requiring immediate responses. This is ideal for decoupling systems and improving scalability.
- Distributed Systems: AQ can be used to build distributed systems where different components communicate through message passing.
- Workflows: AQ is often used to implement complex workflows, where tasks are executed sequentially or in parallel based on message processing.
- Event-Driven Architecture: AQ can be integrated into event-driven architectures to react to specific events or changes in the system.
Advanced Queuing (DBMS_AQ
) was introduced in Oracle 8i. Subsequent versions expanded its functionality, offering more robust features like queue monitoring, better integration with PL/SQL, and improved scalability. Starting from Oracle Database Release 21c, AQ also supports JSON
payload type.
DBMS_AQ vs DBMS_PIPE
DBMS_AQ and DBMS_PIPE are both mechanisms for inter-process communication within an Oracle database, but they differ significantly in their scope, features, and reliability. DBMS_PIPE is a simpler mechanism designed for basic, in-memory communication between sessions within a single database instance. It lacks features like message persistence, guaranteed delivery, or advanced queuing capabilities. In contrast, DBMS_AQ is a robust, enterprise-grade messaging system built into the Oracle database. It offers features such as message persistence, reliable delivery, message filtering, and integration with database transactions, making it suitable for complex, distributed applications.
In others words, While DBMS_PIPE
is better suited for lightweight, temporary inter-session messaging, DBMS_AQ
is ideal for reliable workflows and integration across systems.
DBMS packages
Oracle Advanced Queuing (AQ) uses several DBMS
packages to manage and interact with queues. Each package serves a specific purpose:
- DBMS_AQ: Provides basic APIs for enqueueing and dequeueing messages, creating queues, and managing message attributes. It’s the core package for interacting with queues in Oracle AQ.
- DBMS_AQADM: Offers administrative functions to manage AQ objects, such as creating, altering, or dropping queues and queue tables. It is used by database administrators to configure and maintain the AQ infrastructure.
- DBMS_AQELM: This package provides subprograms to manage the configuration of Oracle Advanced Queuing (AQ) asynchronous notification by e-mail and HTTP.
- DBMS_AQIN: This package provides procedures for integrating AQ with external systems. It enables you to interact with external messaging systems, such as JMS (Java Message Service), through AQ. This allows for interoperability and the ability to exchange messages between Oracle AQ and other messaging platforms.
AQ Components
Oracle Advanced Queuing (AQ) uses Oracle database tables to store messages. Here’s a brief explanation of the key components:
- Queue Tables: A database object to manage queues. Multiple queues can exist in a single queue table.
- Queues: Logical objects where messages are enqueued and dequeued. They can be of two types: normal queues (for point-to-point communication) and sharded queues (for scalability). A sharded queue is partitioned into multiple shards (sub-queues) that are distributed across different instances in a RAC (Real Application Clusters) environment. Each shard independently handles enqueue and dequeue operations, allowing parallel processing and improved performance.
- Messages: Units of data exchanged between producers (enqueue) and consumers (dequeue).
- Subscribers: Entities that consume messages from a queue, either explicitly (dequeue) or through notification (subscription).
- Agents: Represent subscribers or producers, associated with queues.
- Payload: The message content, which can be RAW, VARCHAR2, or an object type.
Setting up AQ
Grants and Roles for DBMS_AQADM (Administrative Operations)
The DBMS_AQADM
package is used for administrative tasks like creating and managing queue tables and queues. These operations require elevated privileges.
- Roles:
AQ_ADMINISTRATOR_ROLE
: Grants permissions to perform all administrative tasks related to Advanced Queuing.
- System Privileges:
EXECUTE
onDBMS_AQADM
: Allows execution of administrative procedures.
- Additional Privileges (if needed):
CREATE TABLE
: Required to create queue tables.CREATE ANY QUEUE
: Required to create queues in schemas other than the user’s own.DROP ANY QUEUE
: Allows dropping queues in any schema.
CREATE USER test_adm IDENTIFIED BY test_adm; ALTER USER test_adm QUOTA UNLIMITED ON users; GRANT CREATE SESSION TO test_adm; GRANT CREATE ANY TYPE TO test_adm; GRANT EXECUTE ON DBMS_AQADM TO test_adm; GRANT aq_administrator_role TO test_adm;
Creating the Queue Table and the Queue.
While the queue table is a database object that acts as the underlying storage for one or more queues, a queue is a logical abstraction on top of the queue table. It defines a specific messaging context, such as a point-to-point queue or a publish/subscribe queue. A queue must be associated with a queue table, but a queue table can support multiple queues.
In Oracle Database 21c, the JSON
datatype is supported natively. Here’s how to use it as the payload for Advanced Queuing (AQ):
-- Create a queue table using JSON as the payload type BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( QUEUE_TABLE => 'json_queue_table', QUEUE_PAYLOAD_TYPE => 'JSON'); END; / -- Create a queue BEGIN DBMS_AQADM.CREATE_QUEUE( QUEUE_NAME => 'json_queue', QUEUE_TABLE => 'json_queue_table'); END; / -- Start the queue BEGIN DBMS_AQADM.START_QUEUE( QUEUE_NAME => 'json_queue'); END; / -- To List All Queue Tables SELECT OWNER, QUEUE_TABLE FROM DBA_QUEUE_TABLES WHERE OWNER = 'TEST_ADM' ORDER BY OWNER, QUEUE_TABLE; OWNER QUEUE_TABLE ======== ================ TEST_ADM JSON_QUEUE_TABLE -- To List All Queues SELECT OWNER, NAME AS QUEUE_NAME, QUEUE_TABLE, QUEUE_TYPE FROM DBA_QUEUES WHERE OWNER = 'TEST_ADM' ORDER BY OWNER, NAME; OWNER QUEUE_NAME QUEUE_TABLE QUEUE_TYPE ======== ====================== ================ ================= TEST_ADM AQ$_JSON_QUEUE_TABLE_E JSON_QUEUE_TABLE EXCEPTION_QUEUE TEST_ADM JSON_QUEUE JSON_QUEUE_TABLE NORMAL_QUEUE
Think of a queue_table
as a mailbox. It’s a physical container where letters are stored. The queue
represents the rules for how mail is delivered to that mailbox (e.g., priority mail goes to the top, certain addresses are allowed, etc.).
As shown above, two queues are created. The NORMAL_QUEUE
is the standard queue type for typical message processing. Messages are enqueued and dequeued in the order they are received, following the defined delivery options (e.g., priority). The EXCEPTION_QUEUE
is a special queue where messages are moved when processing errors occur in a NORMAL_QUEUE
, such as exceeding retry limits or encountering exceptions during message handling. This allows for separate handling and investigation of failed messages, ensuring message reliability and preventing data loss.
Basic Enqueuing and Dequeuing
The following code showcases a simple point-to-point communication setup using Oracle AQ with structured JSON payloads.
set SERVEROUTPUT on -- Enqueue a JSON message DECLARE message_handle RAW(16); json_payload JSON := JSON('{"key": "value", "data": 123}'); BEGIN DBMS_AQ.ENQUEUE( queue_name => 'json_queue', enqueue_options => DBMS_AQ.ENQUEUE_OPTIONS_T(), message_properties => DBMS_AQ.MESSAGE_PROPERTIES_T(), payload => json_payload, msgid => message_handle ); DBMS_OUTPUT.PUT_LINE('Message enqueued with ID: ' || RAWTOHEX(message_handle)); END; / -- Dequeue a JSON message DECLARE dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; payload JSON; msg_id RAW(16); BEGIN -- Set dequeue options dequeue_options.wait := DBMS_AQ.NO_WAIT; -- Dequeue the message DBMS_AQ.DEQUEUE( queue_name => 'json_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => payload, msgid => msg_id); -- Output the JSON payload DBMS_OUTPUT.PUT_LINE('Dequeued JSON: ' || json_serialize(payload)); END; /
Dropping the queues
You cannot drop a queue table if it still contains active queues. Ensure the queue is stopped before dropping it and use caution when dropping objects, as this action is irreversible.
BEGIN DBMS_AQADM.STOP_QUEUE(queue_name => 'JSON_QUEUE'); DBMS_AQADM.DROP_QUEUE(queue_name => 'JSON_QUEUE'); END; / BEGIN DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'JSON_QUEUE_TABLE'); END; /
References
- Transactional Event Queues and Advanced Queuing User’s Guide
- DBMS_AQADM in PL/SQL Packages and Types Reference
- DBMS_AQ in PL/SQL Packages and Types Reference