Close

03.07.2017

Распределенное хранилище, проектирование, наш опыт и результат

Хочу описать тут историю одного проектирования. Без деталей, только верхний уровень. и результаты

Как мы проектировали свою СУБД, как мы хотели сделать транзакции , масштабируемость, надежность и скорость…

Дейтвующие лица : Павлов М (я), Шилов А (крутой умный кодер), CONSISTENCY (упрямый «некто», он выносил нам мозг и продолжает это делать)

Этап первый «API почти хорош.»

Наша система на тот момент была просто API прослойкой к касандре, в ней  не было еще транзакций в обычном понимании этого слова.

Самое главное , что мы хотели, это «реализовать DT» — Distributed Tree (распределенное дерево) поверх KV (Key-Value). Хотя транзакций не было, но согласованное изменение нескольких ключей KV сразу было.

там по сути были транзакции, но очень ограниченные (порядок ключей был важен и был задан неявно в виде отношений узлов в дереве).

Дерево в фоне масштабировалось и хранилось полностью распределенно и равномерно в кластере. Это позволяло организовывать ключи в сортированные цепочки и иметь по ним навигацию и seek операцию.

API должен был реализовывать одну идею «идентификатор транзакции можно передавать между запросами, и отдельным запросом камитить или откатывать, сама же транзакция и её оперативные данные должны распологаться на серверах».

Ссылка на API    http://acapella.ru/nosql/astorage_api.html

Этап второй «Переписывам, чтобы была скорость и уходим в сторону реплик»

Касандра для хранения ключей KV не нравилась — было ощущение, что можно работать быстрее, скорость python не нравилась, транспорт tcp нам казался хуже Aeron, казалось, что мы сможем сильно все улучшить взявшись за все это сами.

Я захотел сформулировать критерии , которые хотел бы видеть в идеале в продукте и этому посвещен пост в этом блоге TODO ссылка. в числе главных :

  • для каждого ключа KV  указываем независимо от других ключей N — чсло реплик , числа W, R — требования для кворума операции
  • два уровня API — «KV availability API»  (get, set, cas) с указанаием числа реплик и кворума и «транзакционный API». Нужно, чтобы DT операции и KV операции могли размещатья в одной транзакции и применяться / откатываться вместе
  • высокая скорость работы
  • высокая надежность

Именно эти требования в последствии будут вести разработку «самой лучшей» БД в мире далее.

не смотря на то, что мы постоянно с Сашей обсуждаем   всякие «нерешимые проблемы», мы все ближе к идеалу в продукте.

Очень часто я убеждаюсь, что лучше чем у нас  нигде не сделано, и что мы идем правильным путем.

В итоге мы приняли ряд решений и было достигнуто :

  • использовали Aeron как транспорт , перешли на JVM и Kotlin language
  • Саша изобрел «расширенный варинат» retrofit и сервер к нему, чтобы выйти за рамки retrofit создал проект «ACONITE» — это клиент и сервер. Возможностей больше, чем в оригинальном Retrofit
  • Попробовали AJRPC в бою, и разные сериализаторы.

Работает :

  • автоматическое непрерывное «распределение реплик по кластеру», точнее:
    • перераспределение данных — при изменении конфигурации кластера
    • по запросу пользователя к ключу — запись/починка реплик
  • надежно работает и availability api и transactional api
  • работает DT так, как задумано, но скрость для большинства  применений не устраивает
  • транзакции применятся единомоментно
  • транзакция может быт очень большой / гигантской, и в ней можно по середине выполнять любые вычисиления
  • в транзакции возможно консистентно менять данные, чтение  чаще всего тоже консистентно (а для примененных транзакций — 100% консистентно)
  • существуют способы организовать консистентное чтение (но это не точно)
  • наши транзакции не имеют точки отказа совсем, даже не существует ни одного  момента времени когда отказ может что-то нарушить в ней (для любой распределенной системы, это очень ценно)

Этап «размышления на счет способа организации транзакций», почему мы все еще не довольны

Наша система нам кажется медленной, хотя и надежной. (Все что мы смотрели менее надежно по способу организации и транзакций и вообще работы с данными)

Мы развернули три машины/ноды и загрузили кластер тестами (линеаризация операций, тесты на ошибки  траназкций, тесты произвдительности), и вот что мы пока получили :

