In Apache Kafka schreiben Java-Anwendungen namens Producer strukturierte Nachrichten an einen Kafka-Cluster (bestehend aus Brokern). In ähnlicher Weise lesen Java-Anwendungen, sogenannte Verbraucher, diese Nachrichten aus demselben Cluster. In einigen Organisationen gibt es verschiedene Gruppen, die für das Schreiben und Verwalten der Erzeuger und Verbraucher zuständig sind. In solchen Fällen kann ein großer Schmerzpunkt in der Abstimmung des vereinbarten Nachrichtenformats zwischen Herstellern und Verbrauchern liegen.
Dieses Beispiel zeigt, wie Apache Avro verwendet wird, um Datensätze zu serialisieren, die für Apache Kafka produziert werden, während die Entwicklung von Schemas und die nicht synchrone Aktualisierung von Produzenten- und Verbraucheranwendungen ermöglicht werden.
Serialisierung und Deserialisierung
Ein Kafka-Record (früher Message genannt) besteht aus einem Schlüssel, einem Wert und Headern. Kafka ist sich der Struktur der Daten im Schlüssel und Wert der Datensätze nicht bewusst. Es behandelt sie als Byte-Arrays. Aber Systeme, die Datensätze aus Kafka lesen, kümmern sich um die Daten in diesen Datensätzen. Sie müssen also Daten in einem lesbaren Format erstellen. Das von Ihnen verwendete Datenformat sollte
- Sei kompakt
- Sei schnell beim Codieren und Decodieren
- Entwicklung zulassen
- Upstream-Systemen (solche, die in einen Kafka-Cluster schreiben) und Downstream-Systemen (solche, die aus demselben Kafka-Cluster lesen) erlauben, zu unterschiedlichen Zeiten auf neuere Schemas zu aktualisieren
JSON zum Beispiel ist selbsterklärend, aber kein kompaktes Datenformat und lässt sich nur langsam analysieren. Avro ist ein schnelles Serialisierungs-Framework, das eine relativ kompakte Ausgabe erstellt. Aber um Avro-Datensätze zu lesen, benötigen Sie das Schema, mit dem die Daten serialisiert wurden.
Eine Möglichkeit besteht darin, das Schema mit dem Datensatz selbst zu speichern und zu übertragen. Dies ist in einer Datei in Ordnung, in der Sie das Schema einmal speichern und es für eine große Anzahl von Datensätzen verwenden. Das Speichern des Schemas in jedem einzelnen Kafka-Datensatz fügt jedoch einen erheblichen Overhead in Bezug auf Speicherplatz und Netzwerknutzung hinzu. Eine andere Option besteht darin, einen vereinbarten Satz von Bezeichner-Schema-Zuordnungen zu haben und auf Schemas anhand ihrer Bezeichner im Datensatz zu verweisen.
Vom Objekt zum Kafka-Record und zurück
Produzentenanwendungen müssen Daten nicht direkt in Byte-Arrays konvertieren. KafkaProducer ist eine generische Klasse, für die der Benutzer Schlüssel- und Werttypen angeben muss. Dann akzeptieren Produzenten Instanzen von ProducerRecord
die dieselben Typparameter haben. Die Konvertierung vom Objekt in das Byte-Array erfolgt durch einen Serializer. Kafka bietet einige primitive Serialisierer:zum Beispiel IntegerSerializer
, ByteArraySerializer
, StringSerializer
. Auf Verbraucherseite konvertieren ähnliche Deserialisierer Byte-Arrays in ein Objekt, mit dem die Anwendung umgehen kann.
Daher ist es sinnvoll, sich auf Serializer- und Deserializer-Ebene einzuklinken und Entwicklern von Producer- und Consumer-Anwendungen die komfortable Nutzung der von Kafka bereitgestellten Schnittstelle zu ermöglichen. Obwohl die neuesten Versionen von Kafka ExtendedSerializers
zulassen und ExtendedDeserializers
Um auf Kopfzeilen zuzugreifen, haben wir uns entschieden, die Schema-ID in Schlüssel und Wert von Kafka-Datensätzen aufzunehmen, anstatt Datensatzkopfzeilen hinzuzufügen.
Avro-Essentials
Avro ist ein Framework für Datenserialisierung (und Remoteprozeduraufruf). Es verwendet ein JSON-Dokument namens Schema, um Datenstrukturen zu beschreiben. Der größte Teil der Avro-Nutzung erfolgt entweder über GenericRecord oder Unterklassen von SpecificRecord. Aus Avro-Schemata generierte Java-Klassen sind Unterklassen der letzteren, während erstere ohne vorherige Kenntnis der verwendeten Datenstruktur verwendet werden können.
Wenn zwei Schemas eine Reihe von Kompatibilitätsregeln erfüllen, können Daten, die mit einem Schema (dem so genannten Writer-Schema) geschrieben wurden, so gelesen werden, als ob sie mit dem anderen (dem so genannten Reader-Schema) geschrieben worden wären. Schemas haben eine kanonische Form, bei der alle Details, die für die Serialisierung irrelevant sind, wie z. B. Kommentare, entfernt wurden, um die Äquivalenzprüfung zu erleichtern.
Versioniertes Schema und SchemaProvider
Wie bereits erwähnt, benötigen wir eine Eins-zu-Eins-Zuordnung zwischen Schemas und ihren Bezeichnern. Manchmal ist es einfacher, mit Namen auf Schemas zu verweisen. Wenn ein kompatibles Schema erstellt wird, kann es als nächste Version des Schemas betrachtet werden. Somit können wir auf Schemas mit einem Namen-Version-Paar verweisen. Nennen wir das Schema, seine Kennung, seinen Namen und seine Version zusammen ein VersionedSchema
. Dieses Objekt kann zusätzliche Metadaten enthalten, die die Anwendung benötigt.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
Objekte können die Instanzen von VersionedSchema
nachschlagen .
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
Wie diese Schnittstelle implementiert wird, wird in einem zukünftigen Blogbeitrag unter „Implementieren eines Schemaspeichers“ behandelt.
Generische Daten serialisieren
Beim Serialisieren eines Datensatzes müssen wir zuerst herausfinden, welches Schema verwendet werden soll. Jeder Datensatz hat ein getSchema
Methode. Das Herausfinden des Bezeichners aus dem Schema kann jedoch zeitaufwändig sein. Im Allgemeinen ist es effizienter, das Schema zum Zeitpunkt der Initialisierung festzulegen. Dies kann direkt nach Kennung oder nach Name und Version erfolgen. Darüber hinaus möchten wir beim Produzieren für mehrere Themen möglicherweise unterschiedliche Schemas für verschiedene Themen festlegen und das Schema anhand des Themennamens ermitteln, der als Parameter für die Methode serialize(T, String)
bereitgestellt wird . Diese Logik wird in unseren Beispielen der Kürze und Einfachheit halber weggelassen.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Mit dem vorliegenden Schema müssen wir es in unserer Nachricht speichern. Das Serialisieren der ID als Teil der Nachricht gibt uns eine kompakte Lösung, da die ganze Magie im Serializer/Deserializer passiert. Es ermöglicht auch eine sehr einfache Integration mit anderen Frameworks und Bibliotheken, die Kafka bereits unterstützen, und lässt den Benutzer seinen eigenen Serializer (wie Spark) verwenden.
Bei diesem Ansatz schreiben wir zuerst die Schemakennung auf die ersten vier Bytes.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
Dann können wir einen DatumWriter
erstellen und das Objekt serialisieren.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
Zusammenfassend haben wir einen generischen Datenserialisierer implementiert.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Generische Daten deserialisieren
Die Deserialisierung kann mit einem einzelnen Schema funktionieren (mit dem die Schemadaten geschrieben wurden), aber Sie können ein anderes Reader-Schema angeben. Das Reader-Schema muss mit dem Schema kompatibel sein, mit dem die Daten serialisiert wurden, muss aber nicht äquivalent sein. Aus diesem Grund haben wir Schemanamen eingeführt. Wir können jetzt angeben, dass wir Daten mit einer bestimmten Version eines Schemas lesen möchten. Zur Initialisierungszeit lesen wir gewünschte Schemaversionen pro Schemaname und speichern Metadaten in readerSchemasByName
für schnellen Zugriff. Jetzt können wir jeden Datensatz lesen, der mit einer kompatiblen Version des Schemas geschrieben wurde, als ob er mit der angegebenen Version geschrieben worden wäre.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
Wenn ein Datensatz deserialisiert werden muss, lesen wir zuerst die Kennung des Writer-Schemas. Dies ermöglicht das Nachschlagen des Reader-Schemas anhand des Namens. Wenn beide Schemas verfügbar sind, können wir einen GeneralDatumReader
erstellen und lesen Sie die Aufzeichnung.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Umgang mit SpecificRecords
Meistens gibt es eine Klasse, die wir für unsere Aufzeichnungen verwenden möchten. Diese Klasse wird dann meist aus einem Avro-Schema generiert. Apache Avro bietet Tools zum Generieren von Java-Code aus Schemas. Ein solches Tool ist das Avro Maven-Plugin. Generierte Klassen haben das Schema, aus dem sie generiert wurden, zur Laufzeit verfügbar. Dadurch wird die Serialisierung und Deserialisierung einfacher und effektiver. Für die Serialisierung können wir die Klasse verwenden, um den zu verwendenden Schemabezeichner herauszufinden.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
Wir brauchen also keine Logik, um das Schema aus Thema und Daten zu bestimmen. Wir verwenden das in der Datensatzklasse verfügbare Schema, um Datensätze zu schreiben.
Ebenso kann für die Deserialisierung das Reader-Schema aus der Klasse selbst herausgefunden werden. Die Logik der Deserialisierung wird einfacher, da das Leserschema zur Konfigurationszeit festgelegt wird und nicht anhand des Schemanamens gesucht werden muss.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Zusätzliche Lektüre
Weitere Informationen zur Schemakompatibilität finden Sie in der Avro-Spezifikation zur Schemaauflösung.
Weitere Informationen zu kanonischen Formen finden Sie in der Avro-Spezifikation für Parsing Canonical Form for Schemas.
Nächstes Mal…
Teil 2 zeigt eine Implementierung eines Systems zum Speichern der Avro-Schemadefinitionen.