sql >> Database teknologi >  >> NoSQL >> HBase

Robust meddelelsesserialisering i Apache Kafka ved hjælp af Apache Avro, del 1

I Apache Kafka skriver Java-applikationer kaldet producenter strukturerede beskeder til en Kafka-klynge (bestående af mæglere). På samme måde læser Java-applikationer kaldet forbrugere disse beskeder fra den samme klynge. I nogle organisationer er der forskellige grupper, der står for at skrive og administrere producenter og forbrugere. I sådanne tilfælde kan et stort smertepunkt være koordineringen af ​​det aftalte meddelelsesformat mellem producenter og forbrugere.

Dette eksempel viser, hvordan man bruger Apache Avro til at serialisere poster, der er produceret til Apache Kafka, mens det tillader udvikling af skemaer og usynkron opdatering af producent- og forbrugerapplikationer.

Serialisering og deserialisering

En Kafka-post (tidligere kaldet besked) består af en nøgle, en værdi og overskrifter. Kafka er ikke opmærksom på strukturen af ​​data i posters nøgle og værdi. Det håndterer dem som byte-arrays. Men systemer, der læser optegnelser fra Kafka, bekymrer sig om data i disse poster. Så du skal producere data i et læsbart format. Det dataformat, du bruger, skal

  • Vær kompakt
  • Vær hurtig til at indkode og afkode
  • Tillad evolution
  • Tillad upstream-systemer (dem, der skriver til en Kafka-klynge) og downstream-systemer (dem, der læser fra den samme Kafka-klynge) at opgradere til nyere skemaer på forskellige tidspunkter

JSON, for eksempel, er selvforklarende, men er ikke et kompakt dataformat og er langsom at parse. Avro er en hurtig serialiseringsramme, der skaber relativt kompakt output. Men for at læse Avro-poster kræver du det skema, som dataene blev serialiseret med.

En mulighed er at gemme og overføre skemaet med selve posten. Dette er fint i en fil, hvor du gemmer skemaet én gang og bruger det til et stort antal poster. Lagring af skemaet i hver eneste Kafka-post tilføjer imidlertid betydelige overhead med hensyn til lagerplads og netværksudnyttelse. En anden mulighed er at have et aftalt sæt af identifikator-skema-tilknytninger og henvise til skemaer ved deres identifikatorer i posten.

Fra objekt til Kafka Record and Back

Producer-applikationer behøver ikke at konvertere data direkte til byte-arrays. KafkaProducer er en generisk klasse, der skal bruge sin bruger til at specificere nøgle- og værdityper. Derefter accepterer producenter forekomster af ProducerRecord der har samme type parametre. Konvertering fra objektet til byte-array udføres af en Serializer. Kafka leverer nogle primitive serializers:for eksempel IntegerSerializer , ByteArraySerializer , StringSerializer . På forbrugersiden konverterer lignende Deserializers byte-arrays til et objekt, som applikationen kan håndtere.

Så det giver mening at tilslutte sig på Serializer- og Deserializer-niveau og tillade udviklere af producent- og forbrugerapplikationer at bruge den praktiske grænseflade, som Kafka leverer. Selvom de seneste versioner af Kafka tillader ExtendedSerializers og ExtendedDeserializers for at få adgang til overskrifter besluttede vi at inkludere skema-id'en i Kafka-posters nøgle og værdi i stedet for at tilføje postoverskrifter.

Avro Essentials

Avro er en dataserialiseringsramme (og fjernprocedurekald). Den bruger et JSON-dokument kaldet skema til at beskrive datastrukturer. Det meste af Avro-brug sker gennem enten GenericRecord eller underklasser af SpecificRecord. Java-klasser genereret fra Avro-skemaer er underklasser af sidstnævnte, mens førstnævnte kan bruges uden forudgående kendskab til den datastruktur, der arbejdes med.

Når to skemaer opfylder et sæt kompatibilitetsregler, kan data skrevet med det ene skema (kaldet forfatterskemaet) læses, som om det var skrevet med det andet (kaldet læserskemaet). Skemaer har en kanonisk form, der har alle detaljer, der er irrelevante for serialiseringen, såsom kommentarer, fjernet for at hjælpe med at kontrollere ækvivalensen.

VersionedSchema og SchemaProvider

Som nævnt før har vi brug for en en-til-en kortlægning mellem skemaer og deres identifikatorer. Nogle gange er det lettere at henvise til skemaer ved navn. Når et kompatibelt skema er oprettet, kan det betragtes som en næste version af skemaet. Således kan vi henvise til skemaer med et navn, versionspar. Lad os kalde skemaet, dets identifikator, navn og version sammen et VersionedSchema . Dette objekt kan indeholde yderligere metadata, som applikationen kræver.

public class VersionedSchema { private final int id; privat endelig Strengnavn; privat endelig int version; privat endeligt skemaskema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id =id; dette.navn =navn; this.version =version; dette.skema =skema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return skema; } public int getId() { return id; }}

SchemaProvider objekter kan slå forekomsterne af VersionedSchema op .

offentlig grænseflade SchemaProvider udvider AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema);}

Hvordan denne grænseflade implementeres, er beskrevet i "Implementering af en Schema Store" i et fremtidigt blogindlæg.

Serialisering af generiske data

