1
0
This commit is contained in:
glax 2024-02-22 23:37:00 +01:00
parent 257dd5c175
commit e29a142935
13 changed files with 421 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
bin/
obj/
/packages/
riderModule.iml
/_ReSharper.Caches/

13
.idea/.idea.JobQueue/.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# Default ignored files
/shelf/
/workspace.xml
# Rider ignored files
/.idea.TaskQueue.iml
/modules.xml
/contentModel.xml
/projectSettingsUpdater.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1 @@
JobQueue

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding" addBOMForNewFiles="with BOM under Windows, with no BOM otherwise" />
</project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="UserContentModel">
<attachedFolders />
<explicitIncludes />
<explicitExcludes />
</component>
</project>

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="a5539feb-580a-4d73-b339-a1169a4234ae" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/.idea.JobQueue/.idea/.gitignore" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/.idea.JobQueue/.idea/encodings.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/.idea.JobQueue/.idea/indexLayout.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/.idea.JobQueue/.idea/vcs.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/.idea/.idea.TaskQueue/.idea/workspace.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue.sln" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue/Job.cs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue/JobQueue.cs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue/JobQueue.csproj" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue/ProgressState.cs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/JobQueue/ProgressToken.cs" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MetaFilesCheckinStateConfiguration" checkMetaFiles="true" />
<component name="ProjectColorInfo"><![CDATA[{
"associatedIndex": 7
}]]></component>
<component name="ProjectId" id="2cjtf5g9tVn756sskExZMVncoeO" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true">
<ConfirmationsSetting value="2" id="Add" />
</component>
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"ASKED_ADD_EXTERNAL_FILES": "true",
"ASKED_SHARE_PROJECT_CONFIGURATION_FILES": "true",
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"git-widget-placeholder": "master",
"ignore.virus.scanning.warn.message": "true",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"vue.rearranger.settings.migration": "true"
},
"keyToStringList": {
"rider.external.source.directories": [
"C:\\Users\\Glax\\AppData\\Roaming\\JetBrains\\Rider2023.3\\resharper-host\\DecompilerCache",
"C:\\Users\\Glax\\AppData\\Roaming\\JetBrains\\Rider2023.3\\resharper-host\\SourcesCache",
"C:\\Users\\Glax\\AppData\\Local\\Symbols\\src"
]
}
}]]></component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="a5539feb-580a-4d73-b339-a1169a4234ae" name="Changes" comment="" />
<created>1708636916152</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1708636916152</updated>
<workItem from="1708636917346" duration="3150000" />
</task>
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="UnityCheckinConfiguration" checkUnsavedScenes="true" />
<component name="VcsManagerConfiguration">
<option name="ADD_EXTERNAL_FILES_SILENTLY" value="true" />
<option name="CLEAR_INITIAL_COMMIT_MESSAGE" value="true" />
</component>
</project>

16
JobQueue.sln Normal file
View File

