Python avro to json

Dataclasses Avro Schema Generator

To add dataclasses-avroschema functionality to pydantic you only need to replace BaseModel by AvroBaseModel :

   >> , 'has_car': False, 'favorite_colors': , 'country': 'Argentina', 'address': None> >> ', "has_car": false, "favorite_colors": "BLUE", "country": "Argentina", "address": null>'     >> b'\x08bondd\x04\x06dog\x06cat\x00\x02\x06key\x02\x00\x00\x00\x12Argentina\x00' >> UserAdvance(name='bond', age=50, pets=['dog', 'cat'], accounts=, has_car=False, favorite_colors=, country='Argentina', address=None) 

Examples with python streaming drivers (kafka and redis)

Under examples folder you can find 3 differents kafka examples, one with aiokafka ( async ) showing the simplest use case when a AvroModel instance is serialized and sent it thorught kafka, and the event is consumed. The other two examples are sync using the kafka-python driver, where the avro-json serialization and schema evolution ( FULL compatibility) is shown. Also, there are two redis examples using redis streams with walrus and redisgears-py

Читайте также:  Java папка мои документы

Factory and fixtures

Dataclasses Avro Schema also includes a factory feature, so you can generate fast python instances and use them, for example, to test your data streaming pipelines. Instances can be genrated using the fake method.

  >>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067) >>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)]) 

Features

  • Primitive types: int, long, double, float, boolean, string and null support
  • Complex types: enum, array, map, fixed, unions and records support
  • typing.Annotated supported
  • Logical Types: date, time (millis and micro), datetime (millis and micro), uuid support
  • Schema relations (oneToOne, oneToMany)
  • Recursive Schemas
  • Generate Avro Schemas from faust.Record
  • Instance serialization correspondent to avro schema generated
  • Data deserialization. Return python dict or class instance
  • Generate json from python class instance
  • Case Schemas
  • Generate models from avsc files
  • Examples of integration with kafka drivers: aiokafka, kafka-python
  • Example of integration with redis drivers: walrus and redisgears-py
  • Factory instances
  • Pydantic integration

Development

Poetry is needed to install the dependencies and develope locally

  1. Install dependencies: poetry install
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test

For commit messages we use commitizen in order to standardize a way of committing rules

Источник

Python Avro JSON serializer

AvroJsonSerializer serializes data into a JSON format using AVRO schema.

Why do we need serializer instead of just dumping into JSON?

  • validation that your data matches the schema
  • serialization of unions (see SimpleExample below)
  • some Avro JSON deserializers expect fields in JSON in the same order as in the schema
  • serialization of bytes and fixed fields

Binary distribution can be found on pypi.

Simple example:

How to run tests

python-avro-json-serializer$ virtualenv venv python-avro-json-serializer$  venv/bin/activate  pip install tox  tox GLOB sdist-make: /Users/bngo/python-avro-json-serializer/setup.py py27 create: /Users/bngo/python-avro-json-serializer/.tox/py27 py27 installdeps: nose, -rrequirements.txt py27 inst: /Users/bngo/python-avro-json-serializer/.tox/dist/avro_json_serializer-0.4.1.zip py27 installed:  runtests:  runtests: commands  nosetests . ---------------------------------------------------------------------- Ran  tests   create: /Users/bngo/python-avro-json-serializer/.tox/py35 py35 installdeps: nose, -rrequirements.txt py35 inst: /Users/bngo/python-avro-json-serializer/.tox/dist/avro_json_serializer-0.4.1.zip py35 installed: avro-json-serializer runtests:  runtests: commands  nosetests . ---------------------------------------------------------------------- Ran  tests   summary _____________________________________________________________________________________  py27: commands succeeded  py35: commands succeeded  congratulations :Python Avro JSON serializer is licensed under the terms of the Apache License, Version 2.0.

Источник

Работа с форматом AVRO в python — библиотека fastavro

В статье описывается использование формата сериализации AVRO в языке python, дается краткое описание AVRO-схемы с пояснениями наиболее неочевидных моментов, приводятся конкретные примеры кода на python. Намеренно исключены из рассмотрения вопросы эволюции схем (schema evolution), RPC и AVRO-IDL.

Все примеры приводятся с использованием библиотеки fastavro, которую автору пришлось заметно доработать для соответствия спецификации и совместимости с java реализацией.

Читайте также:  Users get vk api python пример

Историческая справка

Почему AVRO, а не json, bson, msgpack, protobuf, ASN.1, thrift, yaml?

При декомпозиции монолитной системы возникла необходимость описать процедуру взаимодействия между микросервисами. Не долго выбирая между RabbitMQ и Kafka остановились на последней. Но вот над следующей задачей — выборе системы сериализации пришлось попотеть.

При выборе системы сериализации учитывались следующие требования:

  1. Поддержка нескольких языков программирования
    Основа нашей кодовой базы — python 2.7. При чем в дальнейшем хотелось бы чувствительные к производительности процессы перевести на другие языки.
  2. Валидация данных при сериализации
    В динамическом интерпретируемом питоне слишком просто случайно отправить «не те» данные. А в выбранной нами pub-sub модели кафки нам было очень важно обеспечить корректность входных данных в топике. Нужна была система позволяющая типизировать топики кафки.
  3. Поддержка типов
    Наша система активно оперирует типами Decimal, UUID и Datetime. Увы — известные форматы сериализации начиная ASN.1 и заканчивая msgpack в основном описывают сериализацию низкоуровневых типов (int\float\string\bytes) и не предлагают законченных решений для интересующих нас.