Når vi serialiserer en post, skal vi først finde ud af, hvilket skema vi skal bruge. Hver post har et getSchema metode. Men at finde ud af identifikatoren fra skemaet kan være tidskrævende. Det er generelt mere effektivt at indstille skemaet på initialiseringstidspunktet. Dette kan gøres direkte ved hjælp af identifikator eller ved navn og version. Ydermere, når vi producerer til flere emner, ønsker vi måske at indstille forskellige skemaer for forskellige emner og finde ud af skemaet fra emnenavnet, der leveres som parameter til metoden serialize(T, String) . Denne logik er udeladt i vores eksempler for korthedens og enkelthedens skyld.

privat VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema());}

Med skemaet i hånden skal vi gemme det i vores besked. Serialisering af ID'et som en del af beskeden giver os en kompakt løsning, da al magien sker i Serializer/Deserializer. Det muliggør også meget nem integration med andre rammer og biblioteker, der allerede understøtter Kafka og lader brugeren bruge deres egen serializer (såsom Spark).

Ved at bruge denne tilgang skriver vi først skema-id'en på de første fire bytes.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os =new DataOutputStream(stream)) { os.writeInt(id); }}

Så kan vi oprette en DatumWriter og serialiser objektet.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) kaster IOException { BinaryEncoder encoder =EncoderFactory.get().binaryEncoder(stream, null); DatumWriter datumWriter =new GenericDatumWriter<>(skema); datumWriter.write(data, encoder); encoder.flush();}

Ved at sætte alt dette sammen har vi implementeret en generisk dataserializer.

offentlig klasse KafkaAvroSerializer implementerer Serializer { private SchemaProvider schemaProvider; @Override public void configure(Map configs, boolean isKey) { schemaProvider =SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream =new ByteArrayOutputStream()) { VersionedSchema schema =getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); returner stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Kunne ikke serialisere data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) kaster IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) kaster IOException {...} private VersionedSchema getSchema(T data) {, String schema ...} @Override public void close() { prøv { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } }}

Deserialisering af generiske data

Deserialisering kan fungere med et enkelt skema (skemadataene blev skrevet med), men du kan angive et andet læseskema. Læserskemaet skal være kompatibelt med det skema, som dataene blev serialiseret med, men behøver ikke at være ækvivalente. Af denne grund introducerede vi skemanavne. Vi kan nu angive, at vi ønsker at læse data med en specifik version af et skema. På initialiseringstidspunktet læser vi ønskede skemaversioner pr. skemanavn og gemmer metadata i readerSchemasByName for hurtig adgang. Nu kan vi læse hver post skrevet med en kompatibel version af skemaet, som om den var skrevet med den specificerede version.

@Overridepublic void configure(Map configs, boolean isKey) { this.schemaProvider =SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName =SchemaUtils.getVersionedSchemas(configs, schemaProvider);}

Når en post skal deserialiseres, læser vi først identifikatoren for forfatterskemaet. Dette gør det muligt at slå læseskemaet op efter navn. Med begge skemaer tilgængelige kan vi oprette en GeneralDatumReader og læs journalen.

@Overridepublic GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream =new ByteArrayInputStream(data)) { int schemaId =readSchemaId(stream); VersionedSchema writerSchema =schemaProvider.get(schemaId); VersionedSchema readerSchema =readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord =readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); returnere avroRecord; } catch (IOException e) { throw new RuntimeException(e); }}private int readSchemaId(InputStream stream) kaster IOException { try(DataInputStream is =new DataInputStream(stream)) { return is.readInt(); }}private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) kaster IOException { DatumReader datumReader =new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder =DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record =new GenericData.Record(readerSchema); datumReader.read(record, dekoder); returnere post; 

Håndtering af SpecificRecords

Oftere end ikke er der én klasse, vi ønsker at bruge til vores optegnelser. Denne klasse genereres derefter normalt ud fra et Avro-skema. Apache Avro giver værktøjer til at generere Java-kode fra skemaer. Et sådant værktøj er Avro Maven plugin. Genererede klasser har det skema, de blev genereret fra, tilgængeligt under kørsel. Dette gør serialisering og deserialisering enklere og mere effektiv. Til serialisering kan vi bruge klassen til at finde ud af den skema-id, der skal bruges.

@Overridepublic void configure(Map configs, boolean isKey) { String className =configs.get(isKey ? KEY_RECORD_CLASSNAME :VALUE_RECORD_CLASSNAME).toString(); prøv (SchemaProvider schemaProvider =SchemaUtils.getSchemaProvider(configs)) { Class recordClass =Class.forName(className); Schema writerSchema =new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId =schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); }}

Vi har således ikke brug for logikken til at bestemme skema ud fra emne og data. Vi bruger det tilgængelige skema i recordklassen til at skrive poster.

Tilsvarende kan læseskemaet til deserialisering findes ud fra selve klassen. Deserialiseringslogikken bliver enklere, fordi læseskemaet er fast på konfigurationstidspunktet og ikke behøver at blive slået op efter skemanavnet.

@Overridepublic T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream =new ByteArrayInputStream(data)) { int schemaId =readSchemaId(stream); VersionedSchema writerSchema =schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); }}privat T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) kaster IOException { DatumReader datumReader =new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder =DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, dekoder);}

Yderligere læsning

For mere information om skemakompatibilitet, se Avro-specifikationen for skemaopløsning.

For mere information om kanoniske former, se Avro-specifikationen for parsing af kanoniske formularer til skemaer.

Næste gang...

Del 2 vil vise en implementering af et system til at gemme Avro-skemadefinitionerne.


  1. Hvordan jeg skrev en Chart-Topping-app på en uge med Realm og SwiftUI

  2. Hvorfor bruge Redis i stedet for MongoDb til cachelagring?

  3. MongoDB slapper af flere arrays

  4. Laravel Redis konfiguration