diff --git a/JobQueue/Job.cs b/JobQueue/Job.cs index ca90df7..0441c45 100644 --- a/JobQueue/Job.cs +++ b/JobQueue/Job.cs @@ -2,37 +2,47 @@ namespace JobQueue; -public abstract class Job : IComparable +public abstract class Job : IComparable where T : notnull { + protected JobQueue Queue { get; init; } public ProgressToken ProgressToken { get; private set; } public TimeSpan Interval { get; private set; } public DateTime LastStarted { get; private set; } public string JobId { get; init; } + public string? ParentJobId { get; init; } + private List> childJobs = new(); public TimeSpan MaximumTimeBetweenUpdates { get; init; } private readonly ILogger? _logger; - public Job(string? jobId = null, ILogger? logger = null) : this (TimeSpan.Zero, jobId, logger) + protected Job(JobQueue queue, string? jobId = null, ILogger? logger = null) : this (queue, TimeSpan.Zero, jobId, logger) { } - public Job(TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(interval, TimeSpan.FromMinutes(3), 1, jobId, logger) + protected Job(JobQueue queue, TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(queue, interval, TimeSpan.FromMinutes(3), 1, jobId, null, logger) { } - public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(interval, maximumTimeBetweenUpdates, 1, jobId, logger) + protected Job(JobQueue queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(queue, interval, maximumTimeBetweenUpdates, 1, jobId, null, logger) + { + + } + + protected Job(JobQueue queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, string? parentJobId = null, ILogger? logger = null) : this(queue, interval, maximumTimeBetweenUpdates, 1, jobId, parentJobId, logger) { } - public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, ILogger? logger = null) + protected Job(JobQueue queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, string? parentJobId = null, ILogger? logger = null) { + this.Queue = queue; this._logger = logger; - const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; this.ProgressToken = new ProgressToken(steps); this.Interval = interval; this.MaximumTimeBetweenUpdates = maximumTimeBetweenUpdates; + const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; this.JobId = jobId ?? new string(Enumerable.Repeat(chars, 16).Select(s => s[Random.Shared.Next(s.Length)]).ToArray()); + this.ParentJobId = parentJobId; this._logger?.LogDebug($"Created Job {jobId}"); } @@ -43,7 +53,8 @@ public abstract class Job : IComparable this.LastStarted = DateTime.UtcNow; this.ProgressToken.Start(); this._logger?.LogDebug($"Started Job {JobId}"); - this.Execute(ProgressToken.CancellationTokenSource.Token); + Task t = new (() => Execute(ProgressToken.CancellationTokenSource.Token)); + t.Start(); } public void Cancel() @@ -68,6 +79,11 @@ public abstract class Job : IComparable return this.ProgressToken.LastUpdate.Add(MaximumTimeBetweenUpdates) > DateTime.UtcNow; } + public IEnumerable> GetChildJobs() + { + return this.childJobs; + } + public override string ToString() { return $"Job: {JobId} Last Started: {LastStarted:dd/MM-HH:mm:ss.fff} Interval: {Interval:d-hh\\:mm\\:ss} {ProgressToken}"; @@ -75,13 +91,13 @@ public abstract class Job : IComparable public int CompareTo(object? obj) { - if (obj is not Job other) + if (obj is not Job other) throw new ArgumentException("Type can not be compared", nameof(obj)); return TimeToNextExecution().CompareTo(other.TimeToNextExecution()); } public override bool Equals(object? obj) { - return obj is Job other && other.JobId.Equals(this.JobId); + return obj is Job other && other.JobId.Equals(this.JobId); } } \ No newline at end of file diff --git a/JobQueue/JobQueue.cs b/JobQueue/JobQueue.cs index 30477e9..753da63 100644 --- a/JobQueue/JobQueue.cs +++ b/JobQueue/JobQueue.cs @@ -4,8 +4,8 @@ namespace JobQueue; public class JobQueue : IDisposable where T : notnull { - private readonly Dictionary> _queues = new(); - private Dictionary> FailedJobs { get; init; } = new(); + private readonly Dictionary>> _queues = new(); + private Dictionary>> AllFailedJobs { get; init; } = new(); private bool _running = true; private readonly ILogger? _logger; @@ -21,35 +21,38 @@ public class JobQueue : IDisposable where T : notnull { foreach (T type in _queues.Where(q => !RunningJobsFor(q.Key).Any()).Select(kv => kv.Key)) { - Job? startJob = WaitingJobsFor(type).Min(); + Job? startJob = WaitingJobsFor(type).Min(); this._logger?.LogInformation($"Starting job: {startJob}"); startJob?.Start(); } - foreach (Job job in _queues.Values.SelectMany(job => job).Where(job => + foreach (Job job in _queues.Values.SelectMany(job => job).Where(job => job.ProgressToken.State is ProgressState.Finished or ProgressState.Cancelled)) { this._logger?.LogInformation($"Job finished: {job}"); - job.Reset(); + if(job.Interval.TotalMilliseconds != 0) + job.Reset(); + else + RemoveJob(job); } foreach (T type in _queues.Keys) { - HashSet failed = FailedJobsFor(type).ToHashSet(); + HashSet> failed = _queues[type].Where(job => job.ProgressToken.State is ProgressState.Failed).ToHashSet(); if (!failed.Any()) break; this._logger?.LogInformation($"Failed Jobs:\n\t{string.Join("\n\t", failed)}"); - FailedJobs[type] = FailedJobs[type].Concat(failed).ToHashSet(); + AllFailedJobs[type] = AllFailedJobs[type].Concat(failed).ToHashSet(); _queues[type].RemoveWhere(job => failed.Contains(job)); } foreach (T type in _queues.Keys) { - HashSet overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet(); + HashSet> overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet(); if (!overtimeJobs.Any()) break; this._logger?.LogInformation($"Jobs over TimeLimit:\n\t{string.Join("\n\t", overtimeJobs)}"); - foreach (Job jobOvertime in overtimeJobs) + foreach (Job jobOvertime in overtimeJobs) jobOvertime.Cancel(); } @@ -60,10 +63,10 @@ public class JobQueue : IDisposable where T : notnull public void AddQueue(T type) { this._queues.Add(type, new()); - this.FailedJobs.Add(type, new()); + this.AllFailedJobs.Add(type, new()); } - public void AddJob(T obj, Job job) + public void AddJob(T obj, Job job) { if (!_queues.ContainsKey(obj)) throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); @@ -71,24 +74,79 @@ public class JobQueue : IDisposable where T : notnull _queues[obj].Add(job); } - public IEnumerable JobsMatching(T obj, ProgressState state) + public void RemoveJob(Job job) + { + foreach (HashSet> queue in this._queues.Values) + { + if (queue.Contains(job)) + { + queue.Remove(job); + return; + } + } + } + + public bool TryJobWithId(string jobId, out Job? job) + { + job = JobWithId(jobId); + return job is not null; + } + + public Job? JobWithId(string jobId) + { + return _queues.Values.SelectMany(queue => queue).FirstOrDefault(job => job.JobId.Equals(jobId)); + } + + public IEnumerable> JobsWithParent(string parentId) + { + return _queues.Values.SelectMany(queue => queue).Where(job => job.ParentJobId?.Equals(parentId) ?? false); + } + + public IEnumerable> JobsWithParent(Job parentJob) + { + return JobsWithParent(parentJob.JobId); + } + + public IEnumerable> JobsMatchingState(ProgressState state) + { + return _queues.Values.SelectMany(queue => queue).Where(job => job.ProgressToken.State == state); + } + + public IEnumerable> RunningJobs() => JobsMatchingState(ProgressState.Running); + + public IEnumerable> WaitingJobs() => JobsMatchingState(ProgressState.Waiting); + + public IEnumerable> FailedJobs() => AllFailedJobs.Values.SelectMany(queue => queue).Select(job => job); + + public IEnumerable> CancelledJobs() => JobsMatchingState(ProgressState.Cancelled); + + public IEnumerable> FinishedJobs() => JobsMatchingState(ProgressState.Finished); + + public IEnumerable> AllJobs() => _queues.Values.SelectMany(queue => queue).Select(job => job); + + public IEnumerable> JobsMatchingStateFor(T obj, ProgressState state) { if (!_queues.ContainsKey(obj)) throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); return _queues[obj].Where(job => job.ProgressToken.State == state); } - public IEnumerable RunningJobsFor(T obj) => JobsMatching(obj, ProgressState.Running); + public IEnumerable> RunningJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Running); - public IEnumerable WaitingJobsFor(T obj) => JobsMatching(obj, ProgressState.Waiting); + public IEnumerable> WaitingJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Waiting); - public IEnumerable FailedJobsFor(T obj) => JobsMatching(obj, ProgressState.Failed); + public IEnumerable> FailedJobsFor(T obj) + { + if(!AllFailedJobs.ContainsKey(obj)) + throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); + return AllFailedJobs[obj]; + } - public IEnumerable CancelledJobsFor(T obj) => JobsMatching(obj, ProgressState.Cancelled); + public IEnumerable> CancelledJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Cancelled); - public IEnumerable FinishedJobsFor(T obj) => JobsMatching(obj, ProgressState.Finished); + public IEnumerable> FinishedJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Finished); - public IEnumerable AllJobsFor(T obj) + public IEnumerable> AllJobsFor(T obj) { if (!_queues.ContainsKey(obj)) throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj)); @@ -99,7 +157,7 @@ public class JobQueue : IDisposable where T : notnull { this._logger?.LogInformation("Shutting down JobQueue."); _running = false; - foreach(Job job in _queues.Values.SelectMany(list => list)) + foreach(Job job in _queues.Values.SelectMany(list => list)) job.Cancel(); } } \ No newline at end of file diff --git a/JobQueue/ProgressToken.cs b/JobQueue/ProgressToken.cs index dc909be..a2ffa86 100644 --- a/JobQueue/ProgressToken.cs +++ b/JobQueue/ProgressToken.cs @@ -7,9 +7,9 @@ public struct ProgressToken public ProgressState State { get; private set; } public DateTime LastUpdate { get; private set; } public float Progress { get; private set; } - public int? Steps { get; init; } + public int? Steps { get; private set; } public int? FinishedSteps { get; private set; } - internal CancellationTokenSource CancellationTokenSource { get; } = new(); + public CancellationTokenSource CancellationTokenSource { get; } = new(); private readonly ILogger? _logger; public ProgressToken(int? steps = null, ILogger? logger = null) @@ -64,6 +64,12 @@ public struct ProgressToken return new ProgressToken(Steps, _logger); } + public void SetSteps(int steps) + { + this.Steps = steps; + UpdateProgress((float)FinishedSteps / (float)Steps); + } + public override string ToString() { if (Steps is null || FinishedSteps is null)