Ich war bei demselben Problem nicht sicher, ob Sie eine Lösung gefunden haben oder nicht, aber ich konnte etwas Ähnliches erreichen, indem Sie Folgendes tun. Zuerst habe ich Trigger zu meiner Tabelle hinzugefügt
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Dadurch wird ein Trigger für die Tabelle festgelegt, wenn eine Zeile aktualisiert, gelöscht oder eingefügt wird. Dann ruft es die von mir eingerichtete Trigger-Funktion auf, die etwa so aussah:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Auf diese Weise kann ich alle diese Aktualisierungen aus meinem Spring-Boot-Projekt „anhören“ und die gesamte Zeile als Nutzlast senden. Als Nächstes habe ich in meinem Spring-Boot-Projekt eine Verbindung zu meiner Datenbank konfiguriert.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Damit verdrahte ich es automatisch (Abhängigkeitsinjektion) in den Konstruktor in meiner Dienstklasse und wandle es wie folgt in eine r2dbc PostgressqlConnection-Klasse um:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Jetzt wollen wir unsere Tabelle „anhören“ und benachrichtigt werden, wenn wir eine Aktualisierung an unserer Tabelle durchführen. Dazu richten wir eine Initialisierungsmethode ein, die nach der Abhängigkeitsinjektion mithilfe der @PostContruct-Annotation
ausgeführt wird@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Beachten Sie, dass wir auf den Namen hören, den wir in die pg_notify-Methode eingeben. Außerdem wollen wir eine Methode einrichten, um die Verbindung zu schließen, wenn die Bohne weggeworfen werden soll, etwa so:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Jetzt erstelle ich einfach eine Methode, die einen Flux von dem zurückgibt, was sich gerade in meiner Tabelle befindet, und ich führe ihn auch mit meinen Benachrichtigungen zusammen, wie ich bereits sagte, bevor die Benachrichtigungen als JSON eingehen, also musste ich sie deserialisieren und entschied mich für die Verwendung ObjectMapper. Es sieht also etwa so aus:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Hoffe, das hilft.Prost!