KV

  • GET 10_000 op/sec, 450 op/sec/client — без транзакций (3 реплики, 2 — кворум)
  • CAS, SET 7_000 op/sec, 340 op/sec/client — без транзакций (3 реплики, 2 — кворум)
  • Скорость KV с транзакциями (swap two keys): 750 tx/sec, 70 tx/sec/client —   малые транзакции
DT
  • 30-60   insert/sec/tree/client   , R,W вставленной записи — как в KV (3 реплики, 2  — кворум)
  • для одного DT Tree : все клиенты сталкиваются и rps не превышает 100 rps для этого tree    (3 реплики, 2  — кворум)

Для многих клиентов при загруженном кластере , когда каждый клиент работает с одним своим деревом, то
Для каждого клиента (дерева) получается 20 — 30 rps

Сетка в кластере 100 МБит/сек  не тюнена. UDP

Короче : DT транзакционный — медленный (не транзакционный он у нас не бывает, точнее : можно послать запрос без указания транзакции, он будет выполнен в новой транзакции, созданной этим запрсом, как auto-commit).

Конечно существуют применения для не быстрого DT, например выделять по DT дереву на всякие цели одного пользователя, чтобы хранить какие — то списки сущностей — пользователь редко меняет эти списки и 30 rps хватает для этого… но в целом, это ограничивает.

Зато DT — может хранить громадные последовательности из ключей , которые не влезают на одну и даже несколько машин и работает с ними транзакционно. При этом скорость чтения из DT быстрая, 30_000 op/sec — это подходит для индексирования медленно меняющегося контента и поиска понему, например «Wikipedia index».

Большой объём транзакций тоже не обходится бесплатно :

На время применения транзакции, когда код пользователя  начал слать CAS’ы, эти ключи начали блокироваться, в том числе и прочитанные ранее, которые нужно проверить на соответствие версий.

Это намного быстрее, чем сначала блокировать чтения, а потом писать в конце… Короче, можно делать оптимистичные блокировки, а можно пессимистичные.

Т.е. если транзакция большая, то даже если там оптимистические блокировки, то многое может заблокироваться на существенное время её применения.

Скорость KV наш и тестирование скорости Cassandra

Наш KМ, кластер три ноды

Конфигурация кластера

Кластер состоит из трёх узлов 1.yyyy.ru — 3.zzzz.ru.

Конфигурация одного узла:

  • 1 х Intel Core 2 Quad Q8400
  • 1 х Ethernet controller: Intel Corporation 82541GI Gigabit Ethernet Controller
  • 4 Гб DDR-2
  • 2 x HDD Seagate 250 Gb каждый — программный RAID-1.

Между собой соединены с помощью «свитча» Cisco SF200-48P 48-Port 10 100 PoE Smart Switch.

  1. не транзакционный GET одного  ключа.
    1. один клиент (latency^-1) — 450 op/sec
    2. много параллельных клиентов (throughput) — 10_000 op/sec
  2. не транзакционный SET/CAS
    1. один клиент (latency^-1) — 340 op/sec
    2. много параллельных клиентов (throughput) — 7_000 op/sec
  3. транзакционная подмена значений (TX SWAP)
    1. один клиент (latency^-1) — 70 op/sec
    2. много параллельных клиентов (throughput) — 750 op/sec

Тестирование скорости Cassandra три те же машины,  сетка 100 МБит/с

Таблица для тестов создана следующими командами:

CREATE KEYSPACE test WITH replication = {‘class’: ‘SimpleStrategy’, ‘replication_factor’: 3};

CREATE TABLE test.kv (key bigint PRIMARY KEY, value bigint, version bigint, tx bigint);

UPDATE

Нетранзакционное обновление строки без условий.

Команда:

UPDATE kv SET value = ? WHERE key = ?;

Последовательные запросы одного клиента (latency): 4464 op/sec

Пропускная способность (throughput): 23331 op/sec

CAS

Compare And Swap или в терминах кассандры Lightweight Transaction. Contention практически отсутствует, так как ключей намного больше, чем клиентов.

Команда:

UPDATE kv SET value = ?, version = ? WHERE key = ? IF version = ?;

Последовательные запросы одного клиента (latency): 427 op/sec

Пропускная способность (throughput): 2957 op/sec

BATCH

Атомарное обновление двух ключей без условий (батчи кассандры не позволяют добавлять условия, если обновляется несколько партишен-ключей).

BEGIN BATCH

   UPDATE kv SET value = ? WHERE key = <key1>;

   UPDATE kv SET value = ? WHERE key = <key2>;

