In meinem letzten Artikel habe ich über die Vorteile der Implementierung einer asynchronen Verarbeitung mit Service Broker in SQL Server gegenüber anderen Methoden gesprochen, die für die entkoppelte Verarbeitung langer Aufgaben vorhanden sind. In diesem Artikel gehen wir alle Komponenten durch, die für eine grundlegende Service Broker-Konfiguration in einer einzigen Datenbank konfiguriert werden müssen, sowie die wichtigen Überlegungen für die Konversationsverwaltung zwischen Brokerdiensten. Zunächst müssen wir eine Datenbank erstellen und die Datenbank für die Nutzung durch Service Broker aktivieren:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Broker-Komponenten konfigurieren
Die grundlegenden Objekte, die in der Datenbank erstellt werden müssen, sind die Nachrichtentypen für die Nachrichten, ein Vertrag, der definiert, wie die Nachrichten zwischen den Diensten gesendet werden, eine Warteschlange und der Initiatordienst sowie eine Warteschlange und der Zieldienst. Viele Online-Beispiele für Service Broker zeigen komplexe Objektbenennungen für die Nachrichtentypen, Verträge und Dienste für Service Broker. Es ist jedoch nicht erforderlich, dass die Namen komplex sind, und einfache Objektnamen können für alle Objekte verwendet werden.
Für die Nachrichten müssen wir einen Nachrichtentyp für die Anfrage erstellen, der AsyncRequest
genannt wird , und einen Nachrichtentyp für das Ergebnis, das AsyncResult
genannt wird . Beide verwenden XML, das von den Brokerdiensten als korrekt formatiert validiert wird, um die von den Diensten benötigten Daten zu senden und zu empfangen.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Der Vertrag gibt an, dass die AsyncRequest
vom initiierenden Dienst an den Zieldienst gesendet wird und dass der Zieldienst ein AsyncResult
zurückgibt Nachricht zurück an den initiierenden Dienst. Der Vertrag kann auch mehrere Nachrichtentypen für den Initiator und das Ziel festlegen oder dass ein bestimmter Nachrichtentyp von jedem Dienst gesendet werden kann, wenn die spezifische Verarbeitung dies erfordert.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Für jeden der Dienste muss eine Warteschlange erstellt werden, um eine Speicherung der von dem Dienst empfangenen Nachrichten bereitzustellen. Der Zieldienst, an den die Anfrage gesendet wird, muss unter Angabe des AsyncContract
erstellt werden damit Nachrichten an den Dienst gesendet werden können. In diesem Fall heißt der Dienst ProcessingService
und wird in der ProcessingQueue
erstellt innerhalb der Datenbank. Für den initiierenden Dienst muss kein Vertrag angegeben werden, sodass er nur Nachrichten als Antwort auf eine von ihm initiierte Konversation empfangen kann.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Senden einer Nachricht zur Verarbeitung
Wie ich im vorherigen Artikel erklärt habe, bevorzuge ich es, eine gespeicherte Wrapper-Prozedur zum Senden einer neuen Nachricht an einen Brokerdienst zu implementieren, damit sie einmal geändert werden kann, um die Leistung bei Bedarf zu skalieren. Dieses Verfahren ist ein einfacher Wrapper zum Erstellen einer neuen Konversation und zum Senden der Nachricht an den ProcessingService
.
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
Unter Verwendung der gespeicherten Wrapper-Prozedur können wir jetzt eine Testnachricht an den ProcessingService
senden um zu bestätigen, dass wir die Maklerdienste korrekt eingerichtet haben.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Verarbeitung von Nachrichten
Während wir die Nachrichten aus der ProcessingQueue
manuell verarbeiten könnten , möchten wir wahrscheinlich, dass die Nachrichten automatisch verarbeitet werden, wenn sie an den ProcessingService
gesendet werden . Dazu muss eine gespeicherte Aktivierungsprozedur erstellt werden, die wir testen und später an die Warteschlange binden, um die Verarbeitung bei der Warteschlangenaktivierung zu automatisieren. Um eine Nachricht zu verarbeiten, müssen wir RECEIVE
die Nachricht aus der Warteschlange innerhalb einer Transaktion zusammen mit dem Nachrichtentyp und dem Konversationshandle für die Nachricht. Der Nachrichtentyp stellt sicher, dass die entsprechende Logik auf die zu verarbeitende Nachricht angewendet wird, und das Konversationshandle ermöglicht das Zurücksenden einer Antwort an den initiierenden Dienst, wenn die Nachricht verarbeitet wurde.
Der RECEIVE
Der Befehl ermöglicht die Verarbeitung einer einzelnen Nachricht oder mehrerer Nachrichten innerhalb derselben Konversationskennung oder -gruppe in einer einzigen Transaktion. Um mehrere Nachrichten zu verarbeiten, muss eine Tabellenvariable verwendet werden, oder um eine einzelne Nachricht zu verarbeiten, kann eine lokale Variable verwendet werden. Die folgende Aktivierungsprozedur ruft eine einzelne Nachricht aus der Warteschlange ab und prüft den Nachrichtentyp, um festzustellen, ob es sich um eine AsyncRequest
handelt Nachricht und führt dann den lang andauernden Prozess basierend auf den empfangenen Nachrichteninformationen durch. Wenn es innerhalb der Schleife keine Nachricht erhält, wartet es bis zu 5000 ms oder 5 Sekunden, bis eine andere Nachricht in die Warteschlange gelangt, bevor es die Schleife verlässt und seine Ausführung beendet. Nach der Verarbeitung einer Nachricht wird ein AsyncResult
erstellt Nachricht und sendet sie mit demselben Konversationshandle, von dem die Nachricht empfangen wurde, an den Initiator zurück. Die Prozedur überprüft auch den Nachrichtentyp, um festzustellen, ob ein EndDialog
oder Error
Nachricht empfangen wurde, um das Gespräch zu bereinigen, indem es beendet wird.
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Die RequestQueue
muss auch die an ihn gesendeten Nachrichten verarbeiten, also ein zusätzliches Verfahren zum Verarbeiten des AsyncResult
Nachrichten, die von der ProcessingQueueActivation-Prozedur zurückgegeben werden, müssen erstellt werden. Da wir wissen, dass die AsnycResult-Nachricht bedeutet, dass die gesamte Verarbeitungsarbeit abgeschlossen ist, kann die Konversation beendet werden, sobald wir diese Nachricht verarbeitet haben, wodurch eine EndDialog-Nachricht an ProcessingService gesendet wird, die dann von ihrer Aktivierungsprozedur verarbeitet wird, um die zu beenden Konversation alles aufräumen und das Feuer vermeiden und Probleme vergessen, die auftreten, wenn Konversationen ordnungsgemäß beendet werden.
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Testen der Verfahren
Vor der Automatisierung der Warteschlangenverarbeitung für unsere Dienste ist es wichtig, die Aktivierungsverfahren zu testen, um sicherzustellen, dass sie die Nachrichten ordnungsgemäß verarbeiten, und um zu verhindern, dass eine Warteschlange deaktiviert wird, falls ein Fehler auftritt, der nicht ordnungsgemäß behandelt wird. Da es bereits eine Nachricht in der ProcessingQueue
gibt die ProcessingQueueActivation
Prozedur kann ausgeführt werden, um diese Nachricht zu verarbeiten. Denken Sie daran, dass der WAITFOR
führt dazu, dass die Prozedur 5 Sekunden dauert, um beendet zu werden, obwohl die Nachricht sofort aus der Warteschlange verarbeitet wird. Nach der Verarbeitung der Nachricht können wir den korrekten Vorgang überprüfen, indem wir die RequestQueue
abfragen um zu sehen, ob ein AsyncResult
Nachricht existiert, und dann können wir überprüfen, ob die RequestQueueActivation
Prozedur funktioniert korrekt, wenn sie ausgeführt wird.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Automatisierung der Verarbeitung
An diesem Punkt sind alle Komponenten vollständig, um unsere Verarbeitung vollständig zu automatisieren. Das Einzige, was noch übrig bleibt, ist, die Aktivierungsprozeduren an ihre entsprechenden Warteschlangen zu binden und dann eine weitere Testnachricht zu senden, um zu bestätigen, dass sie verarbeitet wird und danach nichts in den Warteschlangen verbleibt.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Zusammenfassung
Die grundlegenden Komponenten für die automatisierte asynchrone Verarbeitung in SQL Server Service Broker können in einem einzigen Datenbank-Setup konfiguriert werden, um eine entkoppelte Verarbeitung lang andauernder Aufgaben zu ermöglichen. Dies kann ein leistungsstarkes Tool zur Verbesserung der Anwendungsleistung aus Sicht des Endbenutzers sein, indem die Verarbeitung von den Interaktionen des Endbenutzers mit der Anwendung entkoppelt wird.