Database
 sql >> Datenbank >  >> RDS >> Database

Konfigurieren von Service Broker für die asynchrone Verarbeitung

In meinem letzten Artikel habe ich über die Vorteile der Implementierung einer asynchronen Verarbeitung mit Service Broker in SQL Server gegenüber anderen Methoden gesprochen, die für die entkoppelte Verarbeitung langer Aufgaben vorhanden sind. In diesem Artikel gehen wir alle Komponenten durch, die für eine grundlegende Service Broker-Konfiguration in einer einzigen Datenbank konfiguriert werden müssen, sowie die wichtigen Überlegungen für die Konversationsverwaltung zwischen Brokerdiensten. Zunächst müssen wir eine Datenbank erstellen und die Datenbank für die Nutzung durch Service Broker aktivieren:

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Broker-Komponenten konfigurieren

Die grundlegenden Objekte, die in der Datenbank erstellt werden müssen, sind die Nachrichtentypen für die Nachrichten, ein Vertrag, der definiert, wie die Nachrichten zwischen den Diensten gesendet werden, eine Warteschlange und der Initiatordienst sowie eine Warteschlange und der Zieldienst. Viele Online-Beispiele für Service Broker zeigen komplexe Objektbenennungen für die Nachrichtentypen, Verträge und Dienste für Service Broker. Es ist jedoch nicht erforderlich, dass die Namen komplex sind, und einfache Objektnamen können für alle Objekte verwendet werden.

Für die Nachrichten müssen wir einen Nachrichtentyp für die Anfrage erstellen, der AsyncRequest genannt wird , und einen Nachrichtentyp für das Ergebnis, das AsyncResult genannt wird . Beide verwenden XML, das von den Brokerdiensten als korrekt formatiert validiert wird, um die von den Diensten benötigten Daten zu senden und zu empfangen.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

Der Vertrag gibt an, dass die AsyncRequest vom initiierenden Dienst an den Zieldienst gesendet wird und dass der Zieldienst ein AsyncResult zurückgibt Nachricht zurück an den initiierenden Dienst. Der Vertrag kann auch mehrere Nachrichtentypen für den Initiator und das Ziel festlegen oder dass ein bestimmter Nachrichtentyp von jedem Dienst gesendet werden kann, wenn die spezifische Verarbeitung dies erfordert.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Für jeden der Dienste muss eine Warteschlange erstellt werden, um eine Speicherung der von dem Dienst empfangenen Nachrichten bereitzustellen. Der Zieldienst, an den die Anfrage gesendet wird, muss unter Angabe des AsyncContract erstellt werden damit Nachrichten an den Dienst gesendet werden können. In diesem Fall heißt der Dienst ProcessingService und wird in der ProcessingQueue erstellt innerhalb der Datenbank. Für den initiierenden Dienst muss kein Vertrag angegeben werden, sodass er nur Nachrichten als Antwort auf eine von ihm initiierte Konversation empfangen kann.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Senden einer Nachricht zur Verarbeitung

Wie ich im vorherigen Artikel erklärt habe, bevorzuge ich es, eine gespeicherte Wrapper-Prozedur zum Senden einer neuen Nachricht an einen Brokerdienst zu implementieren, damit sie einmal geändert werden kann, um die Leistung bei Bedarf zu skalieren. Dieses Verfahren ist ein einfacher Wrapper zum Erstellen einer neuen Konversation und zum Senden der Nachricht an den ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Unter Verwendung der gespeicherten Wrapper-Prozedur können wir jetzt eine Testnachricht an den ProcessingService senden um zu bestätigen, dass wir die Maklerdienste korrekt eingerichtet haben.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Verarbeitung von Nachrichten

Während wir die Nachrichten aus der ProcessingQueue manuell verarbeiten könnten , möchten wir wahrscheinlich, dass die Nachrichten automatisch verarbeitet werden, wenn sie an den ProcessingService gesendet werden . Dazu muss eine gespeicherte Aktivierungsprozedur erstellt werden, die wir testen und später an die Warteschlange binden, um die Verarbeitung bei der Warteschlangenaktivierung zu automatisieren. Um eine Nachricht zu verarbeiten, müssen wir RECEIVE die Nachricht aus der Warteschlange innerhalb einer Transaktion zusammen mit dem Nachrichtentyp und dem Konversationshandle für die Nachricht. Der Nachrichtentyp stellt sicher, dass die entsprechende Logik auf die zu verarbeitende Nachricht angewendet wird, und das Konversationshandle ermöglicht das Zurücksenden einer Antwort an den initiierenden Dienst, wenn die Nachricht verarbeitet wurde.