APPLY BATCH

Последовательные запросы одного клиента (latency): 1046 op/sec

Пропускная способность (throughput): 9833 op/sec

TRANSACTION

ACID обновление двух ключей с условиями. Это надстройка транзакций поверх CAS’ов кассандры.

Эквивалент команды для кассандры:

BEGIN BATCH

   UPDATE kv SET value = ?, version = ? WHERE key = <key1> IF version = ?;

   UPDATE kv SET value = ?, version = ? WHERE key = <key2> IF version = ?;

APPLY BATCH

Реально исполняемые команды:

# prepare

UPDATE kv SET tx = ? WHERE key = <key1> IF version = ? AND tx = null;

UPDATE kv SET tx = ? WHERE key = <key2> IF version = ? AND tx = null;

# commit

UPDATE kv SET value = ?, version = ?, tx = null WHERE key = <key1>;

UPDATE kv SET value = ?, version = ?, tx = null WHERE key = <key2>;

Последовательные запросы одного клиента (latency): 79 op/sec

Пропускная способность (throughput): 603 op/sec

График зависимости количества операций в секунду от количества параллельных

Warp транзакции

У нас работа с репликами сделана через PAXOS, а транзакция — двухфазный камит с временными блокировками, (в момент применения транзакции — оптимистические). есть команды явной блокировки в момент создания транзакции, работы со значениями (пессимистические)

  • «В середине» нашей транзакции можно обрабатывать значения и менять по ходу вычислений.
  • Warp -другой  механизм реализации транзакций. link arxiv     pdf link

Warp транзакция — сформированный заранее «скрипт изменений», который пересылается между серверами и поэтапно применяется. Состояние транзакции нигде не храниться — оно вычислямое. Чтобы его вычислить надо обойти сервера, учавствующие в ней.

  • Наша транзакция — может работать с любым объёмом данных.
  • Warp не может , т.к. «пересылает транзакцию» — имеет дело с небольшими наборами изменений.

Если умозрительно реализовать WARP, то скосроти были бы такими :

  • KV — 560 tx/sec — наш KV, транзакционный,  который  сейчас есть
  • WARP over KV — 1600 tx/sec  — WARP -поверх нашего KV
  • WARP native — 5000 — 8000 tx/sec — реализация WARP поверх уровня реплик (логику CAS не ме же реализуем, и сразу логику кворума и в целом конфликта транзакций)
  • WARP single replica — 25000 — 30000 tx/sec — тоже, но с одной репликой

Причем, «WARP native»  реализовать проще, чем реализовывать «WARP over KV».

PS /

WARP только с виду прост, но если добавить таймауты в алгоритм (имитация сетевых проблем) — он «ломается». Реализовать его правильно — сложная задача, а Pdf не описывает массу вопросов.

У нас, в acapella CPVM , вариант без обработки данных в середине  не походит.

Также не нравиться и ограниченность объёма обрабатываемых данных , применительно к создаваемому нами PaaS.

 

Новые тренды — немного обсудим потоковую обработку

В проектах Acapella и RedForester  мы имеем такое взаимодействие с data \ io  миром :

  • есть «ячейка», с путем, ид… не важно.
  • она изменяемая,
  • есть транзакционный способ работать с ней и с миллионами таких как она

Это обычный подход. Все привыкли к нему со студенческих времен (RDBMS),  orm busines logic wrapper  итд.

Но Консистентность и скорость — вещи очень сложно стыкуемые.

По сути есть только такие варианты :

  • меньше реплик — меньше надежность — выше скорость  (но тут есть предел одна реплика, а для трёх коэффициент не существенный 9 раз всего, но уже 3-х реплик хватает для 90% случаев, т.е. range не широкий этой оптимизации, а риск недоступности данных велик)
  • меньше серверов, но более мощных, т.е. с большим объёмом хранения — реже наступает увеличивать объём хранения , меньше сетевых соединений — выше скорость — все яица в одну корзину , репликация не даст консистентности т.к. в консистентном режиме не успеет за основной интенсивностью работы с данными
  • проектировать системы так, чтобы они не требовали консистентности. (там не нужно использовать транзакции вообще, но приходиться думать головой, а это всегда риски для ИТ рынка в целом)

А вот последний вариант не ожиданный и интересный, и именно он сейчас в больших конторах обсуждается как основной. Смотрите что происходит:

