Агрегаторы - aegisql/conveyor Wiki

Агрегаторы

В первой части статьи обсуждаются проблемы агрегации данных, подходы и требования к имплементации. Во второй рассматриваются некоторые решения на основе RACE Framework.

Часть I

Введение

Слово агрегатор можно перевести на русский язык как “собиратель” или “объединитель”. Наиболее общее описание шаблона приводится в книге “Шаблоны интеграции корпоративных приложений”. Краткое описание - на сайте шаблонов интеграции. Шаблон агрегатор так же описывается как один из шаблонов микросервисов.

Как не трудно догадаться из названия, агрегатор служит для объединения данных. Агрегатор это процесс с отслеживанием состояния, который собирает (агрегирует) родственные сообщения, и когда все необходимые сообщения получены, формирует из них итоговое сообщение или объект, который передается в выходной поток для дальнейшей обработки.

Aggregator рис.1

Особенностью шаблона является то, что входные потоки данных независимы, и отсутствует какая либо информация о количестве, порядке и времени доставки сообщений, и даже о том, будет ли какое либо ожидаемое сообщение доставлено вовсе. Канал (поток) ввода может быть один, как на рисунке, или несколько. Ключевым моментом, указывающим на необходимость использования агрегатора, является именно произвольный порядок следования блоков данных из которых необходимо создавать результирующие сообщения.

В приложениях основанных на потоках данных и сообщениях на агрегаторе лежит ответственная функция синхронизации, поскольку итоговое сообщение создается только после того как получены и обработаны все необходимые данные.

В первой части обсуждаются в деталях различные проблемы имплементации агрегаторов. Во второй даны примеры решений с использованием открытой Java библиотеки RACE Framework.

Общий Анализ шаблона

Взаимосвязь данных

В левой части диаграммы на рис 1. изображен некий абстрактный поток, из которого агрегатор один за другим извлекает сообщения. Сообщения следуют в произвольном порядке. Для начала, нужно понять как именно выделять из общего потока данные относящиеся к одной и той же сущности. Например, к одному и тому же заказу, или отчету. Для этого каждое сообщение должно, помимо данных, содержать CORRELATION_ID - локально уникальный идентификатор взаимосвязи между разными кусочками информации относящимися к одному и тому же предмету. Для каждой задачи существует свой собственный способ уникального описания объектов. Примеры CORRELATION_ID: ID сессии, уникальное имя пользователя, PID запущенного процесса, GUID созданный в начале запросов планировщиком задач, ID транзакции, первичный ключ записи в БД, и прочее. Возможны и сложные ключи, состоящие из нескольких полей разных типов. CORRELATION_ID может быть как уникальным так и использоваться повторно. Важно лишь, чтобы две разные сборки не получили, случайно, один и тот же CORRELATION_ID в интервале времени в котором они могут сосуществовать. Во-вторых, нужно уметь различать сами данные. Какую именно информацию содержит то или иное сообщение? Если каждый канал ввода данных поставляет данные строго одного и того же типа, то такой биркой может быть имя или ID канала связи. Если один и тот же канал используется для доставки данных разных типов, то бирка с именем типа сообщения должна быть частью сообщения. Примеры бирок: FIRST_NAME - сообщение содержит имя. E_MAIL - сообщение содержит адрес электронной почты. Бирки необходимы для того, чтобы агрегатор мог правильно обработать и расположить данные для построения финального объекта. Некоторые сообщения могут нести служебную функцию и не содержать информации которая непосредственно используется для создания итогового сообщения. Мы так же должны уметь различать их. Таким образом, каждое сообщение может быть охарактеризовано сложным двухкомпонентный ключом, состоящим из CORRELATION_ID и бирки типа сообщения. Разумеется, сообщение может сопровождаться и другими атрибутами, но эти два являются минимально необходимыми.

Продукт

В правой части диаграммы на рис 1. расположен продукт - итоговое сообщение или объект. Продукт представляет из себя простой POJO объект, не обладающий никаким особым поведениям, кроме функции контейнера для нужных нам данных. После того как все данные получены и обработаны агрегатор должен поместить этот объект в выходной канал. Что именно содержится в сообщении определяется задачей. Это может быть, например, пустое сообщение. С единственной целью уведомить следующий по цепочке процесс, что работа по агрегации завершена. Это так же может быть сложный объект, состоящий из всех тех частей, которые были получены агрегатором. Это может быть контейнер для несколько разных объектов для разных получателей. В любом случае, результирующий продукт должен иметь какую-то модель данных, а так же сопровождаться мета-информацией для правильной маршрутизации и размещении данных в выходных потоках.

Строитель продукта

В нетривиальных случаях, каким, очевидно, является агрегатор, задачу создания экземпляров объектов делегируют хорошо известным Порождающим шаблонам. Шаблон Строитель отлично подходит для создания экземпляра продукта по запросу. Задачей строителя является предоставление необходимых интерфейсов для временного хранения блоков данных и вспомогательной информации, трансформации и валидации данных, приведения типов, словом, всех операций необходимых для подготовки сложных данных к порождению требуемого продукта. Строитель не чувствителен к последовательности поступления блоков данных и, до известной степени, позволяет контролировать внутреннее представление продукта, в зависимости от поступившей информации. В качестве бонуса, класс продукта оказывается избавленным от ненужных зависимостей и перегруженной внутренней логики.

Другим важным достоинством строителя является возможность его независимой разработки и тестирования. Для этого не требуется строить полноценный агрегатор.

Строительные блоки

Для каждого продукта мы должны сопоставить его модель с имеющимися в наличии блоками данных. Мы так же выяснили, что каждый блок данных должен иметь бирку с именем типа, чтобы отличать его от других блоков. В простейшем случае, каждому полю POJO класса должен соответствовать отдельный тип сообщения. Например, сообщение помеченное биркой E_MAIL, скорее всего, содержит строку содержащую e-mail клиента и его значение должно быть присвоено полю eMail продукта. Модель входных данных может весьма серьезно отличаться от модели продукта. Входное сообщение может содержать нужную информацию в виде отдельного элемента своей собственной сложной структуры. Например,  e-mail может оказаться в поле e_mail JSON объекта UserInfo. Таким образом, для каждого блока данных должен быть поставлен в соответствие соответствующий метод интерфейса класса строителя и поля продукта, которые могут быть из него получены. Для строителя не составит проблемы произвести необходимую трансформацию и валидацию. Как видно из примера ниже, поля продукта, содержащие информацию о пользователе, могут быть доставлены как одним блоком, так и тремя разными блоками. При необходимости, агрегатор может поддерживать любой или оба из этих сценариев.

бирка пример блока данных метод строителя поля продукта
USER_INFO {
“firstName”: “John”,
“lastName”: “Smith”,
“e_mail”: “[email protected]
}
extractUserInfo(UserInfo userInfo) String firstName
String lastName
String eMail
FIRST_NAME John setFirstName(String firstName) String firstName
LAST_NAME Smith setLastName(String lastName) String lastName
E_MAIL [email protected] setEmail(String eMail) String eMail

Следующий шаг анализа потока входных сообщений должен определить какие компоненты являются обязательными, какие нет, и необходимо ли для не обязательных компонентов установить дефолтные значения. Некоторые типы входных данных могут не иметь прямого соответствия полям продукта, но при этом использоваться строителем для настройки процесса сборки.

Готовность продукта

В какой-то момент агрегатор должен принять решение о том, что строитель получил достаточное количество данных и может быть создан экземпляр продукта для завершения процесса. Но, на основании чего принимается такое решение? Верный ответ - на основании всей имеющейся совокупности данных. Как внутренних, полученных через каналы связи, так и внешнего “контекста выполнения”. На практике такое сверх-обобщенное требование к алгоритму возникает не всегда, но его нужно иметь в виду, чтобы быть готовым к изменяющимся требованиям развивающейся системы.

Можно привести несколько примеров, которые требуют разных подходов к разработки предиката готовности.

  • Готовность по получении всех ожидаемых блоков данных. Один из самых простых и очевидных алгоритмов. Для каждого CORRELATION_ID агрегатор ожидает получение известного количество блоков каждого типа.
  • Готовность после получения одного сообщения из любого канала. Для конкурирующих процессов. “Выигрывает” первый присланный ответ.
  • Готовность после определенного времени ожидания. Задача - накопить как можно больше данных за отведенный временной интервал. Если данные недополучены возможно присвоение дефолтных значений отсутствующим полям.
  • Готовность по внешнему событию. Процесс генерирующий данные для агрегатора знает когда ввод завершен, например, файл полностью прочитан, и посылает особое терминальное сообщение, интерпретируемое агрегатором как готовность всего набора данных. Само терминальное сообщение, при этом, не обязательно содержит какие-либо данные и не используется при создании продукта.

Алгоритм предиката готовности может быть как статическим, если агрегатор ожидает выполнения одного и того же сценария, так и динамическим, например, ожидаемое число блоков информации и их типы может быть передано отдельным сообщением при инициализации агрегатора.

Возможна комбинация вышеперечисленных подходов. Если одним из блоков данных является список неизвестной длины, то после окончания ввода последнего элемента списка может быть послано терминальное сообщение, относящееся только к этому списку. Остальные блоки при этом описываются статическим алгоритмом. Другой пример, агрегатор может создать продукт сразу как только получены все возможные блоки данных, или после определенного времени ожидания, но только если получены все обязательные блоки. Не обязательные блоки могут при этом оставаться пустыми или получить дефолтные значения.

Если строитель может производить разные представления продукта, то и алгоритм готовности, в общем случае, может быть различным для каждого представления.

Жизненный цикл агрегатора коротко

Процесс агрегации, в общем случае, имеет начало и конец. Звучит слишком банально, если не принимать во внимание, что этот временной интервал может выходить за рамки времени жизни приложения в котором существует данный агрегатор. В абсолютных единицах время агрегации может быть любым. От миллисекунд до минут, часов, дней, месяцев. В начале необходимо создать экземпляр строителя и ассоциировать его с CORRELATION_ID, а так же произвести первоначальную настройку и присвоить дефолтные значения необязательным полям. По мере поступления данных агрегатор находит строителя по соответствующему CORRELATION_ID, а на основе бирки блока данных решает какому методу строителя передать данные. После обработки очередного блока данных вызывается предикат готовности, который должен дать ответ, все ли необходимые для построения продукта данные получены. Если все данные получены строитель порождает экземпляр продукта и передает его обработчику выходного потока. После чего CORRELATION_ID и экземпляр строителя удаляются, а ресурсы высвобождаются.

