Мы используем Hangfire в Assetbots управлять и координировать всю нашу фоновую обработку и обработку событий. В то время как Hangfire поставляется с множеством отличных функций из коробки, ему не хватает возможности Расписание отсроченных заданий в определенной очереди Анкет К счастью для нас, архитектура Hangfire чрезвычайно проста и расширяется, поэтому с небольшим количеством пользовательского кода мы можем самостоятельно реализовать эту функцию.
Работа в качестве государственных машин
По своей сути, Hangfire рассматривает работу как отдельные государственные машины. Есть несколько Встроенные состояния что система поставляется с их соответствующими обработчиками. Обработчик каждого штата несет ответственность за переход рабочих мест в это штат и обратно в хранилище.
Например, EnqueuedState
обработчик добавляет задания в соответствующую очередь в хранении. Сравните это с ПЕРЕКЛЮЧАТЕЛЬНАЯ СТАТА
обработчик , который устанавливает временную метку на пользовательскую запланировано
Ключ метаданных в хранении, который указывает, когда задание должна быть включена.
Фильтры работы как промежуточное программное обеспечение государственного машины
Еще одна основная особенность архитектуры Hangfire — это Цепочка ответов трубопровод. Этот конвейер обработки имеет несколько этапов, которые можно перехватить с помощью Фильтры работы . Каждый фильтр может работать и изменить поведение работы в этой точке трубопровода. Может применяться несколько фильтров, каждый из которых работает независимо, и каждый применяется на разных уровнях детализации (например, на уровне задания или метода, на уровне класса или по всей системе).
Используя фильтры, вы можете расширить Hangfire для реализации таких вещей, как регистрация каждого перехода штата или отчетность невозможных ошибок в Bugsnag или Часовой Анкет
Использование состояний + фильтры для планирования заданий в определенной очереди
Как вы можете себе представить, есть много возможностей, предоставляемых этими двумя простыми примитивами. В любом случае, давайте использовать Пользовательское состояние работы и Пользовательский фильтр очереди Чтобы мы могли запланировать отложенные рабочие места в очереди по нашему выбору.
Нам нужно выполнить две вещи:
- Добавьте пользовательский кусок метаданных в запланированную работу, указывающую на то, на какую очередь мы хотим быть внедрены, когда мы переходим к
EnqueuedState
Анкет - Прочитайте наши пользовательские метаданные очереди при переходе из
ПЕРЕКЛЮЧАТЕЛЬНАЯ СТАТА
кEnqueuedState
и используя его для включения работы в правильную очередь.
ПЕРЕКЛЮЧЕНИЕ
Сердж-корабли с незасеченным состоянием, представляющим заложенные, запланированные задания. И, поскольку штаты в Санглом сохраняются, поскольку JSON использует Json.net , мы можем просто расширить этот класс и добавить в него пользовательское свойство. Это свойство будет сериализовано и де-сериализовано с использованием инфраструктуры сериализации JSON по умолчанию.
using System; using Hangfire.States; using Newtonsoft.Json; public sealed class ScheduledQueueState : ScheduledState { public ScheduledQueueState(TimeSpan enqueueIn) : this(DateTime.UtcNow.Add(enqueueIn), null) { } public ScheduledQueueState(DateTime enqueueAt) : this(enqueueAt, null) { } [JsonConstructor] public ScheduledQueueState(DateTime enqueueAt, string queue) : base(enqueueAt) { this.Queue = queue?.Trim(); } public string Queue { get; } }
Этот класс чрезвычайно прост: расширить ПЕРЕКЛЮЧАТЕЛЬНАЯ СТАТА
и добавить Очередь
собственность при сохранении совместимости сериализации JSON. Далее нам нужно воспользоваться этим новым свойством при переходе в EnqueuedState
Анкет
Очередь
Наш Queuefilter
Класс сделает две вещи:
- Следите за работой, созданной в либо в
EnqueuedState
илиGraduledQueuestate
и захватить ихОчередь
собственность для хранения как - Используйте наш пользователь
Очередь
Параметр работы при переходе на (или избрать )EnqueuedState
Анкет
using System; using Hangfire.Client; using Hangfire.States; public sealed class QueueFilter : IClientFilter, IElectStateFilter { public const string QueueParameterName = "Queue"; public void OnCreated(CreatedContext filterContext) { } public void OnCreating(CreatingContext filterContext) { string queue = null; switch (filterContext.InitialState) { case EnqueuedState es: queue = es.Queue; break; case ScheduledQueueState sqs: queue = sqs.Queue; break; default: break; } if (!string.IsNullOrWhiteSpace(queue)) { filterContext.SetJobParameter(QueueFilter.QueueParameterName, queue); } } public void OnStateElection(ElectStateContext context) { if (context.CandidateState.Name == EnqueuedState.StateName) { string queue = context.GetJobParameter(QueueFilter.QueueParameterName)?.Trim(); if (string.IsNullOrWhiteSpace(queue)) { queue = EnqueuedState.DefaultQueue; } context.CandidateState = new EnqueuedState(queue); } } }
Опять же, этот класс довольно прост. Мы используем C#’S Соответствие шаблона Чтобы проверить, создается ли работа с любым из двух штатов, о которых нас волнует. Если это так, мы устанавливаем пользовательский параметр, который будет проходить с заданием через трубопровод.
Затем, когда система переходит на работу в новое государство, мы проверяем, является ли переходное состояние, в которое переводится, является EnqueuedState
Анкет Если это так, мы ищем наш ранее установленный пользовательский параметр и используем его для создания новой версии EnqueuedState
Чтобы перейти в вместо этого.
Соберите кусочки вместе
Теперь, когда у нас есть весь код, который нам нужен, как мы действительно подключим фильтр и воспользуемся нашими новыми GraduledQueuestate
? Во -первых, нам нужно зарегистрировать наш Queuefilter
в обработке Hangfire:
services.AddHangfire(configuration: (services, config) => { config.UseFilter(new QueueFilter()); });
Наконец, мы можем создать методы расширения, соответствующие Расписание
перегрузки что нам нужно. Например:
public static string Schedule( [NotNull] this IBackgroundJobClient client, [NotNull, InstantHandle] ExpressionmethodCall, TimeSpan delay, string queue) { if (client == null) { throw new ArgumentNullException(nameof(client)); } return client.Create(methodCall, new ScheduledQueueState(delay, queue)); }
Вышеупомянутое расширение позволит нам запланировать отложенные задания, которые не возвращают Задача
И не принимайте параметр на запланировано
очередь следующим образом:
// Acquire a reference to IBackgroundJobClient via dependency injection. private IBackgroundJobClient client; // When you want to schedule your job. this.client.Schedule( () => Console.Log("Hello, world!"), TimeSpan.FromDays(1), "scheduled");
Отсюда просто, как добавить любые другие перегрузки, которые вам могут понадобиться.
Завершая
Мы были довольны комбинацией легкой настройки и богатой расширяемой настройки и богатой расширяемой. Это стало важной частью нашей внутренней инфраструктуры, поскольку мы масштабируем наши потребности в фоновой обработке.
Оригинал: «https://dev.to/chadburggraf/how-to-schedule-delayed-jobs-on-a-specific-queue-with-hangfire-4cmc»