Исходя из этих соображений выбор пал на AVRO. Но внезапно оказалось (весна 2017) что не смотря на наличие поддержки логических типов в спецификации и JAVA билиотеки — ни в официальной реализации AVRO для python, ни в конкурирующей fastavto их просто проигнорировали. Пришлось добавлять самостоятельно.

Наиболее адекватный (а также самый быстрый) код оказался у fastavro, в результате было принято решение доработать эту библиотеку. Это стало первым моим опытом участия в opensource.

Что такое AVRO

AVRO это система сериализации данных, созданная в рамках проекта Hadoop. Данные сериализуются бинарный формат с помощью предварительно созданной json-схемы при чем для десериализации также требуется схема (возможно — другая).

Также AVRO позволят в рамках одного контейнера упаковать множество записей заданных единственной схемой, что позволяет эффективно передавать большие объемы данных, избегая свойственные другим форматам накладные расходы.

AVRO-схема

Я не стану подробно описывать правила построения схем, т.к. они изложены в официальной документации
Остановлюсь лишь на базовых вещах и совсем уж не очевидных моментах.

AVRO-схема представляет из себя JSON, описывающий сериализуемую\десериализуемую структуру данных. Типы данных могут быть:

  1. Примитивными
    • null
    • boolean
    • int — знаковое целое 32 бита
    • long — знаковое целое 64 бита
    • float
    • double
    • string — unicode строка
    • bytes — последовательность байт
    • fixed — те же байты, но с длинной заданной в схеме
  2. Составными
    • union — тип-сумма
    • recod — тип-произведение
    • enum — перечисление
    • array — массив/список
    • map — ассоциативный массив
  3. Логическими (в терминологии AVRO)
    • decimal — число с фиксированной точкой
    • date — дата
    • time-millis — время с миллисекундной точностью
    • time-micros — время с микросекундной точностью
    • timestamp-millis — дата-время с миллисекундной точностью
    • timestamp-micros — дата-время с микросекундной точностью
    • uuid — universally unique identifier
Читайте также:  Java object native methods

Хотя вышеперечисленные логические типы уже давно являются стандартом в реляционных БД и современных языках программирования — библиотеки сериализации обходили их стороной, вынуждая сводить их к примитивным типам, к счастью в AVRO эта проблема решена.

Схема начинается с объявления типа record с заданными name и namespace. Эти поля в первых строках будут использоваться в системах кодогенерации, не актуальных для питона, т.к. у нас схема будет обрабатываться динамически. Далее идет перечисление типов полей нашей записи.

Особый интерес представляет объявление поля datetime, т.к. оно содержит логический тип. Важно помнить, что логические типы следует задавать вложенными в описания типа поля.

Далее идет поле source объявленное как union «type»: [«null», «string»] , эта запись означает что значение source может быть одного из двух типов null или string . Комбинировать таким образом можно не только примитивные типы, но и составные и логические. Примеры таких комбинаций, а также более сложных схем можно посмотреть тут
Ещё один неочевидный момент связан с default : значение по умолчанию должно быть задано для первого типа в перечислении.

Отдельного упоминания заслуживают логические типы Decimal (число с фиксированной точкой) и UUID.

Decimal требует дополнительных параметров — числа знаков в числе и числа знаков после запятой:

А UUID интересен тем, что в спецификации его нет, а реализация его есть. При чём довольно странно сделанная — UUID кодируется строкой.

Пришлось добавить в fastavro и такую реализацию.

Примеры работы с fastavro

Как прочитать из контейнера

import fastavro as avro with open('some-file.avro', 'rb') as fo: # можно подменить схему контейнера с помощью reader_schema reader = fastavro.reader(fo, reader_schema=None) schema = reader.schema for record in reader: process_record(record)

Как записать в контейнер

from fastavro import writer schema = < 'doc': 'A weather reading.', 'name': 'Weather', 'namespace': 'test', 'type': 'record', 'fields': [ , , , ], > records = [ , , , , ] with open('weather.avro', 'wb') as out: writer(out, schema, records)

Как сериализовать и десериализовать данные вне контейнера

Используется при передачи данных как сообщений.

from io import BytesIO import fastavro def serialize(schema, data): bytes_writer = BytesIO() fastavro.schemaless_writer(bytes_writer, schema, data) return bytes_writer.getvalue() def deserialize(schema, binary): bytes_writer = BytesIO() bytes_writer.write(binary) bytes_writer.seek(0) data = fastavro.schemaless_reader(bytes_writer, schema) return data

Как подавить чтение логических типов

import fastavro fastavro._reader.LOGICAL_READERS['long-timestamp-millis'] = lambda d, w, r: d

Теперь логический тип timestamp-millis будет читаться не в Datetime питона, а в long.

Как заранее прочитать схему

В fastavro предусмотрена функция acquaint_schema, которая считывает схему во внутренний репозиторий (бывают ещё и внешение, но это отдельная история).

и загрузив её с помощью acquaint_schema можно в дальнейшем использовать краткое описание типов :

Обратите внимание — имя типа при обращении включает его namespace types.decimal_18_6

Также это необходимо в отдельных не тривиальных случаях

Источник

Оцените статью