Если не все данные получены предикат готовности вернет false и агрегатор продолжит обработку входящей очереди сообщений.

Если в момент ввода произошла ошибка, то сам ошибочный блок данных, а так же незавершенный экземпляр строителя должны быть помещены в выходной поток обработки ошибок со всей сопутствующей появлению ошибки информацией.

Агрегатор обладает потенциалом блокировки и бесконтрольного расходования ресурсов, поэтому, разумно иметь ограничения на размеры входящей очереди сообщений и общего количества хранимых объектов, а так же установить максимальные время ожидания получения всех данных.

Инициализация строителя

Итак, первое что должно произойти - экземпляр строителя сам по себе должен быть получен и правильно проинициализирован. Существует некоторое противоречие, которое нужно иметь в виду. Дело в том, что агрегатор - это процесс с состоянием, как таковой является антитезой процессов без состояния. Сам термин агрегатор возникает в контексте обсуждения реактивных процессов, потоков данных, которые не имеют и не должны иметь памяти о своем состоянии. Агрегатор может знать, есть ли в его коллекции строитель для данного CORRELATION_ID, но ни входной поток слева, ни сами блоки данных об этом знать не могут и не должны. Поэтому, необходимо рассмотреть два сценария для инициализации.

Первый способ. Триггером агрегации является единичный процесс. Примером может являться широко применяемая стратегия split-apply-combine. Единичный процесс (split) разбивает задачу на подзадачи и генерирует или иным образом получает CORRELATION_ID. Первым сообщением которое должен послать этот процесс должно быть сообщение агрегатору о начале очередной сборки, после чего он может раздать подзадачи рабочим процессам. Рабочие процессы помечают свои результаты соответствующими бирками и тоже отдают их агрегатору, но первым в очереди всегда должно оказаться сообщение об инициализации со всеми необходимыми для инициализации параметрами. Первый способ позволяет осуществлять максимальный контроль над процессом инициализации.

Второй способ. Все процессы производящие данные независимы, могут оперировать с единым CORRELATION_ID, но невозможно или трудно выделить “первичный” процесс. Например, вы запустили скрипт который должен прочитать CSV файл, разбить на поля, и отдать агрегатору. Невозможно узнать без чрезмерного усложнения всей логики скрипта, являются ли записи в этом файле новыми данными, или продолжением серии начатой в другом файле, либо даже тем же самым файлом загружаемым второй раз. В таком случае, агрегатор должен уметь сам создавать экземпляр строителя и делать это каждый раз когда в его поле зрения появляется CORRELATION_ID для которого не найдено соответствующего строителя. Второй способ можно применять и в том случае если можно выделить стартовый процесс, но сама инициализация строителя тривиальна и не требует каких то особых настроек, уникальных для каждого случая. 
Данный метод удобен, но следует помнить о возможном побочном эффекте, а именно, непреднамеренном начале агрегации при поступлении дубликата, устаревшего или ошибочного блока данных. Если такой риск есть, агрегатор должен поддерживать удаление объектов по таймауту с соответствующей обработкой внештатной ситуации.

Управление временем жизни

При неперегруженной очереди входящих время окончания сборки приблизительно равно времени поступления последнего блока данных. Если этот блок так и не появится в очереди, агрегатор будет вечно ожидать ввода и никогда не освободит ресурсы. Подобные неоконченные сборки имеют тенденцию накапливаться, что приводит как к неконтролируемой утечке ресурсов, так и к отдаленным последствиям, поскольку, очевидно, результаты сборки никогда не достигнут своей цели, и даже не будут обработаны как ошибка. Поэтому, агрегатор должен иметь возможность устанавливать предельное время ожидания данных, или, таймаут. Это полезно даже для таких процессов, в которых появление ошибки крайне маловероятно, поскольку ошибки могут появиться непреднамеренно, в процессе вносимых в программу изменений.

Что такое время с точки зрения требований к системе? Можно выделить два способа описания, которые встречаются в спецификациях.

  • сколько ждать. Время отсчитывается он начала агрегации и указывается длина интервала времени в который должен уложиться процесс. Например, установить таймаут на агрегацию 10 секунд. Это наиболее распространенный способ. Он кажется простым и интуитивно понятным, но, имеет некоторые проблемы. Существенную неопределенность здесь представляет определение времени начала агрегации. Для обычного агрегатора “время начала агрегации” это время получения первого сообщения и инициализации строителя, в то время как для клиента, ожидающего результат, временем начала агрегации является его собственное время отправления запроса. При перегруженной очереди ввода таймаут для пользователя может возникнуть еще до того как будет обработан первый блок данных. Хуже того, система будет продолжать тратить и без того ограниченные ресурсы на задачу которая уже никому не нужна. Именно в этом месте спецификации часто грешат неполнотой. Для уменьшения влияния задержек возникающих на промежуточных этапах можно сопровождать блок данных атрибутом указывающим от какой точки времени следует вычислять таймаут.
  • когда закончить. Указывается абсолютное время окончания ожидания. Например, завершить процесс до 12:45:03PM 1 сентября 2022 года . Очевидно, что ответственность за вычисление предельного времени берет на себя какой то внешний компонент системы. Для агрегатора это более предпочтительный вариант, поскольку не подразумевает никакой интерпретации. При перегруженной системе абсолютное время тайм-аута позволяет отбросить данные которые устарели еще до начала обработки и разгрузить систему. Если процесс верхнего уровня представляет из себя цепочку событий, то абсолютное время проще передавать от одного сервиса к другому.

Совершенно очевидно, что оба метода взаимозаменяемы, и выбор целиком зависит от удобства реализации конкретных требований.

В обоих случаях можно рассмотреть несколько источников откуда система может получить информацию о тайм-ауте.

  • не надо так усложнять. Все предыдущие рассуждения могут быть не важны для вашей системы. Вы просто хотите элементарного контроля над утечкой памяти и блокировками. Тогда один общий для всех механизм первого типа с тайм-аутом, определяемым конфигурацией системы будет достаточен.
  • пусть клиент решает. Запрос пользователя должен в явном виде содержать атрибуты управления временем в любом удобном ему виде. Эти атрибуты должны учитываться при инициализации конкретного агрегатора. Клиентом здесь может выступать как реальный пользователь, так и процесс ждущий результатов агрегации.
  • пусть данные решают. Время тайм-аута динамически вычисляется на основании имеющихся данных и типа создаваемого объекта.

Таймаут

Если таймаут произошел, то система может отреагировать несколькими способами

  • Ошибка. Очевидный способ - передать все накопленные данные потоку обработки ошибок с соответствующим статусом и высвободить ресурсы.
  • Последний шанс. Как было отмечено ранее, блоки данных могут быть как обязательными, так и нет. Если таймаут все же произошел, но все обязательные блоки к этому моменту уже получены, то вместо генерации ошибки можно получить экземпляр продукта и передать его обычной выходящей очереди. Примером может быть обработка массива данных бэтчами, причем в потоке данных могут быть ощутимые перерывы, но явного окончания поток не имеет. Агрегатор накапливает оптимальное количество блоков, после чего передает их далее как единое целое. “Хвост” потока почти наверняка окажется короче оптимального размера бэтча и агрегатор некоторое время будет продолжать ждать поступления данных. Когда произойдет таймаут накопленные к данному времени данные все равно можно и нужно будет обработать. Небольшая задержка при обработке последней порции данных, как правило, приемлема.
  • Еще минуточку. Иногда возникает необходимость скорректировать время тайм-аута. Продлить или сократить время ожидания. Этого можно добиться если агрегатор поддерживает специальные управляющие команды. Учитывая асинхронную природу агрегации, такие команды не могут гарантировать, что данный агрегатор либо еще, либо уже не существует. Более надежно динамически корректировать таймаут средствами самого агрегатора на основе накопленных строителем данных. Например, добавлять определенное время если наблюдается активность поступления данных. Рассмотрим как пример агрегатор ставок аукциона. Если активность торговли низкая, то нет необходимости ничего корректировать, аукцион закончится вовремя, но если некий хитрый участник рассчитывает сделать ставку в последнюю секунду до окончания торгов, чтобы отмести конкурентов, то имеет смысл добавить к торгам еще 5 минут, чтобы все участники имели шанс ответить, и так продлевать его, пока ставки растут.

Обработка ошибок

Ошибки слишком разнообразны по своей природе. Выше уже упоминались некоторые категории ошибок, связанные с невозможностью обработки какого-то блока данных, с наступлением тайм-аута, перегруженностью системы, и прочие. Важно, чтобы агрегатор обладал достаточной гибкостью и позволял адресовать проблемы обработки ошибок с разных точек зрения. Уже упоминалось, что наряду с потоками ввода и вывода агрегатор должен иметь и поток обработки ошибок. Всякая ошибка должна сопровождаться подробным контекстом ее возникновения. CORRELATION_ID, биркой, типом ошибки, исключением, если оно было зафиксировано, а так же экземпляром незавершенного строителя, если он уже был создан.

Ошибки можно разделить на критические и не критические. Возникновение критической ошибки приводит к прекращению процесса агрегации и высвобождению ресурсов. Примеры критических ошибок - таймаут, несоответствие типа данных ожиданиям строителя, ошибки валидации. Не критические ошибки должны быть обработаны, но процесс агрегации не прерывается. Пример не критической ошибки - ошибка ввода пользователя. После получения уведомления об ошибке пользователь имеет шанс попробовать еще раз.

Масштабирование

Агрегаторы достаточно естественно и легко масштабируются по CORRELATION_ID. Достаточно, чтобы любой блок с одним и тем же CORRELATION_ID всегда подавался на вход одного и того же агрегатора. Этот подход хорошо работает при равномерном распределении блоков. Иногда, впрочем, может оказаться так, что 80% всего траффика приходится на 20% CORRELATION_ID. В этом случае разумно включить в сообщение дополнительные атрибуты для лучшей балансировки.

