Tranga CheckRunning Workers

This commit is contained in:
2025-07-02 22:34:12 +02:00
parent 8a06ed648c
commit 6f5823596a

View File

@ -38,22 +38,26 @@ public static class Tranga
AddWorker(baseWorker); AddWorker(baseWorker);
} }
} }
public static void RemoveWorker(BaseWorker worker)
internal static void StopWorker(BaseWorker worker) => RemoveWorker(worker);
public static void RemoveWorker(BaseWorker removeWorker)
{ {
IEnumerable<BaseWorker> baseWorkers = Workers.Where(w => w.DependenciesAndSelf.Any(w => w == worker)); IEnumerable<BaseWorker> baseWorkers = Workers.Where(w => w.DependenciesAndSelf.Any(worker => worker == removeWorker));
foreach (BaseWorker baseWorker in baseWorkers)
foreach (BaseWorker worker in baseWorkers)
{ {
baseWorker.Cancel(); worker.Cancel();
Workers.Remove(baseWorker); Workers.Remove(worker);
if (RunningWorkers.ContainsKey(baseWorker)) if (RunningWorkers.ContainsKey(worker))
{ {
RunningWorkers[baseWorker].Abort(); worker.Cancel();
RunningWorkers.Remove(baseWorker); RunningWorkers.Remove(worker);
} }
} }
} }
private static readonly Dictionary<BaseWorker, Thread> RunningWorkers = new(); private static readonly Dictionary<BaseWorker, Task<BaseWorker[]>> RunningWorkers = new();
public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray(); public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray();
private static readonly HashSet<BaseWorker> StartWorkers = new(); private static readonly HashSet<BaseWorker> StartWorkers = new();
private static void WorkerStarter(object? serviceProviderObj) private static void WorkerStarter(object? serviceProviderObj)
@ -68,26 +72,31 @@ public static class Tranga
while (true) while (true)
{ {
foreach (BaseWorker startWorker in StartWorkers) CheckRunningWorkers();
foreach (BaseWorker worker in StartWorkers)
{ {
IServiceScope scope = serviceProvider.CreateScope(); if (worker is BaseWorkerWithContext<DbContext> scopedWorker)
StartWorker(startWorker, scope); scopedWorker.SetScope(serviceProvider.CreateScope());
RunningWorkers.Add(worker, worker.DoWork());
} }
Thread.Sleep(TrangaSettings.workCycleTimeout); Thread.Sleep(TrangaSettings.workCycleTimeout);
} }
} }
private static void CheckRunningWorkers()
{
KeyValuePair<BaseWorker, Task<BaseWorker[]>>[] done = RunningWorkers.Where(kv => kv.Value.IsCompleted).ToArray();
Log.Info($"Done: {done.Length}");
Log.Debug(string.Join("\n", done.Select(d => d.ToString())));
foreach ((BaseWorker worker, Task<BaseWorker[]> task) in done)
{
RunningWorkers.Remove(worker);
foreach (BaseWorker newWorker in task.Result)
StartWorkers.Add(newWorker);
task.Dispose();
}
}
internal static void MarkWorkerForStart(BaseWorker worker) => StartWorkers.Add(worker); internal static void MarkWorkerForStart(BaseWorker worker) => StartWorkers.Add(worker);
private static void StartWorker(BaseWorker worker, IServiceScope scope)
{
if(worker is BaseWorkerWithContext<DbContext> w)
w.SetScope(scope);
worker.DoWork();
}
internal static void StopWorker(BaseWorker worker)
{
worker.Cancel();
}
} }