Der RECEIVE Der Befehl ermöglicht die Verarbeitung einer einzelnen Nachricht oder mehrerer Nachrichten innerhalb derselben Konversationskennung oder -gruppe in einer einzigen Transaktion. Um mehrere Nachrichten zu verarbeiten, muss eine Tabellenvariable verwendet werden, oder um eine einzelne Nachricht zu verarbeiten, kann eine lokale Variable verwendet werden. Die folgende Aktivierungsprozedur ruft eine einzelne Nachricht aus der Warteschlange ab und prüft den Nachrichtentyp, um festzustellen, ob es sich um eine AsyncRequest handelt Nachricht und führt dann den lang andauernden Prozess basierend auf den empfangenen Nachrichteninformationen durch. Wenn es innerhalb der Schleife keine Nachricht erhält, wartet es bis zu 5000 ms oder 5 Sekunden, bis eine andere Nachricht in die Warteschlange gelangt, bevor es die Schleife verlässt und seine Ausführung beendet. Nach der Verarbeitung einer Nachricht wird ein AsyncResult erstellt Nachricht und sendet sie mit demselben Konversationshandle, von dem die Nachricht empfangen wurde, an den Initiator zurück. Die Prozedur überprüft auch den Nachrichtentyp, um festzustellen, ob ein EndDialog oder Error Nachricht empfangen wurde, um das Gespräch zu bereinigen, indem es beendet wird.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Die RequestQueue muss auch die an ihn gesendeten Nachrichten verarbeiten, also ein zusätzliches Verfahren zum Verarbeiten des AsyncResult Nachrichten, die von der ProcessingQueueActivation-Prozedur zurückgegeben werden, müssen erstellt werden. Da wir wissen, dass die AsnycResult-Nachricht bedeutet, dass die gesamte Verarbeitungsarbeit abgeschlossen ist, kann die Konversation beendet werden, sobald wir diese Nachricht verarbeitet haben, wodurch eine EndDialog-Nachricht an ProcessingService gesendet wird, die dann von ihrer Aktivierungsprozedur verarbeitet wird, um die zu beenden Konversation alles aufräumen und das Feuer vermeiden und Probleme vergessen, die auftreten, wenn Konversationen ordnungsgemäß beendet werden.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Testen der Verfahren

Vor der Automatisierung der Warteschlangenverarbeitung für unsere Dienste ist es wichtig, die Aktivierungsverfahren zu testen, um sicherzustellen, dass sie die Nachrichten ordnungsgemäß verarbeiten, und um zu verhindern, dass eine Warteschlange deaktiviert wird, falls ein Fehler auftritt, der nicht ordnungsgemäß behandelt wird. Da es bereits eine Nachricht in der ProcessingQueue gibt die ProcessingQueueActivation Prozedur kann ausgeführt werden, um diese Nachricht zu verarbeiten. Denken Sie daran, dass der WAITFOR führt dazu, dass die Prozedur 5 Sekunden dauert, um beendet zu werden, obwohl die Nachricht sofort aus der Warteschlange verarbeitet wird. Nach der Verarbeitung der Nachricht können wir den korrekten Vorgang überprüfen, indem wir die RequestQueue abfragen um zu sehen, ob ein AsyncResult Nachricht existiert, und dann können wir überprüfen, ob die RequestQueueActivation Prozedur funktioniert korrekt, wenn sie ausgeführt wird.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatisierung der Verarbeitung

An diesem Punkt sind alle Komponenten vollständig, um unsere Verarbeitung vollständig zu automatisieren. Das Einzige, was noch übrig bleibt, ist, die Aktivierungsprozeduren an ihre entsprechenden Warteschlangen zu binden und dann eine weitere Testnachricht zu senden, um zu bestätigen, dass sie verarbeitet wird und danach nichts in den Warteschlangen verbleibt.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Zusammenfassung

Die grundlegenden Komponenten für die automatisierte asynchrone Verarbeitung in SQL Server Service Broker können in einem einzigen Datenbank-Setup konfiguriert werden, um eine entkoppelte Verarbeitung lang andauernder Aufgaben zu ermöglichen. Dies kann ein leistungsstarkes Tool zur Verbesserung der Anwendungsleistung aus Sicht des Endbenutzers sein, indem die Verarbeitung von den Interaktionen des Endbenutzers mit der Anwendung entkoppelt wird.