Так же может оказаться, что неравнозначны типы блоков данных. Если некоторые блоки являются тривиальными полями и присваивание выполняются практически мгновенно, то другие могут потребовать сложной обработки для извлечения полезной информации. В этом случае можно разделить агрегатор на каскад связанных между собой агрегаторов. Выделить больше потоков для обработки медленных данных. Очевидно, что алгоритм готовности для промежуточной и окончательной агрегации будет в этом случае отличаться, а балансировщик должен учитывать как CORRELATION_ID так и бирку блока.

Эластичность

В идеале, агрегатор должен извлекать и обрабатывать данные из входного потока со скоростью соизмеримой со скоростью заполнения этого потока. Иными словами, входной поток должен быть, как правило, пустым. Опасная ситуация возникает если данные поступают в большом объеме и быстрее, чем агрегатор может их обрабатывать.

Симметричная ситуация возникает с выходным потоком. Невозможность забирать данные из выходного потока с достаточно скоростью создает противодавление, которое приводит к росту размеров входящей очереди.

Желательно чтобы агрегатор имел возможность как устанавливать разумные ограничения на размеры входящей очереди, так и имел механизмы оптимизации ввода больших объемов входящих данных. Часть этих вопросов выходит далеко за рамки обсуждения данного шаблона, но следует помнить, что данная опасность всегда подстерегает системы с отслеживанием состояния при любой попытке наивной имплементации.

Трудно как-то предсказывать или контролировать проблемы возникающие из за противодавления вызванного консьюмерами выходящего потока, но есть вещи которые сам агрегатор и его строители должны избегать.

Во-первых, строитель должен избегать любых синхронных действий сопровождающихся сторонними эффектами. Даже такая простая операция как запись в лог, при неправильной конфигурации системы, может привести к серьезным проблемам и потере данных. Практически все что делает строитель должно быть либо тривиальным присваиванием полей, либо чисто алгоритмической обработкой данных, типа простого парсинга или приведения типов. Строитель который просто присваивает значения своим полям обладает колоссальной пропускной способностью и его трудно перегрузить.

Во-вторых, строитель и обработчики выходных очередей не должны выполнять никаких длительных вычислений. Если в них есть необходимость - делегируйте их рабочему процессу, а результат требуйте в виде еще одного типа сообщений.

Строитель, как правило, не чувствителен к последовательности обработки строительных блоков. Их реальное применение происходит при получении экземпляра продукта, и тогда нужная последовательность операций может быть задана явным образом. Но, этого нельзя сказать об агрегаторе в целом.

Рассмотрим ситуацию. В какой то момент времени во входящую очередь поступает большое количество блоков типа А, которые можно быстро получить, но обрабатываются они медленно. Через некоторое время поступает такое же количество блоков типа Б. Они получаются медленно, но обрабатываются быстро, и вместе с А завершают агрегацию. Что произойдет? Первый результат будет получен только после того как будут обработаны все блоки А. Это не только задерживает получение результата, но и создает большую нагрузку на память и неравномерность нагрузки на стороне получателя. Если между потоками А и Б есть взаимная корреляция (первый А соответствует первому Б), что вероятно, то оптимальной стратегией было бы “пропустить Б вперед”. Иными словами, для достижения лучшей эластичности системы входящая очередь должна уметь работать с приоритетами данных. Само значение приоритета может быть еще одном атрибутом сопровождающим блок. Другие примеры приоритетов: сообщения чьи времена тайм-аута наступают раньше следует обрабатывать в первую очередь; отдавать приоритет более старым сборкам; приоритет устанавливается клиентом.

Персистенс

Если агрегатор хранит состояние сборки в оперативной памяти, то могут возникнуть определенные проблемы. Первая проблема очевидна - потеря данных при сбое программы или железа. Но сохранение данных требуется не только для защиты от сбоев. Выше было отмечено, что время жизни процесса агрегации может быть дольше, иногда значительно дольше, чем время работы приложения которое осуществляет сборку. Причем объемы этих “отложенных” сборок могут превышать объем доступной памяти. В принципе, агрегатор может хранить в памяти только те данные, которые необходимы для обработки текущего блока, вычисления момента готовности и получения продукта, или даже фрагмента продукта. Так как это делают классические мэп-редьюс алгоритмы на базе Хадупа. С практической точки зрения, для лучшей производительности, агрегатор должен по максимуму задействовать память для сохранения состояния активных в данный момент сборок, подгружая данные по мере необходимости.

Конечно, сохранять нужно не только данные, но и сопровождающую мета-информацию. Такую как CORRELATION_ID, бирки, время тайм-аута, приоритеты, и прочее.

Персистенс требуется не всегда. Данные уже могут быть сохранены где-то в системе, если вы используете для ввода обычные файлы, шину сообщений, типа КАФКА, или базу данных. Если произошло обычное или аварийное прерывание сборки часто можно просто заново применить те же запросы. Но, это не всегда возможно. В этом случае, желательно чтобы сам агрегатор мог предоставить механизмы безопасного постоянного хранения входящих сообщений.

Определенную трудность так же представляет определение данных требующих повторной загрузки. Если произошел сбой и нужно повторить ввод, то прогонять заново весь поток ввода может быть крайне неэффективно. Ведь большинство сборок уже закончены и переданы получателю. С другой стороны, нам не может быть заранее известно, где на временной шкале находится первый блок данных из незавершенных транзакций. Для решения этой проблемы полезно поддерживать лог завершенных и не завершенных сборок и выстраивать стратегию восстановления после сбоя основываясь на этой информации.