Этого парня, выступающего, зовут Мартин https://www.infoq.com/presentations/event-streams-kafka

И он достаточно известный специалист в мире проектировщиков хранилищ. Собаку съел на консистентности, но говорит здесь о её отсутствии.

%d0%b2%d1%8b%d0%b4%d0%b5%d0%bb%d0%b5%d0%bd%d0%b8%d0%b5_039

Он решает в этом дакладе другую проблему «сохранность системы в синхронизированном виде». https://www.infoq.com/presentations/event-streams-kafka

Конечно круто, если есть одна большая БД и в ней все : и обработка данных, и аналитика, и поиск, и все другие нагрузки.  Она справляется, эта БД и с нагрузкой и со сложностью  запросов… Но реально оказывается, особенно в мире систем микросервисов, что есть масса разных локальных и специальных БД, и надо чтобы они были «синхронизированы» — т.е. рассогласованы , но непрерывно и быстро  согласующиеся.

Если  сгласование это присходит очень быстро , то часто это приемлимо. Речь тут о Eventual consistency и только.

Акапелла в таком режиме может работать?  и есть ли в этом смысл? 

Если рассматривать такую архитектуру, то что мы должны предложить ?

  • acapella vm читает данные из ячеек как обычно, это не консистентное чтение
  • вычисляет результат, но пользователю легко  управлять масштабированием
  • но далее acapella формирует  Events которые предназначены для дальнейшей обработки.
    • Events либо складываются в очередь отдельно — никакие транзакции не возможны — скорость будет на высоте, но никакой консистентности — только Eventualy
    • events складываются в одну большую запись в Prepare канал. — скорость будет мала
      • чтобы повысить скорость, обычно очередь разбивают на партишены, но естественно теряешь последовательность, поэтому думать надо ещё больше «какие запросы связанны друг с другом» (их надо последовательно применять), а какие нет (можно параллелить).

Эта схема «полную консистентность» и уверенность в корректности не может дать, даже в перспективе никогда.

Мысль : для такой архитектуры схему данных надо проектировать и при этом думать. Также придётся  планировать IO.  Если это придеться делать, то зачем тогда acapella  VM упрощает управление потоком исполнения ? Все равно программист уже сам думает и планирует — ему не сложно придумать сразу и IO и схему обработки данных и поток исполнения.

Пока вывод один : надо продолжать работу над «универсальной БД».  типичный MySQL еще долго будет иметь нишу. PG тоже. А масштабируемость остро нужна именно тем, кто использует подобные подходы, тем кто хочет согласованные всегда реплики и надежность.

Есть масса проектов где  возможностей RDBMS не хватает, а наш вариант и масштабируется линейно и согласованность данных сохраняет и надёжность — выше.

Идеи как увеличить скорость

Сразу скажу : эта идея не вышла эта, но опишу её.

Основной прирост скорости нам дал переход с Python на JVM , а не использование Aeron.  Даже хуже : Aeron повысил требования к CPU серверов, а железа на котором он даёт наибольший прирост у нас пока нет. (будет ?)

Мы сравнили скорость работы нашего KV и варианта, быстро реализованного поверх Cassandra :

Конфигурация одного узла:

  • 1 х Intel Core 2 Quad Q8400
  • 1 х Ethernet controller: Intel Corporation 82541GI Gigabit Ethernet Controller
  • 4 Гб DDR-2
  • 2 x HDD Seagate 250 Gb каждый — программный RAID-1.
  • Между собой соединены с помощью «свитча» Cisco SF200-48P 48-Port 10 100 PoE Smart Switch.

Приведу таблицу сравнения производительности нашего текущего варианта, и нового варианта реализванного поверх C*  для трех типов действий

  • GET
  • CAS  не транзакционный
  • TR swap two keys

Результаты : 

%d0%b2%d1%8b%d0%b4%d0%b5%d0%bb%d0%b5%d0%bd%d0%b8%d0%b5_048

Здесь сравниваеться наше текущее решение и то, что мы получили бы если бы реализовали слой работы с данными на cassandra.

%d0%b2%d1%8b%d0%b4%d0%b5%d0%bb%d0%b5%d0%bd%d0%b8%d0%b5_045

Это результаты тестирования Cassandra отдельно (правая часть). Видно, что в «транзакционном режиме»  и у Cassandra скорость будет как у черепахи.  Тут просто ничего не поделать — у  нее тоже нет возможностей для транзакций, добавив к ней наш слой Tx мы опять теряем скорость.

 

