1
0
This commit is contained in:
glax 2024-02-28 01:03:57 +01:00
parent 976cd03623
commit 60288a173a
3 changed files with 110 additions and 30 deletions

View File

@ -2,37 +2,47 @@
namespace JobQueue;
public abstract class Job : IComparable
public abstract class Job<T> : IComparable where T : notnull
{
protected JobQueue<T> Queue { get; init; }
public ProgressToken ProgressToken { get; private set; }
public TimeSpan Interval { get; private set; }
public DateTime LastStarted { get; private set; }
public string JobId { get; init; }
public string? ParentJobId { get; init; }
private List<Job<T>> childJobs = new();
public TimeSpan MaximumTimeBetweenUpdates { get; init; }
private readonly ILogger? _logger;
public Job(string? jobId = null, ILogger? logger = null) : this (TimeSpan.Zero, jobId, logger)
protected Job(JobQueue<T> queue, string? jobId = null, ILogger? logger = null) : this (queue, TimeSpan.Zero, jobId, logger)
{
}
public Job(TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(interval, TimeSpan.FromMinutes(3), 1, jobId, logger)
protected Job(JobQueue<T> queue, TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(queue, interval, TimeSpan.FromMinutes(3), 1, jobId, null, logger)
{
}
public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(interval, maximumTimeBetweenUpdates, 1, jobId, logger)
protected Job(JobQueue<T> queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(queue, interval, maximumTimeBetweenUpdates, 1, jobId, null, logger)
{
}
protected Job(JobQueue<T> queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, string? parentJobId = null, ILogger? logger = null) : this(queue, interval, maximumTimeBetweenUpdates, 1, jobId, parentJobId, logger)
{
}
public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, ILogger? logger = null)
protected Job(JobQueue<T> queue, TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, string? parentJobId = null, ILogger? logger = null)
{
this.Queue = queue;
this._logger = logger;
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
this.ProgressToken = new ProgressToken(steps);
this.Interval = interval;
this.MaximumTimeBetweenUpdates = maximumTimeBetweenUpdates;
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
this.JobId = jobId ?? new string(Enumerable.Repeat(chars, 16).Select(s => s[Random.Shared.Next(s.Length)]).ToArray());
this.ParentJobId = parentJobId;
this._logger?.LogDebug($"Created Job {jobId}");
}
@ -43,7 +53,8 @@ public abstract class Job : IComparable
this.LastStarted = DateTime.UtcNow;
this.ProgressToken.Start();
this._logger?.LogDebug($"Started Job {JobId}");
this.Execute(ProgressToken.CancellationTokenSource.Token);
Task t = new (() => Execute(ProgressToken.CancellationTokenSource.Token));
t.Start();
}
public void Cancel()
@ -68,6 +79,11 @@ public abstract class Job : IComparable
return this.ProgressToken.LastUpdate.Add(MaximumTimeBetweenUpdates) > DateTime.UtcNow;
}
public IEnumerable<Job<T>> GetChildJobs()
{
return this.childJobs;
}
public override string ToString()
{
return $"Job: {JobId} Last Started: {LastStarted:dd/MM-HH:mm:ss.fff} Interval: {Interval:d-hh\\:mm\\:ss} {ProgressToken}";
@ -75,13 +91,13 @@ public abstract class Job : IComparable
public int CompareTo(object? obj)
{
if (obj is not Job other)
if (obj is not Job<T> other)
throw new ArgumentException("Type can not be compared", nameof(obj));
return TimeToNextExecution().CompareTo(other.TimeToNextExecution());
}
public override bool Equals(object? obj)
{
return obj is Job other && other.JobId.Equals(this.JobId);
return obj is Job<T> other && other.JobId.Equals(this.JobId);
}
}

View File

@ -4,8 +4,8 @@ namespace JobQueue;
public class JobQueue<T> : IDisposable where T : notnull
{
private readonly Dictionary<T, HashSet<Job>> _queues = new();
private Dictionary<T, HashSet<Job>> FailedJobs { get; init; } = new();
private readonly Dictionary<T, HashSet<Job<T>>> _queues = new();
private Dictionary<T, HashSet<Job<T>>> AllFailedJobs { get; init; } = new();
private bool _running = true;
private readonly ILogger? _logger;
@ -21,35 +21,38 @@ public class JobQueue<T> : IDisposable where T : notnull
{
foreach (T type in _queues.Where(q => !RunningJobsFor(q.Key).Any()).Select(kv => kv.Key))
{
Job? startJob = WaitingJobsFor(type).Min();
Job<T>? startJob = WaitingJobsFor(type).Min();
this._logger?.LogInformation($"Starting job: {startJob}");
startJob?.Start();
}
foreach (Job job in _queues.Values.SelectMany(job => job).Where(job =>
foreach (Job<T> job in _queues.Values.SelectMany(job => job).Where(job =>
job.ProgressToken.State is ProgressState.Finished or ProgressState.Cancelled))
{
this._logger?.LogInformation($"Job finished: {job}");
job.Reset();
if(job.Interval.TotalMilliseconds != 0)
job.Reset();
else
RemoveJob(job);
}
foreach (T type in _queues.Keys)
{
HashSet<Job> failed = FailedJobsFor(type).ToHashSet();
HashSet<Job<T>> failed = _queues[type].Where(job => job.ProgressToken.State is ProgressState.Failed).ToHashSet();
if (!failed.Any())
break;
this._logger?.LogInformation($"Failed Jobs:\n\t{string.Join("\n\t", failed)}");
FailedJobs[type] = FailedJobs[type].Concat(failed).ToHashSet();
AllFailedJobs[type] = AllFailedJobs[type].Concat(failed).ToHashSet();
_queues[type].RemoveWhere(job => failed.Contains(job));
}
foreach (T type in _queues.Keys)
{
HashSet<Job> overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet();
HashSet<Job<T>> overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet();
if (!overtimeJobs.Any())
break;
this._logger?.LogInformation($"Jobs over TimeLimit:\n\t{string.Join("\n\t", overtimeJobs)}");
foreach (Job jobOvertime in overtimeJobs)
foreach (Job<T> jobOvertime in overtimeJobs)
jobOvertime.Cancel();
}
@ -60,10 +63,10 @@ public class JobQueue<T> : IDisposable where T : notnull
public void AddQueue(T type)
{
this._queues.Add(type, new());
this.FailedJobs.Add(type, new());
this.AllFailedJobs.Add(type, new());
}
public void AddJob(T obj, Job job)
public void AddJob(T obj, Job<T> job)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
@ -71,24 +74,79 @@ public class JobQueue<T> : IDisposable where T : notnull
_queues[obj].Add(job);
}
public IEnumerable<Job> JobsMatching(T obj, ProgressState state)
public void RemoveJob(Job<T> job)
{
foreach (HashSet<Job<T>> queue in this._queues.Values)
{
if (queue.Contains(job))
{
queue.Remove(job);
return;
}
}
}
public bool TryJobWithId(string jobId, out Job<T>? job)
{
job = JobWithId(jobId);
return job is not null;
}
public Job<T>? JobWithId(string jobId)
{
return _queues.Values.SelectMany(queue => queue).FirstOrDefault(job => job.JobId.Equals(jobId));
}
public IEnumerable<Job<T>> JobsWithParent(string parentId)
{
return _queues.Values.SelectMany(queue => queue).Where(job => job.ParentJobId?.Equals(parentId) ?? false);
}
public IEnumerable<Job<T>> JobsWithParent(Job<T> parentJob)
{
return JobsWithParent(parentJob.JobId);
}
public IEnumerable<Job<T>> JobsMatchingState(ProgressState state)
{
return _queues.Values.SelectMany(queue => queue).Where(job => job.ProgressToken.State == state);
}
public IEnumerable<Job<T>> RunningJobs() => JobsMatchingState(ProgressState.Running);
public IEnumerable<Job<T>> WaitingJobs() => JobsMatchingState(ProgressState.Waiting);
public IEnumerable<Job<T>> FailedJobs() => AllFailedJobs.Values.SelectMany(queue => queue).Select(job => job);
public IEnumerable<Job<T>> CancelledJobs() => JobsMatchingState(ProgressState.Cancelled);
public IEnumerable<Job<T>> FinishedJobs() => JobsMatchingState(ProgressState.Finished);
public IEnumerable<Job<T>> AllJobs() => _queues.Values.SelectMany(queue => queue).Select(job => job);
public IEnumerable<Job<T>> JobsMatchingStateFor(T obj, ProgressState state)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
return _queues[obj].Where(job => job.ProgressToken.State == state);
}
public IEnumerable<Job> RunningJobsFor(T obj) => JobsMatching(obj, ProgressState.Running);
public IEnumerable<Job<T>> RunningJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Running);
public IEnumerable<Job> WaitingJobsFor(T obj) => JobsMatching(obj, ProgressState.Waiting);
public IEnumerable<Job<T>> WaitingJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Waiting);
public IEnumerable<Job> FailedJobsFor(T obj) => JobsMatching(obj, ProgressState.Failed);
public IEnumerable<Job<T>> FailedJobsFor(T obj)
{
if(!AllFailedJobs.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
return AllFailedJobs[obj];
}
public IEnumerable<Job> CancelledJobsFor(T obj) => JobsMatching(obj, ProgressState.Cancelled);
public IEnumerable<Job<T>> CancelledJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Cancelled);
public IEnumerable<Job> FinishedJobsFor(T obj) => JobsMatching(obj, ProgressState.Finished);
public IEnumerable<Job<T>> FinishedJobsFor(T obj) => JobsMatchingStateFor(obj, ProgressState.Finished);
public IEnumerable<Job> AllJobsFor(T obj)
public IEnumerable<Job<T>> AllJobsFor(T obj)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
@ -99,7 +157,7 @@ public class JobQueue<T> : IDisposable where T : notnull
{
this._logger?.LogInformation("Shutting down JobQueue.");
_running = false;
foreach(Job job in _queues.Values.SelectMany(list => list))
foreach(Job<T> job in _queues.Values.SelectMany(list => list))
job.Cancel();
}
}

View File

@ -7,9 +7,9 @@ public struct ProgressToken
public ProgressState State { get; private set; }
public DateTime LastUpdate { get; private set; }
public float Progress { get; private set; }
public int? Steps { get; init; }
public int? Steps { get; private set; }
public int? FinishedSteps { get; private set; }
internal CancellationTokenSource CancellationTokenSource { get; } = new();
public CancellationTokenSource CancellationTokenSource { get; } = new();
private readonly ILogger? _logger;
public ProgressToken(int? steps = null, ILogger? logger = null)
@ -64,6 +64,12 @@ public struct ProgressToken
return new ProgressToken(Steps, _logger);
}
public void SetSteps(int steps)
{
this.Steps = steps;
UpdateProgress((float)FinishedSteps / (float)Steps);
}
public override string ToString()
{
if (Steps is null || FinishedSteps is null)