Если агрегатор использует персистенс, не важно, собственную или как часть системы ввода данных, возникает вопрос о гарантированной доставке. После того как продукт получен клиентом и есть подтверждение этому, входные данные можно удалить или архивировать. То есть, клиент-получатель должен каким-то образом взаимодействовать с поставщиками данных или системой архивации. Это не желательный вид зависимости. Схема агрегатора структурно очень похожа на схему другого популярного шаблона проектирования - моста (https://ru.wikipedia.org/wiki/Мост_(шаблон_проектирования)). Шаблон мост применяют когда нужно отделить абстракцию от множественных вариантов реализации. Это именно то, что нужно. Может существовать множество вариантой доставки блоков данных, и так же много вариантов кому и как передать экземпляр продукта. Стандартизованный интерфейс передачи уведомлений о завершенной доставке позволяет избежать излишних зависимостей в коде и гибко переключаться с одной реализации на другую.

Еще один аспект, требующий сохранения информации о текущих и ранее произведенных сборках - это проблема дубликатов сообщений. Любой алгоритм производящий суммирование требует чтобы каждое входящее было учтено ровно один раз. Очевидно, никто не хочет, чтобы из за ошибки браузера сума была дважды списана с кредитной карты. Агрегатор хранит данные о текущих сборках, но не может и не должен хранить информацию обо всех когда либо произведенных действиях. Если такая потребность может возникнуть, она должна иметь поддержку со стороны системы постоянного хранения данных.

Промежуточные итоги

Итак, для построения агрегатора нам понадобятся ответы на следующие основные вопросы

Зачем нужно
Что является CORRELATION ID Агрегатор должен уметь различать к какому экземпляру строителя продукта относится то или иное сообщение
Какие типы входящих сообщений существуют и какие бирки им соответствуют Понять какому методу интерфейса строителя отдать то или иное сообщение, как их обозначить
Что мы собираемся получить Создать класс/модель для продукта
Как мы собираемся это получить из имеющихся блоков данных Создать класс строитель, способный произвести продукт из имеющихся данных
Какие блоки данных являются обязательными, а какие нет Пометить обязательные и не обязательные блоки, и дефолтные значения для них
Как определить готовность Описать алгоритм готовности в виде предиката, использующего полученные в процессе сборки данные и метаданные
Куда поместить результат Агрегатор должен гарантировать безопасную и бесперебойную доставку сообщений всем получателям
Что делать с ошибками Разработать стратегию обработки ошибок
Управление временим сборки Выбрать стратегию управления временем жизни сборки в соответствии с задачей, указать поведение агрегатора в случае наступления тайм-аута

Часть II

Примеры разработки агрегатора

Данный раздел не является заменой документации или руководством для разработки. В ней приводятся примеры имплементации обсуждаемые в предыдущем разделе типы агрегаторов с использованием RACE Framework.

Рассмотрим простейший пример агрегатора. Следуя традиции, пусть агрегатор выведет в стандартный поток вывода строку “Hello, World!”

Зачем нужно
Что является CORRELATION ID Пусть блоки данных определяется уникальным случайным UUID
Какие типы входящих сообщений существуют и какие бирки им соответствуют Один поток данных, пометим его как “greeting”, поставляет первую часть строки - приветствие. Второй, ”name”, имя субъекта к которому относится приветствие. Типом для бирок будет String
Что мы собираемся получить Строку вида “, !” полученную из двух частей, соединенных “, ”, и оканчивающуюся “!”
Как мы собираемся это получить из имеющихся блоков данных За сборку будет отвечать класс GreetingBuilder. Экземпляр строителя будет создаваться автоматически для каждого нового приветствия.
Какие блоки данных являются обязательными, а какие нет Все блоки являются обязательными
Как определить готовность Готовность наступает когда получены обе части, “greeting” и “name”
Куда поместить результат Выведем результат в стандартный поток
Что делать с ошибками Оставим дефолтное поведение - ошибки выводятся в стандартный лог
Управление временим сборки Не определено

Продуктом у нас является простая строка, и нам не надо создавать для нее никакого дополнительного контейнера. Воспользуемся стандартным Java классом String.

Определим класс-строитель GreetingBuilder

public class GreetingBuilder implements Supplier<String> {
// Сохраняем строительные блоки в соответствующих полях
private String greeting;
private String name;
// Метод-фабрика для создания продукта
@Override
public String get() {
return greeting + ", " + name + "!";
}
// Сеттер для приветствия
public void greeting(String value) {
greeting = value;
}
// Сеттер для имени
public void name(String value) {
name = value;
}
}

Создадим простой агрегатор использующий GreetingBuilder и отвечающий вышеперечисленным требованиям.

// Создаем экземпляр конвейера
Conveyor<UUID, String, String> greetingAggregator = new AssemblingConveyor<>();
// Конвейер создает экземпляр строителя с помощью дефолтного конструктора
greetingAggregator.setBuilderSupplier(GreetingBuilder::new);
// Продукт выводим в стандартный поток вывода
greetingAggregator.resultConsumer(bin->System.out.println(bin.product)).set();
// Связываем бирки с методами строителя
greetingAggregator.setDefaultCartConsumer(Conveyor.getConsumerFor(greetingAggregator, GreetingBuilder.class)
.when("greeting", GreetingBuilder::greeting)
.when("name", GreetingBuilder::name)
);
// Задаем предикат готовности
greetingAggregator.setReadinessEvaluator(Conveyor.getTesterFor(greetingAggregator).accepted("greeting","name"));

// Тестируем:
// Используем случайный CORRELATION_ID
UUID correlationID = UUID.randomUUID();
// Посылаем данные. Порядок не важен
greetingAggregator.part().id(correlationID).label("greeting").value("Hello").place();
greetingAggregator.part().id(correlationID).label("name").value("World").place();
// Ждем завершения всех активных сборок и останавливаем конвейер
greetingAggregator.completeAndStop().join();

Ожидаемый вывод:

Hello, World!

Итак, что произошло?

Во-первых, мы создали класс строитель имплементирующий стандартный интерфейс поставщика строк (implements Supplier). Любой строитель используемый конвейером должен предоставить метод String get() интерфейса Supplier. Возвращаемый тип является типом продукта. Как видно, строитель достаточно тривиален, никак не зависит от конвейера и его легко протестировать независимо. Потоковую безопасность строителя обеспечивает RACE, поэтому разработчику в обычных случаях беспокоиться о ней не нужно.

Во вторых, мы создали и настроили конвейер - основной класс используемый в RACE для осуществления сборки и контроля. Конвейер требует трех типовых параметров.

  • Тип CORRELATION ID
  • Тип для бирок
  • Тип продукта

В нашем первом примере

TYPE
CORRELATION_ID UUID
LABEL String
PRODUCT String

Следующий шаг - поскольку мы выбрали второй способ инициализации строителя, конвейер должен знать как создавать экземпляр строителя для каждой новой сборки. Передаем методу setBuilderSupplier дефолтный конструктор класса GreetingBuilder.

Далее, метод resultConsumer устанавливает поведение после получения экземпляра продукта. Мы решили, что хотим просто поместить результат в STDOUT.

Метод setDefaultCartConsumer связывает строковые бирки с конкретными методами строителя.

И, наконец, setReadinessEvaluator устанавливает предикат готовности. Мы выбираем один из стандартных предикатов - готовность наступает немедленно после того как получены оба типа блоков.

Тестирование:

Конвейеру необходимо передать минимум три параметра. CORRELATION_ID, LABEL, и само значение. Метод place() асинхронно помещает блок данных в очередь. Вся дальнейшая обработка данных происходит в собственном потоке конвейера.

Поскольку наша цель состоит в том, чтобы убедиться в работоспособности этой минимальной конфигурации, мы должны синхронизировать потоки теста и конвейера. Существует несколько способов. Метод completeAndStop блокирует ввод, позволяет конвейеру завершить незаконченные сборки, после чего останавливает конвейер.

В результате на экране должно появиться сообщение “Hello, World!”

Типизация конвейера

RACE не диктует пользователям какой тип данных использовать в качестве CORRELATION_ID или LABEL. Выбор зависит от самих данных и требований эффективности. Общие рекомендации для CORRELATION_ID. Чаще всего уже имеющихся классов типа String, UUID, Long, Integer вполне достаточно. В редких случаях может понадобиться создать собственный класс. Например, чтобы хранить сложный ключ содержащий несколько полей. Такой класс должен иметь хорошую имплементацию методов hashCode и equals. Если предполагается хранить в памяти большие объемы данных полезно чтобы объекты могли быть естественным образом упорядочены. То есть, имплементировали Comparable интерфейс.

Например, выше приведенный пример можно модифицировать для использования в качестве CORRELATION_ID строк вместо UUID:

// Создаем экземпляр конвейера
Conveyor<String, String, String> greetingAggregator = new AssemblingConveyor<>();
// Далее все как в предыдущем примере:
….
// Тестируем:
// Создаем строковый CORRELATION_ID
String correlationID = "HelloWorld_00001";
// Посылаем данные. Порядок не важен
greetingAggregator.part().id(correlationID).label("greeting").value("Hello").place();
greetingAggregator.part().id(correlationID).label("name").value("World").place();

Второй типовой параметр конвейера задает тип для бирок. В примере используются строки. Строки удобны своей наглядностью. Впрочем, для бирок можно выбирать любой тип. Константа с подходящим именем в коде не менее читабельна, чем строка.
Чаще всего количество бирок ограничено и соответствует количеству методов интерфейса строителя. Поэтому, самым удобным типом являются перечислимые бирки. При их использовании меньше вероятность допустить ошибку или раньше ее обнаружить. Для нашего примера класс можно было бы определить так:

public enum GreetingLabel {
GREETING,NAME;
}

Конвейер поддерживает специальный функциональный интерфейс SmartLabel<B> позволяющий наиболее эффективным образом связать метод строителя с биркой.

public enum GreetingLabel implements SmartLabel<GreetingBuilder> {
GREETING(GreetingBuilder::greeting)
,NAME(GreetingBuilder::name);
final BiConsumer<GreetingBuilder, Object> consumer;
<T> GreetingLabel(BiConsumer<GreetingBuilder, T> consumer) {
this.consumer = (BiConsumer<GreetingBuilder, Object>) consumer;
}
@Override
public BiConsumer<GreetingBuilder, Object> get() {
return consumer;
}
}

Определенный таким образом класс для бирок упрощает использование конвейера. Нет необходимости устанавливать CartConsumer. Тем не менее, функциональный интерфейс LabeledValueConsumer обладает некоторыми дополнительными свойствами, которые делают его использование более предпочтительным в некоторых случаях.

Пример конвейера использующего SmartLabel.

// Создаем экземпляр конвейера
Conveyor<UUID, GreetingLabel, String> greetingAggregator = new AssemblingConveyor<>();
// Конвейер создает экземпляр строителя с помощью конструктора
greetingAggregator.setBuilderSupplier(GreetingBuilder::new);
// Продукт выводим в стандартный поток вывода
greetingAggregator.resultConsumer(bin->System.out.println(bin.product)).set();
// Задаем предикат готовности
greetingAggregator.setReadinessEvaluator(Conveyor.getTesterFor(greetingAggregator)
.accepted(GREETING,NAME));
// Тестируем:
// Используем случайный CORRELATION_ID
UUID correlationID = UUID.randomUUID();
// Посылаем данные. Порядок не важен
greetingAggregator.part().id(correlationID).label(GREETING).value("Hello").place();
greetingAggregator.part().id(correlationID).label(NAME).value("World").place();
// Ждем завершения всех активных сборок и останавливаем конвейер
greetingAggregator.completeAndStop().join();

Дефолтные значения и настройка строителя

В первом примере пробел отделяющий приветствие от имени, а так же символ завершающий строку, задаются в коде литералами. Допустим, мы хотим добавить гибкости в нашу систему и сделать их настраиваемыми. Для начала, изменим слегка класс строителя.

public class GreetingBuilder implements Supplier<String> {
// Сохраняем строительные блоки в соответствующих полях
private String greeting;
private String name;
private String spacer = " “; //часть дефолтных значений
private String eol = "."; // можно определить в коде строителя

    // Метод-фабрика для создания продукта
    @Override
    public String get() {
        return greeting + spacer + name + eol;
    }
    // Сеттер для приветствия
    public void greeting(String value) {
        greeting = value;
    }
    // Сеттер для имени
    public void name(String value) {
        name = value;
    }
    // Сеттер для пробела
    public void spacer(String spacer) {
        this.spacer = spacer;
    }
    // Сеттер для конца строки 
    public void eol(String eol) {
        this.eol = eol;
    }
}

Как видно, мы вынесли литералы в поля, а так же создали соответствующие сеттеры. Какие-то дефолтные значения можно, разумеется, задать прямо в коде, как в данном примере, но нас интересует динамическая настройка.

Во первых, мы можем изменить способ создания экземпляра строителя. вместо

// Конвейер создает экземпляр строителя с помощью дефолтного конструктора
greetingAggregator.setBuilderSupplier(GreetingBuilder::new);

Можно определить Supplier по-другому:

// Альтернативный способ инициализации строителя
greetingAggregator.setBuilderSupplier(()->{
GreetingBuilder greetingBuilder = new GreetingBuilder();
greetingBuilder.spacer(", ");
greetingBuilder.eol("!");
return greetingBuilder;
});

Разумеется, раз уж мы создали сеттеры для пробела и окончания строки, мы можем рассматривать их как обычные сообщения. Отличительной особенностью будет то, что они должны быть гарантировано обработаны до получения главных значений приветствия и имени. Обратите внимание, мы не изменили определение для предиката готовности. Это значит, что получение или не получение необязательных полей SPACER или EOL никак не повлияет на момент наступления готовности. Это не является существенной проблемой, если начало процесса агрегации хорошо определено, а входящая очередь FIFO.

public enum GreetingLabel implements SmartLabel<GreetingBuilder> {
GREETING(GreetingBuilder::greeting)
,NAME(GreetingBuilder::name)
,SPACER(GreetingBuilder::spacer)
,EOL(GreetingBuilder::eol);
final BiConsumer<GreetingBuilder, Object> consumer;
<T> GreetingLabel(BiConsumer<GreetingBuilder, T> consumer) {
this.consumer = (BiConsumer<GreetingBuilder, Object>) consumer;
}
@Override
public BiConsumer<GreetingBuilder, Object> get() {
return consumer;
}
}

Теперь, как обычно, посылаем значения.

// В начале посылаем строки для пробела и окончания приветствия
greetingAggregator.part().id(correlationID).label(SPACER).value(", ").place();
greetingAggregator.part().id(correlationID).label(EOL).value("!").place();
// Затем приветствие и имя        
greetingAggregator.part().id(correlationID).label(GREETING).value("Hello").place();
greetingAggregator.part().id(correlationID).label(NAME).value("World").place();

Что делать, если начало сборки определяется с трудом, да и просто нет желания загромождать очередь лишними сообщениями? Можно определить статические данные.

// Посылаем статические данные для пробела и окончания строки
// Они будут применяться ко всем агрегатам.
greetingAggregator.staticPart().label(SPACER).value(",\n").place();
greetingAggregator.staticPart().label(EOL).value(";").place();

Статические данные связаны с биркой, хранятся в единственном экземпляре на уровне конвейера, и автоматически применяются сразу после создания экземпляра строителя. Так что, если вы все же решите позже послать эти значения обычным образом, новые данные перезапишут ранее установленные статические блоки.

Если вы заметили, загрузчик для staticPart не имеет метода id(), так как относится ко всем будущим сборкам. Изменение staticPart возможно и в середине цикла программы, но оно не окажет влияния на уже начатые сборки.

Параметризация

Иногда возникает необходимость сохранить и передать информацию, которая не является частью продукта. Например. Пусть мы хотим не просто составить и напечатать строку с приветствием, а отправить это приветствие клиенту по электронной почте. У каждого клиента, конечно, свой собственный e-mail. Проблема заключается в том, что само сообщение этой информации не содержит. И, даже если бы содержало, извлечь эту информацию обратно из готовой строки и сложно, и бессмысленно. Возможное решение могло бы состоять в переопределении продукта таким образом, чтобы он содержал как текст приветствия, так и необходимые для общения с клиентом электронной почты метаданные. Но, это не слишком элегантное решение, навязывающее изменение существующего контракта. Загрузчик сообщений позволяет добавить с сообщению дополнительные атрибуты, которые хранятся все время существования сборки и доступны консьюмеру продукта.

// Обогащаем любое сообщение дополнительными атрибутами.
greetingAggregator.part().id(correlationID).label(SPACER).value(", ")
.addProperty("E-MAIL","[email protected]")
.addProperty("SUBJECT","Greetings from RACE").place();

Консьюмер продукта имеет доступ к параметрам

// Продукт выводим в стандартный поток вывода вместе с е-мэйлом и сабжектом
greetingAggregator.resultConsumer(bin-> {
String eMail = (String) bin.properties.get("E-MAIL");
String subject = (String) bin.properties.getOrDefault("SUBJECT","Greetings");
System.out.println("MailTo: "+eMail);
System.out.println("Subject: "+subject);
System.out.println(bin.product);
}).set();

Обработка ошибок

Рано или поздно в любом потоке данных может возникнуть сбой. Это может произойти на любом этапе. На этапе начальной загрузки сообщения, на этапе его обработки, на этапе сборки продукта, и, наконец, на этапе отправки в выходной поток. Виды ошибок так же ничем не ограничены.

В конвейере обращение с потоком ошибок похоже на работа с потоком результатов. Существует функциональный интерфейс ScrapConsumer который можно настроить под любые нужды пользователя. Продолжая развивать наш пример, настроем конвейер так, чтобы он отвергал пустые приветствия и имена. Сделаем это двумя способами.

Для начала, добавим две строчки к конфигурации конвейера:

// Добавим проверку, что никакие значения не равны NULL
greetingAggregator.addCartBeforePlacementValidator(cart->{
Objects.requireNonNull(cart.getValue(),"Value is NULL for label '"+cart.getLabel()+"'");
});
// Ошибки выводим в стандартный поток ошибок
greetingAggregator.scrapConsumer(bin->System.err.println(bin)).set();

И немного доработаем наш строитель, добавим в него простую валидацию и метод toString для облегчения диагностики

// Сеттер для приветствия
public void greeting(String value) {
requireNonBlank(value,"Greeting is blank");
greeting = value;
}
// Сеттер для имени
public void name(String value) {
requireNonBlank(value,"Name is blank");
name = value;
}
// Простой валидатор проверяет, что строки содержат не пустое значение
private static void requireNonBlank(String value, String message) {
if(value.isBlank()) throw new RuntimeException(message);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("GreetingBuilder{");
sb.append("greeting='").append(greeting).append('\'');
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}

Теперь, если мы попробуем послать null или строку состоящую из пробелов в качестве значения, эти сообщение будут отвергнуты.

greetingAggregator.part().id(correlationID2).label(GREETING).value(null).place();
greetingAggregator.part().id(correlationID2).label(NAME).value("   ").place();

Конвейер выдаст в поток ошибок сообщения типа

ScrapBin [CART_REJECTED conveyor=AssemblingConveyor 15 key=dd1c5180-7961-44d2-ac84-6087b0784414: Value is NULL for label 'GREETING'; PART [key=dd1c5180-7961-44d2-ac84-6087b0784414, value=null, label=GREETING, expirationTime=0] error=Value is NULL for label ‘GREETING']
ScrapBin [DATA_REJECTED conveyor=AssemblingConveyor 15 key=dd1c5180-7961-44d2-ac84-6087b0784414: Site Processor failed; BuildingSite [builder=GreetingBuilder{greeting='null', name='null'}, initialCart=PART [key=dd1c5180-7961-44d2-ac84-6087b0784414, value=, label=NAME, expirationTime=0], acceptCount=0, builderCreated=1663276732584, builderExpiration=0, status=INVALID, unexpireable, lastError=java.lang.RuntimeException: Name is blank, eventHistory={NAME=0}] error=Name is blank]

Обработчик остатков от сборки предоставляет доступ к ошибке, блоку данных, вызвавшему проблему, а так же экземпляру строителя со всей накопленной на данный момент информацией.

Управление жизненным циклом

Что произойдет, если мы в текущем примере пошлем только одно сообщение из двух ожидаемых? Приложение окажется заблокированным вечным ожиданием получения второго блока данных. С практической точки зрения, если нам известно, что сборка с определённым CORRELATION_ID не может быть завершена, проще всего сделать отмену.

greetingAggregator.command().id(correlationID).cancel();

Здесь мы видим новый способ общения с конвейером - команда. Команды отличаются от прочих сообщений тем, что всегда имеют максимальный приоритет и отдельную очередь обработки. Команда отмены немедленно удаляет экземпляр строителя. Если вы хотите произвести какие либо действия связанные с отменой, например, совершить действия предусмотренные обработчиком остатков, создайте специальную бирку для обработки отмены. Например, так:

CANCEL((a,reason)->{throw new RuntimeException("Greeting aggregation has been canceled. Reason: '"+reason+"'");})

Отмена будет выглядеть так:

greetingAggregator.part().id(correlationID).label(CANCEL).value("for the demo").place();

Соответствующий вывод в поток ошибок:

ScrapBin [DATA_REJECTED conveyor=AssemblingConveyor 15 key=14e8c712-97d8-4d31-8ea9-9123e61913c5: Site Processor failed; BuildingSite [builder=GreetingBuilder{greeting='null', name='null'}, initialCart=PART [key=14e8c712-97d8-4d31-8ea9-9123e61913c5, value=for the demo, label=CANCEL, expirationTime=0], acceptCount=0, builderCreated=1663607396196, builderExpiration=0, status=INVALID, unexpireable, lastError=java.lang.RuntimeException: Greeting aggregation has been canceled. Reason: 'for the demo', eventHistory={CANCEL=0}] error=Greeting aggregation has been canceled. Reason: 'for the demo']

Вышеописанный подход содержит в себе проблему. Он возлагает на разработчика полную ответственность за мониторинг всех событий связанных с конкретной сборкой. Дело в том, что даже если известно, что какой-то канал выдал ошибку, другие каналы могут не знать об этом, и продолжать слать свои сообщения. Более того, эти сообщения могут быть уже посланы и кешированы входящей очередью. Если мы сконструировали наш конвейер так как это сделано в данных примерах, то даже если мы отменим сборку, первое же сообщение с тем же самым CORRELATION_ID восстановит строительство, но оно, конечно, не будет содержать необходимых данных, а лишь те, которые пришли после отмены.

Если мы можем четко определить событие начала сборки, мы можем послать команду создать экземпляр строителя в явном виде, передав ему в качестве параметра Supplier строителя.

// Создаем экземпляр конвейера
Conveyor<UUID, GreetingLabel, String> greetingAggregator = new AssemblingConveyor<>();
// Ошибки выводим в стандартный поток ошибок
greetingAggregator.scrapConsumer(bin->System.err.println(bin)).set();
// Продукт выводим в стандартный поток вывода
greetingAggregator.resultConsumer(bin->System.out.println(bin.product)).set();
// Задаем предикат готовности
greetingAggregator.setReadinessEvaluator(Conveyor.getTesterFor(greetingAggregator).accepted(GREETING,NAME));
// Тестируем:
// Используем случайный CORRELATION_ID
UUID correlationID = UUID.randomUUID();
// Создаем экземпляр загрузчика строителя. Он может быть использован повторно
var builderLoader = greetingAggregator.build().supplier(GreetingBuilder::new);
// Первым посылаем запрос создать экземпляр строителя
builderLoader.id(correlationID).create();
// Посылаем данные.
greetingAggregator.part().id(correlationID).label(GREETING).value("Hello").place();
greetingAggregator.part().id(correlationID).label(NAME).value("World").place();
// Первым посылаем запрос создать экземпляр строителя
builderLoader.id(correlationID).create();
greetingAggregator.part().id(correlationID).label(GREETING).value("Hi").place();
greetingAggregator.part().id(correlationID).label(CANCEL).value("for the demo").place();
greetingAggregator.part().id(correlationID).label(NAME).value("Planet").place();
// Ждем завершения всех активных сборок и останавливаем конвейер
greetingAggregator.completeAndStop().join();

Вывод

Hello, World!
ScrapBin [DATA_REJECTED conveyor=AssemblingConveyor 15 key=a111c7ea-d19a-4f3f-b77a-bf5c8ae34249: Site Processor failed; BuildingSite [builder=GreetingBuilder{greeting='Hi', name='null'}, initialCart=BUILDER [key=a111c7ea-d19a-4f3f-b77a-bf5c8ae34249, value=com.aegisql.demo.demo_05.Demo$$Lambda$112/[email protected], label=null, expirationTime=0], acceptCount=1, builderCreated=1663709969467, builderExpiration=0, status=INVALID, unexpireable, lastError=java.lang.RuntimeException: Greeting aggregation has been canceled. Reason: 'for the demo', eventHistory={null=0, GREETING=1}] error=Greeting aggregation has been canceled. Reason: 'for the demo']
ScrapBin [GENERAL_FAILURE conveyor=AssemblingConveyor 15 key=a111c7ea-d19a-4f3f-b77a-bf5c8ae34249: Cart Processor Failed; PART [key=a111c7ea-d19a-4f3f-b77a-bf5c8ae34249, value=Planet, label=NAME, expirationTime=0] error=Builder Supplier is not set]

Отличие от первого примера в том, что мы не стали устанавливать дефолтный Supplier. Вместо этого, в начале сборки мы посылаем сообщение с инструкцией как его получить. Соответственно, первый набор данных отрабатывается как и раньше, выдавая сообщение Hello, World!, а во втором, блок с биркой NAME был помещен в очередь уже после отмены, не сможет создать экземпляр строителя, и будет обработан сборщиком остатков.

Что если мы не можем четко определить событие начала сборки? В этом случае можно определить максимальное время ожидания. Добавим следующую строчку к конфигурации конвейера

// Задаем время таймаута
greetingAggregator.setDefaultBuilderTimeout(Duration.ofSeconds(3));

И посылаем неполные данные

// Используем случайный CORRELATION_ID
UUID correlationID = UUID.randomUUID();
// Посылаем частичные данные.
greetingAggregator.part().id(correlationID).label(GREETING).value("Hi").place();
// Ждем завершения всех активных сборок и останавливаем конвейер. Конвейер не будет заблокирован
greetingAggregator.completeAndStop().join();

Вывод, который последует приблизительно через три секунды:

ScrapBin [BUILD_EXPIRED conveyor=AssemblingConveyor 15 key=1ee13787-ab35-4991-94f9-d3f249b43d59: Site expired. No timeout action; BuildingSite [builder=GreetingBuilder{greeting='Hi', name='null'}, initialCart=PART [key=1ee13787-ab35-4991-94f9-d3f249b43d59, value=Hi, label=GREETING, expirationTime=0], acceptCount=1, builderCreated=1663711576971, builderExpiration=1663711579971, status=TIMED_OUT, delay=-35, eventHistory={GREETING=1}]]

Можно задать таймаут и по-другому. Если требуется более индивидуальный подход, время таймаута можно включить непосредственно в загрузчик данных или строителя.

var builderLoader = greetingAggregator
.build()
.supplier(GreetingBuilder::new)
.ttl(Duration.ofSeconds(3));
// Или
greetingAggregator
.part()
.id(correlationID)
.label(GREETING)
.value("Hello")
.ttl(Duration.ofSeconds(3))
.place();

Можно задать и абсолютный момент времени.

// Вычисляем начало следующего дня
Instant midnight = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).plusDays(1).toInstant();
// Сборка завершится в полночь
greetingAggregator
.part()
.id(correlationID)
.label(GREETING)
.value("Hello")
.expirationTime(midnight)
.place();

До сих пор мы рассматривали пример где вся конфигурация агрегации задавалась на уровне конвейера. Это было просто, но иногда нам может потребоваться дополнительная гибкость. Давайте рассмотрим примеры где управление сборкой осуществляется самими полученными данными. Для этого строитель может быть декорирован несколькими вспомогательными интерфейсами.

Пусть задачей следующего примера будет получать от пользователей произвольные строки и соединять их пробелами. Критерием окончания ввода будет введение пустой строки, либо по истечении 5 секунд с момента последнего ввода. Если пользователь ввел хоть одну строку это должно рассматриваться как успех, если ни одной строку так и не было послано - как ошибка.

Первый из интерфейсов - Testing с методом boolean test(). Метод задает предикат готовности. Если строитель имплементирует этот интерфейс, то он и будет решать когда строитель готов произвести продукт на основе собранных данных.

Второй - TimeoutAction с методом void onTimeout(). В этом методе можно произвести некоторые действия на случай возникновения тайм-аута. Если это требуется, конечно. Если метод onTimeout определен, то сразу после него будет вызван метод test(). И если он вернет true, то агрегатор перейдет к сборке продукта. В противном случае, возникнет ошибка тайм-аута.

Третий интерфейс - Experiable с методом long getExpirationTime(). Метод должен вернуть значение в миллисекундах абсолютного времени наступления тайм-аута.

Возможная имплементация класса строителя:

public class ConcatStringBuilder implements Supplier<String>, TimeoutAction, Testing, Expireable {
// Накапливаем результат в буфере
StringBuilder sb = new StringBuilder();
// Состояние готовности
boolean ready = false;
long timestamp;
// Время таймаута
private long expirationTime = System.currentTimeMillis() + 5000;
// Фабрика продукта
@Override
public String get() {
return sb.toString();
}
// Сеттер для частей
public void stringPart(String part) {
// Согласно требованию, пустая строка является признаком окончания ввода
if(part.isEmpty()) {
// Готово
ready = true;
} else {
// Добавляем пробел
if (!sb.isEmpty()) {
sb.append(" ");
}
// Присоединяем часть
sb.append(part);
}
// Сдвигаем время таймаута на 5 секунд в будущее
expirationTime = System.currentTimeMillis() + 5000;
}
// Метод интерфейса TimeoutAction
@Override
public void onTimeout() {
// Вычисляем задержку
long delta = System.currentTimeMillis() - expirationTime;
if(delta > 0) { // Истинный таймаут
// Готово если есть накопленные строки
ready = ! sb.isEmpty();
} else { // expirationTime успел обновиться раньше срабатывания таймера
// ждем еще delta миллисекунд
expirationTime = System.currentTimeMillis() - delta;
}
}
// Предикат готовности, метод интерфейса Testing
@Override
public boolean test() {
return ready;
}
// метод интерфейса Expireable возвращает время наступления таймаута
@Override
public long getExpirationTime() {
return expirationTime;
}
}

Единственный тип данных в этом примере это строки. Обработка особого случая пустой строки происходит в том же методе где и все прочие строки. Поэтому, бирка выглядит просто.

public enum StringPartLabel implements SmartLabel<ConcatStringBuilder> {
PART; // Только один тип блоков. Дифференциация производится самим строителем
@Override
public BiConsumer<ConcatStringBuilder, Object> get() {
return (builder,part)->builder.stringPart(((String)part).trim());
}
}

Наконец, создаем агрегатор и тестируем:

// Будем читать строки из стандартного ввода
Scanner scanner = new Scanner(System.in);
// Создаем конвейер
AssemblingConveyor<String,StringPartLabel,String> stringConcatenator = new AssemblingConveyor<>();
// Даем конвейеру имя
stringConcatenator.setName("string_concatenator");
// Задаем метод создания строителя
stringConcatenator.setBuilderSupplier(ConcatStringBuilder::new);
// Обработчик ошибок
stringConcatenator.scrapConsumer(bin->System.err.print(bin)).set();
// Обработчик продукта
stringConcatenator.resultConsumer(bin->System.out.print(bin.product)).set();
// Разрешаем динамически сдвигать время таймаута
stringConcatenator.enablePostponeExpiration(true);
// Разрешаем динамически сдвигать время таймаута если таймаут уже произошел
stringConcatenator.enablePostponeExpirationOnTimeout(true);
// Читаем ввод
String nextLine = "next";
while ( true ) {
// Получаем строку
nextLine = scanner.nextLine();
// Посылаем конвейеру
stringConcatenator.part().id("test").value(nextLine).label(PART).place();
// Обработаем только одну серию строк. Достаточно для теста
if(nextLine.isBlank()) break;
}
// Подождем окончания обработки
stringConcatenator.completeAndStop().join();

Ввод/Вывод:

1 <CR>
2 <CR>
3 <CR>
4 <CR>
<CR>
1 2 3 4
Process finished with exit code 0

Можно поэкспериментировать с кодом, посмотреть как отодвигается время тайм-аута в зависимости от активности пользователя.

Приоритеты

Выше обсуждался случай обработки данных из перегруженного агрегатора, когда для улучшения отзывчивости системы полезно устанавливать приоритет извлечения из очереди. Для того чтобы конвейер начал понимать приоритет блока данных, устанавливаемый пользователем, он должен получить в явном виде тип очереди с поддержкой приоритетов.

// Создаем конвейер с поддержкой приоритетов
AssemblingConveyor<String,StringPartLabel,String> stringConcatenator = new AssemblingConveyor<>(PriorityBlockingQueue::new);
// Даем конвейеру имя
stringConcatenator.setName("string_concatenator");
// Задаем метод создания строителя
stringConcatenator.setBuilderSupplier(ConcatStringBuilder::new);
// Обработчик ошибок
stringConcatenator.scrapConsumer(bin->System.err.print(bin)).set();
// Обработчик продукта
stringConcatenator.resultConsumer(bin->System.out.print(bin.product)).set();
// Разрешаем динамически сдвигать время таймаута
stringConcatenator.enablePostponeExpiration(true);
// Разрешаем динамически сдвигать время таймаута если таймаут уже произошел
stringConcatenator.enablePostponeExpirationOnTimeout(true);
// Приостановить конвейер
stringConcatenator.suspend();
// Помещаем данные в очередь в обратном порядке, но с приоритетами
stringConcatenator.part().id("test").value("three").priority(1).label(PART).place();
stringConcatenator.part().id("test").value("two").priority(2).label(PART).place();
stringConcatenator.part().id("test").value("one").priority(3).label(PART).place();
// Восстанавливаем чтение из очереди
stringConcatenator.resume();
// Завершаем ввод
stringConcatenator.part().id("test").value("").priority(0).label(PART).place();
// Ждем окончания обработки
stringConcatenator.completeAndStop().join();

Вывод:

one two three

Обратите, так же, внимание на методы suspend/resume, позволяющие временно приостановить/возобновить чтение данных из очереди.

Персистенс

Если прервать выполнение теста до наступления тайм-аута, то все накопленные в памяти данные будут потеряны. RACE имеет собственную поддержку персистенс на базе произвольной БД с поддержкой JDBC и достаточно общими требованиями к работе с таблицами. Пользователь может имплементировать персистенс для уже имеющейся в наличии БД, либо воспользоваться одной из поддерживаемых конфигураций. На настоящий момент поддерживаются MySQL, MariaDB, PostgreSQL, SQLITE и DerbyDB. При наличии достаточных прав вся конфигурация БД, таблиц и индексов может производиться автоматически. Это удобно для тестирования и автоматизации развертывания программ.

Для включения персистенс для предыдущего примера не требуется никаких изменений классов строителя и бирки, и минимальные изменения в демо примере. Воспользуемся конфигурацией на базе SQLITE. Если у вас есть БД клиент можно открыть им файл string_parts, если вас интересуют детали хранения промежуточных строк.

// Будем читать строки из стандартного ввода
Scanner scanner = new Scanner(System.in);
// Контейнер для correlationId
AtomicInteger correlationId = new AtomicInteger(0);
// Создаем персистенс для временного хранения частей
JdbcPersistence<Integer> persistence = JdbcPersistenceBuilder
.presetInitializer("sqlite", Integer.class) // на базе SQLITE
.database("string_parts") // имя базы данных
.autoInit(true) // создать БД и необходимые таблицы и индексы автоматически
.setArchived() // при архивации устанавливать флаг ARCHIVED
.maxBatchSize(1) // Частота работы архиватора для теста должна быть максимальной
.labelConverter(StringPartLabel.class) // конвертер в/из строки для бирок
.build(); // создать персистенс
// Создаем конвейер
AssemblingConveyor<Integer, StringPartLabel,String> c = new AssemblingConveyor<>();
// Задаем метод создания строителя
c.setBuilderSupplier(ConcatStringBuilder::new);
// Обработчик ошибок - по окончании увеличиваем на единицу correlationId
c.scrapConsumer(bin->System.err.print(bin)).andThen(bin->correlationId.incrementAndGet()).set();
// Обработчик продукта - по окончании увеличиваем на единицу correlationId
c.resultConsumer(bin->System.out.print(bin.product)).andThen(bin->correlationId.incrementAndGet()).set();
// Разрешаем динамически сдвигать время таймаута
c.enablePostponeExpiration(true);
// Разрешаем динамически сввигать время таймаута если таймаут уже произошел
c.enablePostponeExpirationOnTimeout(true);
// Создаем PersistentConveyor
PersistentConveyor<Integer, StringPartLabel, String> stringConcatenator = persistence.wrapConveyor(c);
// Даем имя конвейеру
stringConcatenator.setName("string_concatenator");
// Ищем сохраненный незавершенный correlationId или устанавливаем 1
Integer lastKey = persistence.getAllParts().stream().map(Cart::getKey).findFirst().orElse(1);
// Устанавливаем текущий correlationId
correlationId.set(lastKey);
// Читаем ввод
String nextLine = "next";
while ( true ) {
// Получаем строку
nextLine = scanner.nextLine();
// Посылаем конвейеру - возвращаемое Future возвращает true после успешной обработки очередного блока
CompletableFuture<Boolean> future = stringConcatenator.part().id(correlationId.get()).value(nextLine).label(PART).place();
// Обработаем только одну серию строк. Достаточно для теста
if(nextLine.isBlank()) {
future.join(); // Подождем окончания обработки последнего блока
break; // Выход
}
}
// Безусловная остановка конвейера.
// Все незавершенные сборки будут переданы обработчику остатков
stringConcatenator.stop();

Тестирование будет состоять из двух этапов. На первом этапе мы введем несколько строк, после чего, не дожидаясь тайм-аута, прервем выполнение теста. Затем мы повторно запустим тест и продолжим ввод. После окончания результат должен содержать оба набора токенов.

1 <CR>
1 <CR>
1 <CR>
1 <CR>
1 <CR>

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
2 <CR>
2 <CR>
2 <CR>
2 <CR>
2 <CR>
<CR>
1 1 1 1 1 2 2 2 2 2
Process finished with exit code 0

Как видно, работа с персистенс очень проста, Любой уже имеющийся конвейер просто “оборачивается” в сконфигурированный экземпляр персистенс. И полученный таким образом конвейер автоматически начинает сохранять все полученные блоки в БД и хранит их там до успешного окончания сборки.

В базе данных информация хранится в двоичном виде. Конвертация для всех примитивных типов, строк, коллекций, массивов и Serializable объектов происходит автоматически. Если этого не достаточно, пользователь может создать двоичный конвертер который лучше отвечает его потребностям. Все данные могут быть зашифрованы перед записью в БД.

Синхронизация

На пути между размещением блока данных во входящей очереди и обработкой этого блока методом строителя может произойти множество непредвиденных событий. Входящая очередь может оказаться переполненной, у приложения закончится свободная память, данные устареют по пути, строитель откажется принимать неожиданный тип или значение, агрегация может быть отменена, а конвейер остановлен. Клиент посылающий данные должен уметь обнаружить ситуацию, когда данные не достигают цели.

Загрузчики данных и команд возвращают CompletableFuture

CompletableFuture<Boolean> future = stringConcatenator.part().id(correlationId.get()).value(nextLine).label(PART).place();

Future позволяет синхронизировать обработку строительного блока с потоком из которого он был послан.

try {
Boolean dataProcessed = future.join(); // Подождем окончания обработки последнего блока
if (dataProcessed) {
System.out.println("Conveyor processed data");
} else {
System.err.println("Conveyor rejected data");
}
} catch (RuntimeException e) {
System.err.println("Conveyor failed processing data. "+e.getMessage());
}

Как правило, синхронизировать обработку каждого сообщения не требуется, поскольку это может резко снизить эффективность работы агрегатора. Имеет смысл дождаться результата последнего блока из бэтча данных. К этому моменту все ранее полученные Futures уже будут завершены и содержать данные, которые можно обработать эффективно, не блокируя работу потоков дольше необходимого.

Если каткой-то процесс начал агрегацию, то, с хорошей долей вероятности, он же заинтересован в получении результата. На этом месте чаще всего и спотыкаются многие другие имплементации агрегаторов. Простой пример - почти любое клиент-серверное приложение. Клиент отправляет запрос и он же ожидает ответ, причем здесь и сейчас, а не потом и не через колбэк. Тем более, что ответом, вообще говоря, может быть не только ожидаемый продукт, но и всевозможные исключения. Клиентов может быть много, а выходной поток у конвейера один. Не самое удобное для имплементации решение. Выходной поток не должен зависеть от количества и вида клиентов.

К счастью, конвейер разрешает любому клиенту запросить будущее продукта с известным CORRELATION_ID.

// Создаем экземпляр конвейера
Conveyor<UUID, GreetingLabel, String> greetingAggregator = new AssemblingConveyor<>();
// Продукт будем получать через Future
greetingAggregator.resultConsumer(IgnoreResult.of(greetingAggregator)).set();
// Задаем предикат готовности
greetingAggregator.setReadinessEvaluator(Conveyor.getTesterFor(greetingAggregator).accepted(GREETING,NAME));

// Тестируем:
// Используем случайный CORRELATION_ID
UUID correlationID = UUID.randomUUID();
// Инициируем агрегацию
greetingAggregator.build().id(correlationID).supplier(GreetingBuilder::new).create();
// Запрашиваем будущее
CompletableFuture<String> greetingFuture = greetingAggregator.future().id(correlationID).get();
// Посылаем данные.
greetingAggregator.part().id(correlationID).label(GREETING).value("Hello").place();
greetingAggregator.part().id(correlationID).label(NAME).value("World").place();
// Ждем результат
String greeting = greetingFuture.join();
// Используем
System.out.println(greeting);
// Останавливаем конвейер
greetingAggregator.stop();

Количество Futures затребованных разными потоками может быть произвольным, и все они получат результат в свое распоряжение.

Дубликаты и уведомления

Допустим, нам необходимо накапливать численные данные в виде их суммы. Строитель для такой задачи чрезвычайно прост.

public class SummaBuilder implements Supplier<Double> {
// Храним сумму
private double summa = 0;
// Возвращаем сумму
@Override
public Double get() {
return summa;
}
// Добавляем значение
public void add(Double x) {
summa += x;
}
}

Соответственно, бирки для конвейера

public enum SummaLabel implements SmartLabel<SummaBuilder> {
ADD_VALUE(SummaBuilder::add) // Добавить значение
,DONE((b,v)->{/*do nothing*/}) // Закончить суммирование
;
private final BiConsumer<SummaBuilder, Object> consumer;
<T> SummaLabel(BiConsumer<SummaBuilder, T> consumer) {
this.consumer = (BiConsumer<SummaBuilder, Object>) consumer;
}
@Override
public BiConsumer<SummaBuilder, Object> get() {
return consumer;
}
}

Предположим, что канал ввода не устойчив, и в случае сбоя ввод возобновляется с произвольной позиции. Для получения правильной суммы это не приемлемо. Каждое значение должно быть обработано ровно один раз. Очевидно, что мы должны каким-то образом сохранить и обработать идентификатор позиции каждого блока данных. Для этого мы можем воспользоваться дополнительными атрибутами, а так же механизмами валидации и уведомления.

Создадим фильтр способный хранить идентификаторы значений текущих сборок:

public class DuplicatesFilter {
// Имя параметра используемое загрузчиком
public static String VALUE_ID = "VALUE_ID";
// Храним уникальные VALUE_ID для каждого CORRELATION_ID
private Map<Integer, Set<Integer>> valueIds = new HashMap<>();
// Валидатор
public Consumer<Cart<Integer, ?, SummaLabel>> getValidator() {
return cart->{
Integer valueId = cart.getProperty(VALUE_ID, Integer.class);
if(valueId == null) {
return; // Игнорируем данные не помеченные VALUE_ID
}
Integer key = cart.getKey(); // CORRELATION_ID
Set<Integer> ids = valueIds.computeIfAbsent(key, k -> new HashSet<>());
if(ids.contains(valueId)) { // Дубликат обнаружен
throw new RuntimeException("Duplicate found for "+key+"#"+valueId+" value="+cart.getValue());
} else { // Сохранить VALUE_ID
ids.add(valueId);
}
};
}
// Действие после завершения суммирования
public Consumer<AcknowledgeStatus<Integer>> getAcknowledge() {
return status->{
// Суммирование завершено. Удаляем накопленные VALUE_IDs для данного CORRELATION_ID
valueIds.remove(status.getKey());
};
}
}

Мы уже рассматривали пример валидатора, но, в данном случае, не достаточно просто сохранять уже обработанные идентификаторы. Так как это приведет к неконтролируемому расходу памяти. После окончания сборки необходимо подчистить ресурсы и, возможно, послать уведомление источнику о статусе завершения подсчета суммы. Этом и занимается консьюмер возвращаемый методом getAcknowledge().

Конфигурация конвейера и тест выглядят следующим образом:

// Создаем экземпляр фильтра
DuplicatesFilter filter = new DuplicatesFilter();
// Создаем конвейер
AssemblingConveyor<Integer,SummaLabel,Double> summator = new AssemblingConveyor<>();
// Устанавливаем имя
summator.setName("summator");
// Метод создания строителя
summator.setBuilderSupplier(SummaBuilder::new);
// Обработчик продукта
summator.resultConsumer(bin->System.out.println("Summa["+bin.key+"] = "+bin.product)).set();
// Обработчик ошибок
summator.scrapConsumer(bin->System.err.println(bin)).set();
// Добавляем фильтр не пустых значений. Строитель не проверяет на null
summator.addCartBeforePlacementValidator(CommonValidators.CART_VALUE_NOT_NULL());
// Добавляем фильтр дубликатов
summator.addCartBeforePlacementValidator(filter.getValidator());
// Добавляем уведомление по окончании агрегации
summator.addBeforeKeyEvictionAction(filter.getAcknowledge());
// Заканчиваем когда получили бирку DONE
summator.setReadinessEvaluator(Conveyor.getTesterFor(summator).accepted(DONE));
// Получаем загрузчики
var loader = summator.part().label(ADD_VALUE);
var done = summator.part().label(DONE).value(0);
// Посылаем null
loader.id(1).value(null).place();
// Посылаем данные с дубликатом без VALUE_ID
loader.id(1).value(0.1).place();
loader.id(1).value(0.2).place();
loader.id(1).value(0.2).place(); // дубликат будет просуммирован
loader.id(1).value(0.3).place();
done.id(1).place();
// Данные помеченные VALUE_ID будут проверены на наличие дубликатов
loader.id(2).value(0.1).addProperty(VALUE_ID,10001).place();
loader.id(2).value(0.2).addProperty(VALUE_ID,10002).place();
loader.id(2).value(0.2).addProperty(VALUE_ID,10002).place(); // дубликат будет отвергнут фильтром
loader.id(2).value(0.3).addProperty(VALUE_ID,10003).place();
done.id(2).place().join();
// Останавливаем конвейер
summator.stop();

Вывод. Первая сумма посчитана неверно. Во втором случае дубликат отвергнут и сумма верна. Значение null отвергнуто:

ScrapBin [CART_REJECTED conveyor=summator key=1: Cart is null; PART [key=1, value=null, label=ADD_VALUE, expirationTime=0] error=Cart is null]
ScrapBin [CART_REJECTED conveyor=summator key=2: Duplicate found for 2#10002 value=0.2; PART [key=2, value=0.2, label=ADD_VALUE, expirationTime=0, properties={VALUE_ID=10002}] error=Duplicate found for 2#10002 value=0.2]
Summa[1] = 0.8
Summa[2] = 0.6000000000000001

За действие в ответ на окончание сборки отвечает алгоритм установленный методом addBeforeKeyEvictionAction(…). Метод получает консьюмер со статусом окончания. Статус содержит ключ, флаг статуса и все атрибуты только-что завершенной сборки. Таким же образом можно добавить нотификатор уведомляющий источник об успешном, или не успешном завершении обработки очередного CORRELATION_ID. Как и в случае с фильтрами-валидаторами можно добавлять неограниченное количество нотификаторов.

Присматриваем за агрегацией

Все предыдущие примеры рассматривали агрегатор и его строители как черные ящики. В которые как-то попадают данные и из которых рано или поздно появляется требуемый продукт. В реальной жизни иногда хотелось бы иметь небольшое окошко, чтобы можно было присматривать за тем, что происходит внутри. В нашем примере с сумматором, представим, нам хотелось бы знать, достаточно ли велика накопленная сумма. Например, чтобы понять насколько эффективен источник данных. Это может быть довольно актуально, если сборка занимает не мгновения, как в тесте, а, например, одни сутки. Если присмотреться к строителю, он способен произвести результат практически в любой момент. Для этого нет никаких препятствий. А раз так, мы можем попросить конвейер исполнить для нас метод get() строителя и посмотреть что он вернет. Есть синхронный и асинхронный вариант этой команды. Рассмотрим синхронный:

var peekValue = summator.command().id(2).peek().join();
System.out.println("Текущее значение суммы для key="+peekValue.key+": "+peekValue.product);

И вывод:

Текущее значение суммы для key=2: 0.30000000000000004

Получение результата не может быть гарантировано. К моменту обработки команды сборка может или еще, или уже не существовать. Неполные данные, в общем случае, могут приводить к самым непредсказуемым ошибкам, которые, впрочем, не влияют на дальнейший процесс агрегации.

Дубликаты и персистенс

Особый механизм контроля за дубликатами предоставляет персистенс. Поскольку поддержка персистенс делегирована произвольной БД имеющей JDBC коннектор, можно задействовать такие возможности реляционных, и не только, баз данных как уникальные индексы на определенные поля. Персистенс сохраняет все дополнительные атрибуты сообщения в виде JSON строки, но так же позволяет выделить отдельный атрибут в свою собственную колонку, и установить индекс на нее.

Пример:

JdbcPersistence<Integer> persistence = JdbcPersistenceBuilder
.presetInitializer("sqlite", Integer.class) // на базе SQLITE
.database("summa_values") // имя базы данных
.autoInit(true) // создать БД и необходимые таблицы и индексы автоматически
.deleteArchiving() // при архивации удалять все записи
.maxBatchSize(1) // Частота работы архиватора для теста должна быть максимальной
.labelConverter(SummaLabel.class) // конвертер в/из строки для бирок
.addField(Integer.class,VALUE_ID) // Добавляем колонку для хранения параметра VALUE_ID
.addUniqueFields(VALUE_ID) // Устанавливаем требование уникальности для VALUE_ID
.build(); // создать персистенс
persistence.archiveAll(); // для теста начинаем с чистой БД
// Создаем конвейер
AssemblingConveyor<Integer,SummaLabel,Double> summator = new AssemblingConveyor<>();
// оборачиваем конвейер в персистенс
var persistentSummator = persistence.wrapConveyor(summator);
// Устанавливаем имя
persistentSummator.setName("summator");
// Метод создания строителя
persistentSummator.setBuilderSupplier(SummaBuilder::new);
// Обработчик продукта
persistentSummator.resultConsumer(bin->System.out.println("Summa["+bin.key+"] = "+bin.product)).set();
// Обработчик ошибок
persistentSummator.scrapConsumer(bin->System.err.println("Error key="+bin.key+": "+bin.error)).set();
// Добавляем фильтр не пустых значений
persistentSummator.addCartBeforePlacementValidator(CommonValidators.CART_VALUE_NOT_NULL());
// Заканчиваем когда получили бирку DONE
persistentSummator.setReadinessEvaluator(Conveyor.getTesterFor(summator).accepted(DONE));
// Используем загрузчики
var loader = persistentSummator.part().label(ADD_VALUE);
var done = persistentSummator.part().label(DONE).value(0);

// Данные помеченные VALUE_ID будут проверены на наличие дубликатов перед сохранением в БД
loader.id(1).value(0.1).addProperty(VALUE_ID,10001).place();
loader.id(1).value(0.2).addProperty(VALUE_ID,10002).place();
// Дубликат будет отвергнут до записи в БД установленным уникальным индексом
loader.id(1).value(0.2).addProperty(VALUE_ID, 10002).place();
loader.id(1).value(0.3).addProperty(VALUE_ID,10003).place();
// Ждем окончания. Требование уникального VALUE_ID распространяется на все бирки и диктуется схемой БД
done.id(1).addProperty(VALUE_ID,10004).place().join();
// Останавливаем конвейер
persistentSummator.stop();

Вывод. Дубликат отвергнут БД, сумма посчитана верно:

Error key=1: com.aegisql.conveyor.persistence.core.PersistenceException: com.aegisql.conveyor.persistence.core.PersistenceException: Failed executing INSERT INTO PART(ID,LOAD_TYPE,CART_KEY,CART_LABEL,CREATION_TIME,EXPIRATION_TIME,CART_VALUE,CART_PROPERTIES,VALUE_TYPE,PRIORITY,VALUE_ID) VALUES (?,?,?,?,?,?,?,?,?,?,?); [SQLITE_CONSTRAINT_UNIQUE]  A UNIQUE constraint failed (UNIQUE constraint failed: PART.VALUE_ID)
Summa[1] = 0.6000000000000001

Значение атрибута VALUE_ID передается в собственную колонку и на нее установлен UNIQUE constraint.

Что дальше?

Поскольку данная часть не является заменой для руководства пользователя, а служит для иллюстрации обсуждаемого выше шаблона агрегатора и его проблем, за рамками данной статьи остались многие возможности по конфигурированию и использованию RACE. Обратитесь к документации и примерам на сайте проекта.

⚠️ **GitHub.com Fallback** ⚠️