DBMS_AQ is a built-in Oracle package for managing Advanced Queuing (AQ), a messaging system integrated into Oracle Database. It provides APIs to enqueue (add) and dequeue (retrieve) messages, enabling asynchronous communication between applications or processes. Oracle Advance Queuing (AQ) provides support for:

  • Asynchronous Communication: AQ allows applications to send and receive messages independently, without requiring immediate responses. This is ideal for decoupling systems and improving scalability.
  • Distributed Systems: AQ can be used to build distributed systems where different components communicate through message passing.
  • Workflows: AQ is often used to implement complex workflows, where tasks are executed sequentially or in parallel based on message processing.
  • Event-Driven Architecture: AQ can be integrated into event-driven architectures to react to specific events or changes in the system.

Advanced Queuing (DBMS_AQ) was introduced in Oracle 8i. Subsequent versions expanded its functionality, offering more robust features like queue monitoring, better integration with PL/SQL, and improved scalability. Starting from Oracle Database Release 21c, AQ also supports JSON payload type.

DBMS_AQ vs DBMS_PIPE

DBMS_AQ and DBMS_PIPE are both mechanisms for inter-process communication within an Oracle database, but they differ significantly in their scope, features, and reliability. DBMS_PIPE is a simpler mechanism designed for basic, in-memory communication between sessions within a single database instance. It lacks features like message persistence, guaranteed delivery, or advanced queuing capabilities. In contrast, DBMS_AQ is a robust, enterprise-grade messaging system built into the Oracle database. It offers features such as message persistence, reliable delivery, message filtering, and integration with database transactions, making it suitable for complex, distributed applications.

In others words, While DBMS_PIPE is better suited for lightweight, temporary inter-session messaging, DBMS_AQ is ideal for reliable workflows and integration across systems.

DBMS packages

Oracle Advanced Queuing (AQ) uses several DBMS packages to manage and interact with queues. Each package serves a specific purpose:

  • DBMS_AQ: Provides basic APIs for enqueueing and dequeueing messages, creating queues, and managing message attributes. It’s the core package for interacting with queues in Oracle AQ.
  • DBMS_AQADM: Offers administrative functions to manage AQ objects, such as creating, altering, or dropping queues and queue tables. It is used by database administrators to configure and maintain the AQ infrastructure.
  • DBMS_AQELM: This package provides subprograms to manage the configuration of Oracle Advanced Queuing (AQ) asynchronous notification by e-mail and HTTP.
  • DBMS_AQIN: This package provides procedures for integrating AQ with external systems. It enables you to interact with external messaging systems, such as JMS (Java Message Service), through AQ. This allows for interoperability and the ability to exchange messages between Oracle AQ and other messaging platforms.

AQ Components

Oracle Advanced Queuing (AQ) uses Oracle database tables to store messages. Here’s a brief explanation of the key components:

  1. Queue Tables: A database object to manage queues. Multiple queues can exist in a single queue table.
  2. Queues: Logical objects where messages are enqueued and dequeued. They can be of two types: normal queues (for point-to-point communication) and sharded queues (for scalability). A sharded queue is partitioned into multiple shards (sub-queues) that are distributed across different instances in a RAC (Real Application Clusters) environment. Each shard independently handles enqueue and dequeue operations, allowing parallel processing and improved performance.
  3. Messages: Units of data exchanged between producers (enqueue) and consumers (dequeue).
  4. Subscribers: Entities that consume messages from a queue, either explicitly (dequeue) or through notification (subscription).
  5. Agents: Represent subscribers or producers, associated with queues.
  6. Payload: The message content, which can be RAW, VARCHAR2, or an object type.

Setting up AQ

Grants and Roles for DBMS_AQADM (Administrative Operations)

The DBMS_AQADM package is used for administrative tasks like creating and managing queue tables and queues. These operations require elevated privileges.

  1. Roles:
    • AQ_ADMINISTRATOR_ROLE: Grants permissions to perform all administrative tasks related to Advanced Queuing.
  2. System Privileges:
    • EXECUTE on DBMS_AQADM: Allows execution of administrative procedures.
  3. Additional Privileges (if needed):
    • CREATE TABLE: Required to create queue tables.
    • CREATE ANY QUEUE: Required to create queues in schemas other than the user’s own.
    • DROP ANY QUEUE: Allows dropping queues in any schema.
CREATE USER test_adm IDENTIFIED BY test_adm;
ALTER USER test_adm QUOTA UNLIMITED ON users;
GRANT CREATE SESSION TO test_adm;
GRANT CREATE ANY TYPE TO test_adm; 
GRANT EXECUTE ON DBMS_AQADM TO test_adm;
GRANT aq_administrator_role TO test_adm;

