Распределенное хранилище, проектирование, наш опыт и результат
Хочу описать тут историю одного проектирования. Без деталей, только верхний уровень. и результаты
Как мы проектировали свою СУБД, как мы хотели сделать транзакции , масштабируемость, надежность и скорость…
Дейтвующие лица : Павлов М (я), Шилов А (крутой умный кодер), CONSISTENCY (упрямый «некто», он выносил нам мозг и продолжает это делать)
Этап первый «API почти хорош.»
Наша система на тот момент была просто API прослойкой к касандре, в ней не было еще транзакций в обычном понимании этого слова.
Самое главное , что мы хотели, это «реализовать DT» — Distributed Tree (распределенное дерево) поверх KV (Key-Value). Хотя транзакций не было, но согласованное изменение нескольких ключей KV сразу было.
там по сути были транзакции, но очень ограниченные (порядок ключей был важен и был задан неявно в виде отношений узлов в дереве).
Дерево в фоне масштабировалось и хранилось полностью распределенно и равномерно в кластере. Это позволяло организовывать ключи в сортированные цепочки и иметь по ним навигацию и seek операцию.
API должен был реализовывать одну идею «идентификатор транзакции можно передавать между запросами, и отдельным запросом камитить или откатывать, сама же транзакция и её оперативные данные должны распологаться на серверах».
Ссылка на API http://acapella.ru/nosql/astorage_api.html
Этап второй «Переписывам, чтобы была скорость и уходим в сторону реплик»
Касандра для хранения ключей KV не нравилась — было ощущение, что можно работать быстрее, скорость python не нравилась, транспорт tcp нам казался хуже Aeron, казалось, что мы сможем сильно все улучшить взявшись за все это сами.
Я захотел сформулировать критерии , которые хотел бы видеть в идеале в продукте и этому посвещен пост в этом блоге TODO ссылка. в числе главных :
- для каждого ключа KV указываем независимо от других ключей N — чсло реплик , числа W, R — требования для кворума операции
- два уровня API — «KV availability API» (get, set, cas) с указанаием числа реплик и кворума и «транзакционный API». Нужно, чтобы DT операции и KV операции могли размещатья в одной транзакции и применяться / откатываться вместе
- высокая скорость работы
- высокая надежность
Именно эти требования в последствии будут вести разработку «самой лучшей» БД в мире далее.
не смотря на то, что мы постоянно с Сашей обсуждаем всякие «нерешимые проблемы», мы все ближе к идеалу в продукте.
Очень часто я убеждаюсь, что лучше чем у нас нигде не сделано, и что мы идем правильным путем.
В итоге мы приняли ряд решений и было достигнуто :
- использовали Aeron как транспорт , перешли на JVM и Kotlin language
- Саша изобрел «расширенный варинат» retrofit и сервер к нему, чтобы выйти за рамки retrofit создал проект «ACONITE» — это клиент и сервер. Возможностей больше, чем в оригинальном Retrofit
- Попробовали AJRPC в бою, и разные сериализаторы.
Работает :
- автоматическое непрерывное «распределение реплик по кластеру», точнее:
- перераспределение данных — при изменении конфигурации кластера
- по запросу пользователя к ключу — запись/починка реплик
- надежно работает и availability api и transactional api
- работает DT так, как задумано, но скрость для большинства применений не устраивает
- транзакции применятся единомоментно
- транзакция может быт очень большой / гигантской, и в ней можно по середине выполнять любые вычисиления
- в транзакции возможно консистентно менять данные, чтение чаще всего тоже консистентно (а для примененных транзакций — 100% консистентно)
- существуют способы организовать консистентное чтение (но это не точно)
- наши транзакции не имеют точки отказа совсем, даже не существует ни одного момента времени когда отказ может что-то нарушить в ней (для любой распределенной системы, это очень ценно)
Этап «размышления на счет способа организации транзакций», почему мы все еще не довольны
Наша система нам кажется медленной, хотя и надежной. (Все что мы смотрели менее надежно по способу организации и транзакций и вообще работы с данными)
Мы развернули три машины/ноды и загрузили кластер тестами (линеаризация операций, тесты на ошибки траназкций, тесты произвдительности), и вот что мы пока получили :
KV
- GET 10_000 op/sec, 450 op/sec/client — без транзакций (3 реплики, 2 — кворум)
- CAS, SET 7_000 op/sec, 340 op/sec/client — без транзакций (3 реплики, 2 — кворум)
- Скорость KV с транзакциями (swap two keys): 750 tx/sec, 70 tx/sec/client — малые транзакции
- 30-60 insert/sec/tree/client , R,W вставленной записи — как в KV (3 реплики, 2 — кворум)
- для одного DT Tree : все клиенты сталкиваются и rps не превышает 100 rps для этого tree (3 реплики, 2 — кворум)
Для многих клиентов при загруженном кластере , когда каждый клиент работает с одним своим деревом, то
Для каждого клиента (дерева) получается 20 — 30 rps
Короче : DT транзакционный — медленный (не транзакционный он у нас не бывает, точнее : можно послать запрос без указания транзакции, он будет выполнен в новой транзакции, созданной этим запрсом, как auto-commit).
Конечно существуют применения для не быстрого DT, например выделять по DT дереву на всякие цели одного пользователя, чтобы хранить какие — то списки сущностей — пользователь редко меняет эти списки и 30 rps хватает для этого… но в целом, это ограничивает.
Зато DT — может хранить громадные последовательности из ключей , которые не влезают на одну и даже несколько машин и работает с ними транзакционно. При этом скорость чтения из DT быстрая, 30_000 op/sec — это подходит для индексирования медленно меняющегося контента и поиска понему, например «Wikipedia index».
Большой объём транзакций тоже не обходится бесплатно :
На время применения транзакции, когда код пользователя начал слать CAS’ы, эти ключи начали блокироваться, в том числе и прочитанные ранее, которые нужно проверить на соответствие версий.
Это намного быстрее, чем сначала блокировать чтения, а потом писать в конце… Короче, можно делать оптимистичные блокировки, а можно пессимистичные.
Т.е. если транзакция большая, то даже если там оптимистические блокировки, то многое может заблокироваться на существенное время её применения.
Скорость KV наш и тестирование скорости Cassandra
Наш KМ, кластер три ноды
Конфигурация кластера
Кластер состоит из трёх узлов 1.yyyy.ru — 3.zzzz.ru.
Конфигурация одного узла:
- 1 х Intel Core 2 Quad Q8400
- 1 х Ethernet controller: Intel Corporation 82541GI Gigabit Ethernet Controller
- 4 Гб DDR-2
- 2 x HDD Seagate 250 Gb каждый — программный RAID-1.
Между собой соединены с помощью «свитча» Cisco SF200-48P 48-Port 10 100 PoE Smart Switch.
- не транзакционный GET одного ключа.
- один клиент (latency^-1) — 450 op/sec
- много параллельных клиентов (throughput) — 10_000 op/sec
- не транзакционный SET/CAS
- один клиент (latency^-1) — 340 op/sec
- много параллельных клиентов (throughput) — 7_000 op/sec
- транзакционная подмена значений (TX SWAP)
- один клиент (latency^-1) — 70 op/sec
- много параллельных клиентов (throughput) — 750 op/sec
Тестирование скорости Cassandra три те же машины, сетка 100 МБит/с
Таблица для тестов создана следующими командами:
CREATE KEYSPACE test WITH replication = {‘class’: ‘SimpleStrategy’, ‘replication_factor’: 3};
CREATE TABLE test.kv (key bigint PRIMARY KEY, value bigint, version bigint, tx bigint); |
UPDATE
Нетранзакционное обновление строки без условий.
Команда:
UPDATE kv SET value = ? WHERE key = ?; |
Последовательные запросы одного клиента (latency): 4464 op/sec
Пропускная способность (throughput): 23331 op/sec
CAS
Compare And Swap или в терминах кассандры Lightweight Transaction. Contention практически отсутствует, так как ключей намного больше, чем клиентов.
Команда:
UPDATE kv SET value = ?, version = ? WHERE key = ? IF version = ?; |
Последовательные запросы одного клиента (latency): 427 op/sec
Пропускная способность (throughput): 2957 op/sec
BATCH
Атомарное обновление двух ключей без условий (батчи кассандры не позволяют добавлять условия, если обновляется несколько партишен-ключей).
BEGIN BATCH
UPDATE kv SET value = ? WHERE key = <key—1>; UPDATE kv SET value = ? WHERE key = <key—2>; APPLY BATCH |
Последовательные запросы одного клиента (latency): 1046 op/sec
Пропускная способность (throughput): 9833 op/sec
TRANSACTION
ACID обновление двух ключей с условиями. Это надстройка транзакций поверх CAS’ов кассандры.
Эквивалент команды для кассандры:
BEGIN BATCH
UPDATE kv SET value = ?, version = ? WHERE key = <key—1> IF version = ?; UPDATE kv SET value = ?, version = ? WHERE key = <key—2> IF version = ?; APPLY BATCH |
Реально исполняемые команды:
# prepare
UPDATE kv SET tx = ? WHERE key = <key—1> IF version = ? AND tx = null; UPDATE kv SET tx = ? WHERE key = <key—2> IF version = ? AND tx = null; # commit UPDATE kv SET value = ?, version = ?, tx = null WHERE key = <key—1>; UPDATE kv SET value = ?, version = ?, tx = null WHERE key = <key—2>; |
Последовательные запросы одного клиента (latency): 79 op/sec
Пропускная способность (throughput): 603 op/sec
График зависимости количества операций в секунду от количества параллельных
Warp транзакции
У нас работа с репликами сделана через PAXOS, а транзакция — двухфазный камит с временными блокировками, (в момент применения транзакции — оптимистические). есть команды явной блокировки в момент создания транзакции, работы со значениями (пессимистические)
- «В середине» нашей транзакции можно обрабатывать значения и менять по ходу вычислений.
- Warp -другой механизм реализации транзакций. link arxiv pdf link
Warp транзакция — сформированный заранее «скрипт изменений», который пересылается между серверами и поэтапно применяется. Состояние транзакции нигде не храниться — оно вычислямое. Чтобы его вычислить надо обойти сервера, учавствующие в ней.
- Наша транзакция — может работать с любым объёмом данных.
- Warp не может , т.к. «пересылает транзакцию» — имеет дело с небольшими наборами изменений.
Если умозрительно реализовать WARP, то скосроти были бы такими :
- KV — 560 tx/sec — наш KV, транзакционный, который сейчас есть
- WARP over KV — 1600 tx/sec — WARP -поверх нашего KV
- WARP native — 5000 — 8000 tx/sec — реализация WARP поверх уровня реплик (логику CAS не ме же реализуем, и сразу логику кворума и в целом конфликта транзакций)
- WARP single replica — 25000 — 30000 tx/sec — тоже, но с одной репликой
Причем, «WARP native» реализовать проще, чем реализовывать «WARP over KV».
PS /
WARP только с виду прост, но если добавить таймауты в алгоритм (имитация сетевых проблем) — он «ломается». Реализовать его правильно — сложная задача, а Pdf не описывает массу вопросов.
У нас, в acapella CPVM , вариант без обработки данных в середине не походит.
Также не нравиться и ограниченность объёма обрабатываемых данных , применительно к создаваемому нами PaaS.
Новые тренды — немного обсудим потоковую обработку
В проектах Acapella и RedForester мы имеем такое взаимодействие с data \ io миром :
- есть «ячейка», с путем, ид… не важно.
- она изменяемая,
- есть транзакционный способ работать с ней и с миллионами таких как она
Это обычный подход. Все привыкли к нему со студенческих времен (RDBMS), orm busines logic wrapper итд.
Но Консистентность и скорость — вещи очень сложно стыкуемые.
По сути есть только такие варианты :
- меньше реплик — меньше надежность — выше скорость (но тут есть предел одна реплика, а для трёх коэффициент не существенный 9 раз всего, но уже 3-х реплик хватает для 90% случаев, т.е. range не широкий этой оптимизации, а риск недоступности данных велик)
- меньше серверов, но более мощных, т.е. с большим объёмом хранения — реже наступает увеличивать объём хранения , меньше сетевых соединений — выше скорость — все яица в одну корзину , репликация не даст консистентности т.к. в консистентном режиме не успеет за основной интенсивностью работы с данными
- проектировать системы так, чтобы они не требовали консистентности. (там не нужно использовать транзакции вообще, но приходиться думать головой, а это всегда риски для ИТ рынка в целом)
А вот последний вариант не ожиданный и интересный, и именно он сейчас в больших конторах обсуждается как основной. Смотрите что происходит:
Этого парня, выступающего, зовут Мартин https://www.infoq.com/presentations/event-streams-kafka
И он достаточно известный специалист в мире проектировщиков хранилищ. Собаку съел на консистентности, но говорит здесь о её отсутствии.
Он решает в этом дакладе другую проблему «сохранность системы в синхронизированном виде». https://www.infoq.com/presentations/event-streams-kafka
Конечно круто, если есть одна большая БД и в ней все : и обработка данных, и аналитика, и поиск, и все другие нагрузки. Она справляется, эта БД и с нагрузкой и со сложностью запросов… Но реально оказывается, особенно в мире систем микросервисов, что есть масса разных локальных и специальных БД, и надо чтобы они были «синхронизированы» — т.е. рассогласованы , но непрерывно и быстро согласующиеся.
Если сгласование это присходит очень быстро , то часто это приемлимо. Речь тут о Eventual consistency и только.
Акапелла в таком режиме может работать? и есть ли в этом смысл?
Если рассматривать такую архитектуру, то что мы должны предложить ?
- acapella vm читает данные из ячеек как обычно, это не консистентное чтение
- вычисляет результат, но пользователю легко управлять масштабированием
- но далее acapella формирует Events которые предназначены для дальнейшей обработки.
- Events либо складываются в очередь отдельно — никакие транзакции не возможны — скорость будет на высоте, но
никакойконсистентности — только Eventualy - events складываются в одну большую запись в Prepare канал. — скорость будет мала
- чтобы повысить скорость, обычно очередь разбивают на партишены, но естественно теряешь последовательность, поэтому думать надо ещё больше «какие запросы связанны друг с другом» (их надо последовательно применять), а какие нет (можно параллелить).
- Events либо складываются в очередь отдельно — никакие транзакции не возможны — скорость будет на высоте, но
Эта схема «полную консистентность» и уверенность в корректности не может дать, даже в перспективе никогда.
Мысль : для такой архитектуры схему данных надо проектировать и при этом думать. Также придётся планировать IO. Если это придеться делать, то зачем тогда acapella VM упрощает управление потоком исполнения ? Все равно программист уже сам думает и планирует — ему не сложно придумать сразу и IO и схему обработки данных и поток исполнения.
Пока вывод один : надо продолжать работу над «универсальной БД». типичный MySQL еще долго будет иметь нишу. PG тоже. А масштабируемость остро нужна именно тем, кто использует подобные подходы, тем кто хочет согласованные всегда реплики и надежность.
Есть масса проектов где возможностей RDBMS не хватает, а наш вариант и масштабируется линейно и согласованность данных сохраняет и надёжность — выше.
Идеи как увеличить скорость
Сразу скажу : эта идея не вышла эта, но опишу её.
Основной прирост скорости нам дал переход с Python на JVM , а не использование Aeron. Даже хуже : Aeron повысил требования к CPU серверов, а железа на котором он даёт наибольший прирост у нас пока нет. (будет ?)
Мы сравнили скорость работы нашего KV и варианта, быстро реализованного поверх Cassandra :
Конфигурация одного узла:
- 1 х Intel Core 2 Quad Q8400
- 1 х Ethernet controller: Intel Corporation 82541GI Gigabit Ethernet Controller
- 4 Гб DDR-2
- 2 x HDD Seagate 250 Gb каждый — программный RAID-1.
- Между собой соединены с помощью «свитча» Cisco SF200-48P 48-Port 10 100 PoE Smart Switch.
Приведу таблицу сравнения производительности нашего текущего варианта, и нового варианта реализванного поверх C* для трех типов действий
- GET
- CAS не транзакционный
- TR swap two keys
Результаты :
Здесь сравниваеться наше текущее решение и то, что мы получили бы если бы реализовали слой работы с данными на cassandra.
Это результаты тестирования Cassandra отдельно (правая часть). Видно, что в «транзакционном режиме» и у Cassandra скорость будет как у черепахи. Тут просто ничего не поделать — у нее тоже нет возможностей для транзакций, добавив к ней наш слой Tx мы опять теряем скорость.
Это скорости, которые мы могли бы получить если бы делали WARP транзкции, но они нам не нужны. Возможно лучше поняв рынок, мы возьмемся за них.
Характеристики полученного нами решения
Свойства реализованного нами KV
- большие транзакции с вычислениями по середине
- доступ к транзакции по нескольким запросам
- client heartbeat (keep-alive method)
- replication ,durability , availability
- отсутствие точки отказа, мастера, равенство узлов,
- отсутствие момента времени с возможностью отказа
- восстановление после сбоя
возможность комплексных запросов- expire — lazy
multi — cluster replicationmap — reduce possibilityserver side code, scripts , triggers,
Смысл в том, что если перейти на Cassandra мы не выиграем в скорости, но и не проиграем.
Выиграть можно только в надежности и снижении требований к серверам, упрощение кода (меньше пересылок даных между сервисами).
Мы теряем :
- Мы теряем возможность работы с индивидуальным NRW, но эта «фича» оказалась редко востребованной, кстати.
- огромные транзакции становяться невозможны. Повтор транзакции, это по сути повтор отправки batch запроса. а не повтор только лишь одной команды Commit. Этот batch не может быть очень большим.
Мы приобретаем :
- API сохраняется , наша система управления транзакцией остаётся
- Наш код станет чище и проще , сервера наших KV нод станут «stateless» (не будут даже друг с другом общаться)
- Работа с репликами уходит на уровень Кассандры, наш код станет проще
- Надежное хранилище. Мы в своём хранилище, конечно, уверены, но в случае с Cassandra других людей убеждать даже не придёться
- возможность стыковать с С* , например , MapReduce и обрабатывать массово данные
- С* умеет работать в нескольких дата — центрах, Aeron мы не проверяли, но вряд-ли он сможет
не в плюс и не в минус
- DT под касаандру может работать быстрее, как KV, но объём дерева должен помещаться на одну машину. Если это будет другой вид дерева , то мы не потеряем гигантские деревья
- у нашего KV сейчас скорость на 15-20% выше, она немного упадёт, но зато есть шанс увеличить скорость DT
Еще идея как увеличить скорость
Однозначно нужен тюнинг сетки и вообще «хорошая сетка».
- Число сетевых операций требующейся для надежности ПРИНЦИПИАЛЬНО не изменить. Путь один — сделать их быстрыми как только это возможно, возможно, нужно аппаратное решение.
- Выделить partition key для всего набора даных, в рамкох которого транзакции не будут пересекаться — чисто прикладная штука. Её не задать на уровне PaaS, зато скорость увеличиьтся в 5 — 10 раз (Cassandra Однокласники)
- малые транзакции (до 500_000 ключей)
- точка отказа
- её можно избежать если сделать как в однокласниках — выбор Лидера, (в таблице выше есть этот вариант)
- не пройдет «наш вариант длинных транзакций» , вся транзакция уходит в C* одним большим батчем.
Ссылка на Однокласников
Олег Анастасьев — За гранью NoSQL: NewSQL на Cassandra
Олег Анастасьев — За гранью NoSQL: NewSQL на Cassandra Java-конференция Joker 2014 Санкт-Петербург, 20 — 21 октября 2014 До недавнего времени в Одноклассника…
Итог
Вот такой свод информации 🙂 Это то, что мы имеем к текущему моменту:
Просто отличную быструю транзакционную KV систему, и переход на Cassandra её не сможет ускорить. Но еще мы имеем много планов по развитию и головняков с её тестированием 🙂
И Главное — хотим обратной связи от программистов / потребителей. Вопросы :
Нужны советы, на что нам стоит сделать упор ?
Что вам не нравиться сейчас в используемых вами хранилищах данных ?
Нужна ли вам транзакциолнность ? нравиться ли она ?
Нужны ли гигантские транзакции ? Да еще и без точек отказа и момента времени в который такая точка может существовать, на сколько это важно ?