@ -0,0 +1,16 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JobQueue", "JobQueue\JobQueue.csproj", "{271080AB-869B-4377-9699-8DBBA3060EF2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{271080AB-869B-4377-9699-8DBBA3060EF2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{271080AB-869B-4377-9699-8DBBA3060EF2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{271080AB-869B-4377-9699-8DBBA3060EF2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{271080AB-869B-4377-9699-8DBBA3060EF2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal

82
JobQueue/Job.cs Normal file
View File

@ -0,0 +1,82 @@
using Microsoft.Extensions.Logging;
namespace JobQueue;
public abstract class Job : IComparable
{
public ProgressToken ProgressToken { get; private set; }
public TimeSpan Interval { get; private set; }
public DateTime LastStarted { get; private set; }
public string JobId { get; init; }
public TimeSpan MaximumTimeBetweenUpdates { get; init; }
private readonly ILogger? _logger;
public Job(string? jobId = null, ILogger? logger = null) : this (TimeSpan.Zero, jobId, logger)
{
}
public Job(TimeSpan interval, string? jobId = null, ILogger? logger = null) : this(interval, TimeSpan.FromMinutes(3), 1, jobId, logger)
{
}
public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, string? jobId = null, ILogger? logger = null) : this(interval, maximumTimeBetweenUpdates, 1, jobId, logger)
{
}
public Job(TimeSpan interval, TimeSpan maximumTimeBetweenUpdates, int steps, string? jobId = null, ILogger? logger = null)
{
this._logger = logger;
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
this.ProgressToken = new ProgressToken(steps);
this.Interval = interval;
this.MaximumTimeBetweenUpdates = maximumTimeBetweenUpdates;
this.JobId = jobId ?? new string(Enumerable.Repeat(chars, 16).Select(s => s[Random.Shared.Next(s.Length)]).ToArray());
this._logger?.LogDebug($"Created Job {jobId}");
}
protected abstract void Execute(CancellationToken cancellationToken);
public void Start()
{
this.LastStarted = DateTime.UtcNow;
this.ProgressToken.Start();
this._logger?.LogDebug($"Started Job {JobId}");
this.Execute(ProgressToken._cancellationTokenSource.Token);
}
public void Cancel()
{
this.ProgressToken.Cancel();
this._logger?.LogDebug($"Stopped Job {JobId}");
}
public void Reset()
{
this.ProgressToken._cancellationTokenSource.Cancel();
this.ProgressToken = ProgressToken.Clone();
}
public TimeSpan TimeToNextExecution()
{
return LastStarted.Add(Interval).Subtract(DateTime.UtcNow);
}
internal bool OverTimelimit()
{
return this.ProgressToken.LastUpdate.Add(MaximumTimeBetweenUpdates) > DateTime.UtcNow;
}
public override string ToString()
{
return $"Job: {JobId} Last Started: {LastStarted:dd/MM-HH:mm:ss.fff} Interval: {Interval:d-hh\\:mm\\:ss} {ProgressToken}";
}
public int CompareTo(object? obj)
{
if (obj is not Job other)
throw new ArgumentException("Type can not be compared", nameof(obj));
return TimeToNextExecution().CompareTo(other.TimeToNextExecution());
}
}

105
JobQueue/JobQueue.cs Normal file
View File

@ -0,0 +1,105 @@
using Microsoft.Extensions.Logging;
namespace JobQueue;
public class JobQueue<T> : IDisposable where T : notnull
{
private readonly Dictionary<T, HashSet<Job>> _queues = new();
public Dictionary<T, HashSet<Job>> FailedJobs { get; init; } = new();
private bool _running = true;
private readonly ILogger? _logger;
public JobQueue(int checkIntervalMs, ILogger? logger = null) : this(TimeSpan.FromMilliseconds(checkIntervalMs), logger)
{
}
public JobQueue(TimeSpan checkInterval, ILogger? logger = null)
{
this._logger = logger;
while (_running)
{
foreach (T type in _queues.Where(q => !RunningJobsFor(q.Key).Any()).Select(kv => kv.Key))
{
Job? startJob = WaitingJobsFor(type).Min();
this._logger?.LogInformation($"Starting job: {startJob}");
startJob?.Start();
}
foreach (Job job in _queues.Values.Select(queue =>
queue.Select(job => job.ProgressToken.State is ProgressState.Finished or ProgressState.Cancelled)))
{
this._logger?.LogInformation($"Job finished: {job}");
job.Reset();
}
foreach (T type in _queues.Keys)
{
HashSet<Job> failed = FailedJobsFor(type).ToHashSet();
if (!failed.Any())
break;
this._logger?.LogInformation($"Failed Jobs:\n\t{string.Join("\n\t", failed)}");
FailedJobs[type] = FailedJobs[type].Concat(failed).ToHashSet();
_queues[type].RemoveWhere(job => failed.Contains(job));
}
foreach (T type in _queues.Keys)
{
HashSet<Job> overtimeJobs = RunningJobsFor(type).Where(job => job.OverTimelimit()).ToHashSet();
if (!overtimeJobs.Any())
break;
this._logger?.LogInformation($"Jobs over TimeLimit:\n\t{string.Join("\n\t", overtimeJobs)}");
foreach (Job jobOvertime in overtimeJobs)
jobOvertime.Cancel();
}
Thread.Sleep(checkInterval);
}
}
public void AddQueue(T type)
{
this._queues.Add(type, new());
this.FailedJobs.Add(type, new());
}
public void AddJob(T obj, Job job)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
this._logger?.LogInformation($"Adding to Queue {job}");
_queues[obj].Add(job);
}
public IEnumerable<Job> JobsMatching(T obj, ProgressState state)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
return _queues[obj].Where(job => job.ProgressToken.State == state);
}
public IEnumerable<Job> RunningJobsFor(T obj) => JobsMatching(obj, ProgressState.Running);
public IEnumerable<Job> WaitingJobsFor(T obj) => JobsMatching(obj, ProgressState.Waiting);
public IEnumerable<Job> FailedJobsFor(T obj) => JobsMatching(obj, ProgressState.Failed);
public IEnumerable<Job> CancelledJobsFor(T obj) => JobsMatching(obj, ProgressState.Cancelled);
public IEnumerable<Job> FinishedJobsFor(T obj) => JobsMatching(obj, ProgressState.Finished);
public IEnumerable<Job> AllJobsFor(T obj)
{
if (!_queues.ContainsKey(obj))
throw new ArgumentException($"{obj.ToString()} does not have a queue.", nameof(obj));
return _queues[obj];
}
public void Dispose()
{
this._logger?.LogInformation("Shutting down JobQueue.");
_running = false;
foreach(Job job in _queues.Values.Select(set => set.Select(_ => true)))
job.Cancel();
}
}

13
JobQueue/JobQueue.csproj Normal file
View File

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
</ItemGroup>
</Project>

10
JobQueue/ProgressState.cs Normal file
View File

@ -0,0 +1,10 @@
namespace JobQueue;
public enum ProgressState
{
Waiting = 0,
Running = 1,
Cancelled = -1,
Finished = 2,
Failed = -2
}

73
JobQueue/ProgressToken.cs Normal file
View File

@ -0,0 +1,73 @@
using Microsoft.Extensions.Logging;
namespace JobQueue;
public struct ProgressToken
{
public ProgressState State { get; private set; }
public DateTime LastUpdate { get; private set; }
public float Progress { get; private set; }
public int? Steps { get; init; }
public int? FinishedSteps { get; private set; }
internal CancellationTokenSource _cancellationTokenSource { get; } = new();
private readonly ILogger? _logger;
public ProgressToken(int? steps = null, ILogger? logger = null)
{
this._logger = logger;
State = ProgressState.Waiting;
LastUpdate = DateTime.UtcNow;
Progress = 0;
Steps = steps;
FinishedSteps = steps is null ? null : 0;
}
public void UpdateProgress(float progress)
{
this._logger?.LogDebug($"Progress Update: {this}");
this.Progress = progress;
this.LastUpdate = DateTime.UtcNow;
}
public void UpdateProgress(int newlyFinishedSteps)
{
if (Steps is null || FinishedSteps is null)
throw new InvalidOperationException("This progress token has not been configured to use steps");
FinishedSteps += newlyFinishedSteps;
UpdateProgress((float)FinishedSteps / (float)Steps);
}
public void ChangeState(ProgressState state)
{
this._logger?.LogDebug($"State Update: {this}");
this.State = state;
this.LastUpdate = DateTime.UtcNow;
}
public void Start() => ChangeState(ProgressState.Running);
public void Cancel()
{
_cancellationTokenSource.Cancel();
ChangeState(ProgressState.Cancelled);
}
public void MarkFailed()
{
_cancellationTokenSource.Cancel();
ChangeState(ProgressState.Failed);
}
public void MarkFinished() => ChangeState(ProgressState.Finished);
public ProgressToken Clone()
{
return new ProgressToken(Steps, _logger);
}
public override string ToString()
{
if (Steps is null || FinishedSteps is null)
return $"State: {Enum.GetName(State)} {Progress:0.##\\%} {LastUpdate:dd/MM-HH:mm:ss.fff}";
return $"State: {Enum.GetName(State)} {Progress:0.##\\%} {FinishedSteps}/{Steps} {LastUpdate:dd/MM-HH:mm:ss.fff}";
}
}