%d0%b2%d1%8b%d0%b4%d0%b5%d0%bb%d0%b5%d0%bd%d0%b8%d0%b5_046

Это скорости, которые мы могли бы получить если бы делали WARP транзкции, но они нам не нужны. Возможно лучше поняв рынок, мы возьмемся за них.

 

%d0%b2%d1%8b%d0%b4%d0%b5%d0%bb%d0%b5%d0%bd%d0%b8%d0%b5_047

Характеристики полученного нами решения

Свойства  реализованного нами KV

  • большие транзакции с вычислениями по середине
  • доступ к транзакции по нескольким  запросам
  • client heartbeat (keep-alive method)
  • replication ,durability , availability
  • отсутствие точки отказа,  мастера, равенство узлов,
  • отсутствие момента  времени с возможностью отказа
  • восстановление после сбоя
  • возможность комплексных запросов
  • expire — lazy
  • multi — cluster replication
  • map — reduce possibility
  • server side code,  scripts , triggers,

Смысл в том, что если перейти на Cassandra мы не выиграем в скорости, но и не проиграем.

Выиграть можно только в надежности и снижении требований к серверам, упрощение кода (меньше пересылок даных между сервисами).

Мы теряем :

  • Мы теряем возможность работы с индивидуальным NRW,  но эта «фича» оказалась редко востребованной, кстати.
  • огромные транзакции становяться невозможны. Повтор транзакции, это по сути повтор отправки batch запроса. а не повтор только лишь одной команды Commit. Этот batch не может быть очень большим.

Мы приобретаем :

  • API сохраняется , наша система управления транзакцией остаётся
  • Наш код станет чище и проще , сервера наших KV нод станут «stateless» (не будут даже друг с другом общаться)
  • Работа с репликами уходит на уровень Кассандры, наш код станет проще
  • Надежное хранилище. Мы в своём хранилище, конечно, уверены, но в случае с Cassandra других людей убеждать даже не придёться
  • возможность стыковать с С* , например , MapReduce и обрабатывать массово данные
  • С* умеет работать в нескольких дата — центрах, Aeron мы не проверяли, но вряд-ли он сможет

не в плюс и не в минус

  • DT под касаандру  может работать быстрее, как KV, но объём дерева должен помещаться на одну машину. Если это будет  другой вид дерева , то мы не потеряем гигантские деревья
  • у нашего KV сейчас скорость на 15-20% выше, она немного упадёт, но зато есть шанс увеличить скорость DT

Еще идея как увеличить скорость

Однозначно нужен тюнинг сетки и вообще «хорошая сетка».

  1. Число сетевых  операций требующейся для надежности ПРИНЦИПИАЛЬНО не изменить. Путь один — сделать их быстрыми как только это возможно, возможно, нужно аппаратное решение. 
  2. Выделить partition key для всего набора даных, в рамкох которого транзакции не будут пересекаться — чисто прикладная штука. Её не задать на уровне PaaS, зато скорость увеличиьтся в 5 — 10 раз (Cassandra Однокласники)
    1. малые транзакции (до 500_000 ключей)
    2. точка отказа
      1. её можно избежать если сделать как в однокласниках — выбор Лидера, (в таблице выше есть этот вариант)
    3. не пройдет «наш вариант длинных транзакций» , вся транзакция уходит в C* одним большим батчем.

 

Ссылка на Однокласников 

Олег Анастасьев — За гранью NoSQL: NewSQL на Cassandra
Олег Анастасьев — За гранью NoSQL: NewSQL на Cassandra Java-конференция Joker 2014 Санкт-Петербург, 20 — 21 октября 2014 До недавнего времени в Одноклассника…

 

Итог

Вот такой свод информации 🙂 Это то, что  мы имеем к текущему моменту:

Просто отличную быструю транзакционную KV систему, и переход на Cassandra её не сможет ускорить.  Но еще мы имеем много планов по развитию и головняков с её тестированием 🙂 

И Главное — хотим обратной связи от программистов / потребителей. Вопросы : 

Нужны советы, на что нам стоит сделать упор ?

Что вам не нравиться сейчас в используемых вами хранилищах данных ?

Нужна ли вам транзакциолнность ? нравиться ли она ?

Нужны ли гигантские  транзакции ?  Да еще и без точек отказа и момента времени в который такая точка может существовать, на сколько это важно ?

 

 

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *