From e29a142935c9d53b9ec3829d2a142ac01c38ab9e Mon Sep 17 00:00:00 2001 From: glax Date: Thu, 22 Feb 2024 23:37:00 +0100 Subject: [PATCH] Files --- .gitignore | 5 + .idea/.idea.JobQueue/.idea/.gitignore | 13 +++ .idea/.idea.JobQueue/.idea/.name | 1 + .idea/.idea.JobQueue/.idea/encodings.xml | 4 + .idea/.idea.JobQueue/.idea/indexLayout.xml | 8 ++ .idea/.idea.JobQueue/.idea/vcs.xml | 6 ++ .idea/.idea.TaskQueue/.idea/workspace.xml | 85 +++++++++++++++++ JobQueue.sln | 16 ++++ JobQueue/Job.cs | 82 ++++++++++++++++ JobQueue/JobQueue.cs | 105 +++++++++++++++++++++ JobQueue/JobQueue.csproj | 13 +++ JobQueue/ProgressState.cs | 10 ++ JobQueue/ProgressToken.cs | 73 ++++++++++++++ 13 files changed, 421 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.idea.JobQueue/.idea/.gitignore create mode 100644 .idea/.idea.JobQueue/.idea/.name create mode 100644 .idea/.idea.JobQueue/.idea/encodings.xml create mode 100644 .idea/.idea.JobQueue/.idea/indexLayout.xml create mode 100644 .idea/.idea.JobQueue/.idea/vcs.xml create mode 100644 .idea/.idea.TaskQueue/.idea/workspace.xml create mode 100644 JobQueue.sln create mode 100644 JobQueue/Job.cs create mode 100644 JobQueue/JobQueue.cs create mode 100644 JobQueue/JobQueue.csproj create mode 100644 JobQueue/ProgressState.cs create mode 100644 JobQueue/ProgressToken.cs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..add57be --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +bin/ +obj/ +/packages/ +riderModule.iml +/_ReSharper.Caches/ \ No newline at end of file diff --git a/.idea/.idea.JobQueue/.idea/.gitignore b/.idea/.idea.JobQueue/.idea/.gitignore new file mode 100644 index 0000000..4f2ca8f --- /dev/null +++ b/.idea/.idea.JobQueue/.idea/.gitignore @@ -0,0 +1,13 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Rider ignored files +/.idea.TaskQueue.iml +/modules.xml +/contentModel.xml +/projectSettingsUpdater.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.idea.JobQueue/.idea/.name b/.idea/.idea.JobQueue/.idea/.name new file mode 100644 index 0000000..1e235f9 --- /dev/null +++ b/.idea/.idea.JobQueue/.idea/.name @@ -0,0 +1 @@ +JobQueue \ No newline at end of file diff --git a/.idea/.idea.JobQueue/.idea/encodings.xml b/.idea/.idea.JobQueue/.idea/encodings.xml new file mode 100644 index 0000000..df87cf9 --- /dev/null +++ b/.idea/.idea.JobQueue/.idea/encodings.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/.idea.JobQueue/.idea/indexLayout.xml b/.idea/.idea.JobQueue/.idea/indexLayout.xml new file mode 100644 index 0000000..7b08163 --- /dev/null +++ b/.idea/.idea.JobQueue/.idea/indexLayout.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/.idea.JobQueue/.idea/vcs.xml b/.idea/.idea.JobQueue/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/.idea.JobQueue/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/.idea.TaskQueue/.idea/workspace.xml b/.idea/.idea.TaskQueue/.idea/workspace.xml new file mode 100644 index 0000000..c268599 --- /dev/null +++ b/.idea/.idea.TaskQueue/.idea/workspace.xml @@ -0,0 +1,85 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1708636916152 + + + + + + + + + \ No newline at end of file diff --git a/JobQueue.sln b/JobQueue.sln new file mode 100644 index 0000000..2318f27 --- /dev/null +++ b/JobQueue.sln @@ -0,0 +1,16 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JobQueue", "JobQueue\JobQueue.csproj", "{271080AB-869B-4377-9699-8DBBA3060EF2}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {271080AB-869B-4377-9699-8DBBA3060EF2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {271080AB-869B-4377-9699-8DBBA3060EF2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {271080AB-869B-4377-9699-8DBBA3060EF2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {271080AB-869B-4377-9699-8DBBA3060EF2}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/JobQueue/Job.cs b/JobQueue/Job.cs new file mode 100644 index 0000000..c924081 --- /dev/null +++ b/JobQueue/Job.cs @@ -0,0 +1,82 @@ +using Microsoft.Extensions.Logging; + +namespace JobQueue; + +public abstract class Job : IComparable +{ + public ProgressToken ProgressToken { get; private set; } + public TimeSpan Interval { get; private set; } + public DateTime LastStarted { get; private set; } + public string JobId { get; init; } + public TimeSpan MaximumTimeBetweenUpdates { get; init; } + private readonly ILogger? _logger; + + public Job(string? jobId = null, ILogger? logger = null) : this (TimeSpan.Zero, jobId, logger) + { + } + + public Job(TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(interval, TimeSpan.FromMinutes(3), 1, jobId, logger) + { + + } + + public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(interval, maximumTimeBetweenUpdates, 1, jobId, logger) + { + + } + + public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, ILogger? logger = null) + { + this._logger = logger; + const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + this.ProgressToken = new ProgressToken(steps); + this.Interval = interval; + this.MaximumTimeBetweenUpdates = maximumTimeBetweenUpdates; + this.JobId = jobId ?? new string(Enumerable.Repeat(chars, 16).Select(s => s[Random.Shared.Next(s.Length)]).ToArray()); + this._logger?.LogDebug($"Created Job {jobId}"); + } + + protected abstract void Execute(CancellationToken cancellationToken); + + public void Start() + { + this.LastStarted = DateTime.UtcNow; + this.ProgressToken.Start(); + this._logger?.LogDebug($"Started Job {JobId}"); + this.Execute(ProgressToken._cancellationTokenSource.Token); + } + + public void Cancel() + { + this.ProgressToken.Cancel(); + this._logger?.LogDebug($"Stopped Job {JobId}"); + } + + public void Reset() + { + this.ProgressToken._cancellationTokenSource.Cancel(); + this.ProgressToken = ProgressToken.Clone(); + } + + public TimeSpan TimeToNextExecution() + { + return LastStarted.Add(Interval).Subtract(DateTime.UtcNow); + } + + internal bool OverTimelimit() + { + return this.ProgressToken.LastUpdate.Add(MaximumTimeBetweenUpdates) > DateTime.UtcNow; + } + + public override string ToString() + { + return $"Job: {JobId} Last Started: {LastStarted:dd/MM-HH:mm:ss.fff} Interval: {Interval:d-hh\\:mm\\:ss} {ProgressToken}"; + } + + public int CompareTo(object? obj) + { + if (obj is not Job other) + throw new ArgumentException("Type can not be compared", nameof(obj)); + return TimeToNextExecution().CompareTo(other.TimeToNextExecution()); + } +} \ No newline at end of file diff --git a/JobQueue/JobQueue.cs b/JobQueue/JobQueue.cs new file mode 100644 index 0000000..0accdab --- /dev/null +++ b/JobQueue/JobQueue.cs @@ -0,0 +1,105 @@ +using Microsoft.Extensions.Logging; + +namespace JobQueue; + +public class JobQueue : IDisposable where T : notnull +{ + private readonly Dictionary> _queues = new(); + public Dictionary> FailedJobs { get; init; } = new(); + private bool _running = true; + private readonly ILogger? _logger; + + public JobQueue(int checkIntervalMs, ILogger? logger = null) : this(TimeSpan.FromMilliseconds(checkIntervalMs), logger) + { + + } + + public JobQueue(TimeSpan checkInterval, ILogger? logger = null) + { + this._logger = logger; + while (_running) + { + foreach (T type in _queues.Where(q => !RunningJobsFor(q.Key).Any()).Select(kv => kv.Key)) + { + Job? startJob = WaitingJobsFor(type).Min(); + this._logger?.LogInformation($"Starting job: {startJob}"); + startJob?.Start(); + } + + foreach (Job job in _queues.Values.Select(queue => + queue.Select(job => job.ProgressToken.State is ProgressState.Finished or ProgressState.Cancelled))) + { + this._logger?.LogInformation($"Job finished: {job}"); + job.Reset(); + } + + foreach (T type in _queues.Keys) + { + HashSet failed = FailedJobsFor(type).ToHashSet(); + if (!failed.Any()) + break; + this._logger?.LogInformation($"Failed Jobs:\n\t{string.Join("\n\t", failed)}"); + FailedJobs[type] = FailedJobs[type].Concat(failed).ToHashSet(); + _queues[type].RemoveWhere(job => failed.Contains(job)); + } + + foreach (T type in _queues.Keys) + { + HashSet overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet(); + if (!overtimeJobs.Any()) + break; + this._logger?.LogInformation($"Jobs over TimeLimit:\n\t{string.Join("\n\t", overtimeJobs)}"); + foreach (Job jobOvertime in overtimeJobs) + jobOvertime.Cancel(); + } + + Thread.Sleep(checkInterval); + } + } + + public void AddQueue(T type) + { + this._queues.Add(type, new()); + this.FailedJobs.Add(type, new()); + } + + public void AddJob(T obj, Job job) + { + if (!_queues.ContainsKey(obj)) + throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); + this._logger?.LogInformation($"Adding to Queue {job}"); + _queues[obj].Add(job); + } + + public IEnumerable JobsMatching(T obj, ProgressState state) + { + if (!_queues.ContainsKey(obj)) + throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); + return _queues[obj].Where(job => job.ProgressToken.State == state); + } + + public IEnumerable RunningJobsFor(T obj) => JobsMatching(obj, ProgressState.Running); + + public IEnumerable WaitingJobsFor(T obj) => JobsMatching(obj, ProgressState.Waiting); + + public IEnumerable FailedJobsFor(T obj) => JobsMatching(obj, ProgressState.Failed); + + public IEnumerable CancelledJobsFor(T obj) => JobsMatching(obj, ProgressState.Cancelled); + + public IEnumerable FinishedJobsFor(T obj) => JobsMatching(obj, ProgressState.Finished); + + public IEnumerable AllJobsFor(T obj) + { + if (!_queues.ContainsKey(obj)) + throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); + return _queues[obj]; + } + + public void Dispose() + { + this._logger?.LogInformation("Shutting down JobQueue."); + _running = false; + foreach(Job job in _queues.Values.Select(set => set.Select(_ => true))) + job.Cancel(); + } +} \ No newline at end of file diff --git a/JobQueue/JobQueue.csproj b/JobQueue/JobQueue.csproj new file mode 100644 index 0000000..9f1cec8 --- /dev/null +++ b/JobQueue/JobQueue.csproj @@ -0,0 +1,13 @@ + + + + net7.0 + enable + enable + + + + + + + diff --git a/JobQueue/ProgressState.cs b/JobQueue/ProgressState.cs new file mode 100644 index 0000000..559864b --- /dev/null +++ b/JobQueue/ProgressState.cs @@ -0,0 +1,10 @@ +namespace JobQueue; + +public enum ProgressState +{ + Waiting = 0, + Running = 1, + Cancelled = -1, + Finished = 2, + Failed = -2 +} \ No newline at end of file diff --git a/JobQueue/ProgressToken.cs b/JobQueue/ProgressToken.cs new file mode 100644 index 0000000..34de01c --- /dev/null +++ b/JobQueue/ProgressToken.cs @@ -0,0 +1,73 @@ +using Microsoft.Extensions.Logging; + +namespace JobQueue; + +public struct ProgressToken +{ + public ProgressState State { get; private set; } + public DateTime LastUpdate { get; private set; } + public float Progress { get; private set; } + public int? Steps { get; init; } + public int? FinishedSteps { get; private set; } + internal CancellationTokenSource _cancellationTokenSource { get; } = new(); + private readonly ILogger? _logger; + + public ProgressToken(int? steps = null, ILogger? logger = null) + { + this._logger = logger; + State = ProgressState.Waiting; + LastUpdate = DateTime.UtcNow; + Progress = 0; + Steps = steps; + FinishedSteps = steps is null ? null : 0; + } + + public void UpdateProgress(float progress) + { + this._logger?.LogDebug($"Progress Update: {this}"); + this.Progress = progress; + this.LastUpdate = DateTime.UtcNow; + } + + public void UpdateProgress(int newlyFinishedSteps) + { + if (Steps is null || FinishedSteps is null) + throw new InvalidOperationException("This progress token has not been configured to use steps"); + FinishedSteps += newlyFinishedSteps; + UpdateProgress((float)FinishedSteps / (float)Steps); + } + + public void ChangeState(ProgressState state) + { + this._logger?.LogDebug($"State Update: {this}"); + this.State = state; + this.LastUpdate = DateTime.UtcNow; + } + + public void Start() => ChangeState(ProgressState.Running); + public void Cancel() + { + _cancellationTokenSource.Cancel(); + ChangeState(ProgressState.Cancelled); + } + + public void MarkFailed() + { + _cancellationTokenSource.Cancel(); + ChangeState(ProgressState.Failed); + } + + public void MarkFinished() => ChangeState(ProgressState.Finished); + + public ProgressToken Clone() + { + return new ProgressToken(Steps, _logger); + } + + public override string ToString() + { + if (Steps is null || FinishedSteps is null) + return $"State: {Enum.GetName(State)} {Progress:0.##\\%} {LastUpdate:dd/MM-HH:mm:ss.fff}"; + return $"State: {Enum.GetName(State)} {Progress:0.##\\%} {FinishedSteps}/{Steps} {LastUpdate:dd/MM-HH:mm:ss.fff}"; + } +} \ No newline at end of file