Creating the Queue Table and the Queue.

While the queue table is a database object that acts as the underlying storage for one or more queues, a queue is a logical abstraction on top of the queue table. It defines a specific messaging context, such as a point-to-point queue or a publish/subscribe queue. A queue must be associated with a queue table, but a queue table can support multiple queues.

In Oracle Database 21c, the JSON datatype is supported natively. Here’s how to use it as the payload for Advanced Queuing (AQ):

-- Create a queue table using JSON as the payload type
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
      QUEUE_TABLE => 'json_queue_table',
      QUEUE_PAYLOAD_TYPE => 'JSON');
END;
/

-- Create a queue
BEGIN
   DBMS_AQADM.CREATE_QUEUE(
      QUEUE_NAME => 'json_queue',
      QUEUE_TABLE => 'json_queue_table');
END;
/

-- Start the queue
BEGIN
   DBMS_AQADM.START_QUEUE(
      QUEUE_NAME => 'json_queue');
END;
/

-- To List All Queue Tables
SELECT OWNER, QUEUE_TABLE
FROM DBA_QUEUE_TABLES
WHERE OWNER = 'TEST_ADM'
ORDER BY OWNER, QUEUE_TABLE;

OWNER	   QUEUE_TABLE
========   ================
TEST_ADM   JSON_QUEUE_TABLE



-- To List All Queues
SELECT OWNER, NAME AS QUEUE_NAME, QUEUE_TABLE, QUEUE_TYPE
FROM DBA_QUEUES
WHERE OWNER = 'TEST_ADM'
ORDER BY OWNER, NAME;

OWNER      QUEUE_NAME	           QUEUE_TABLE	      QUEUE_TYPE
========   ======================  ================   =================   
TEST_ADM   AQ$_JSON_QUEUE_TABLE_E  JSON_QUEUE_TABLE   EXCEPTION_QUEUE
TEST_ADM   JSON_QUEUE	           JSON_QUEUE_TABLE   NORMAL_QUEUE

Think of a queue_table as a mailbox. It’s a physical container where letters are stored. The queue represents the rules for how mail is delivered to that mailbox (e.g., priority mail goes to the top, certain addresses are allowed, etc.).

As shown above, two queues are created. The NORMAL_QUEUE is the standard queue type for typical message processing. Messages are enqueued and dequeued in the order they are received, following the defined delivery options (e.g., priority). The EXCEPTION_QUEUE is a special queue where messages are moved when processing errors occur in a NORMAL_QUEUE, such as exceeding retry limits or encountering exceptions during message handling. This allows for separate handling and investigation of failed messages, ensuring message reliability and preventing data loss.

Basic Enqueuing and Dequeuing

The following code showcases a simple point-to-point communication setup using Oracle AQ with structured JSON payloads.

set SERVEROUTPUT on
-- Enqueue a JSON message
DECLARE
   message_handle RAW(16);
   json_payload JSON := JSON('{"key": "value", "data": 123}');
BEGIN
   DBMS_AQ.ENQUEUE(
      queue_name      => 'json_queue',
      enqueue_options => DBMS_AQ.ENQUEUE_OPTIONS_T(),
      message_properties => DBMS_AQ.MESSAGE_PROPERTIES_T(),
      payload         => json_payload,
      msgid           => message_handle
   );
   DBMS_OUTPUT.PUT_LINE('Message enqueued with ID: ' || RAWTOHEX(message_handle));
END;
/


-- Dequeue a JSON message
DECLARE
   dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
   message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   payload JSON;
   msg_id RAW(16);
BEGIN
   -- Set dequeue options
   dequeue_options.wait := DBMS_AQ.NO_WAIT;

   -- Dequeue the message
   DBMS_AQ.DEQUEUE(
      queue_name => 'json_queue',
      dequeue_options => dequeue_options,
      message_properties => message_properties,
      payload => payload,
      msgid => msg_id);

   -- Output the JSON payload
   DBMS_OUTPUT.PUT_LINE('Dequeued JSON: ' || json_serialize(payload));
END;
/

Dropping the queues

You cannot drop a queue table if it still contains active queues. Ensure the queue is stopped before dropping it and use caution when dropping objects, as this action is irreversible.

BEGIN
   DBMS_AQADM.STOP_QUEUE(queue_name => 'JSON_QUEUE');
   DBMS_AQADM.DROP_QUEUE(queue_name => 'JSON_QUEUE');
END;
/

BEGIN
   DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'JSON_QUEUE_TABLE');
END;
/

References

Leave a Reply

Discover more from DB-Master

Subscribe now to keep reading and get access to the full archive.

Continue reading