Fix CancellationToken Source crashing all Workers after 10 Minutes of runtime

This commit is contained in:
2025-07-21 19:32:29 +02:00
parent 2b527e15b0
commit d79dd8c3d5
2 changed files with 27 additions and 20 deletions

View File

@@ -123,24 +123,25 @@ public static class Tranga
Log.Fatal("ServiceProvider is null"); Log.Fatal("ServiceProvider is null");
return; return;
} }
Action callBack = AfterWork(worker, callback); Action afterWorkCallback = AfterWork(worker, callback);
if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker) if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker)
{ {
mangaContextWorker.SetScope(ServiceProvider.CreateScope()); mangaContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(mangaContextWorker, mangaContextWorker.DoWork(callBack)); RunningWorkers.TryAdd(mangaContextWorker, mangaContextWorker.DoWork(afterWorkCallback));
}else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker) }else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker)
{ {
notificationContextWorker.SetScope(ServiceProvider.CreateScope()); notificationContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(notificationContextWorker, notificationContextWorker.DoWork(callBack)); RunningWorkers.TryAdd(notificationContextWorker, notificationContextWorker.DoWork(afterWorkCallback));
}else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker) }else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker)
{ {
libraryContextWorker.SetScope(ServiceProvider.CreateScope()); libraryContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(libraryContextWorker, libraryContextWorker.DoWork(callBack)); RunningWorkers.TryAdd(libraryContextWorker, libraryContextWorker.DoWork(afterWorkCallback));
}else }else
RunningWorkers.TryAdd(worker, worker.DoWork(callBack)); RunningWorkers.TryAdd(worker, worker.DoWork(afterWorkCallback));
} }
private static Action AfterWork(BaseWorker worker, Action? callback) => () => private static Action AfterWork(BaseWorker worker, Action? callback = null) => () =>
{ {
Log.Debug($"AfterWork {worker}"); Log.Debug($"AfterWork {worker}");
RunningWorkers.Remove(worker, out _); RunningWorkers.Remove(worker, out _);

View File

@@ -23,7 +23,7 @@ public abstract class BaseWorker : Identifiable
public IEnumerable<BaseWorker> MissingDependencies => DependsOn.Where(d => d.State < WorkerExecutionState.Completed); public IEnumerable<BaseWorker> MissingDependencies => DependsOn.Where(d => d.State < WorkerExecutionState.Completed);
public bool AllDependenciesFulfilled => DependsOn.All(d => d.State >= WorkerExecutionState.Completed); public bool AllDependenciesFulfilled => DependsOn.All(d => d.State >= WorkerExecutionState.Completed);
internal WorkerExecutionState State { get; private set; } internal WorkerExecutionState State { get; private set; }
private static readonly CancellationTokenSource CancellationTokenSource = new(TimeSpan.FromMinutes(10)); private CancellationTokenSource? CancellationTokenSource = null;
protected ILog Log { get; init; } protected ILog Log { get; init; }
/// <summary> /// <summary>
@@ -33,7 +33,7 @@ public abstract class BaseWorker : Identifiable
{ {
Log.Debug($"Cancelled {this}"); Log.Debug($"Cancelled {this}");
this.State = WorkerExecutionState.Cancelled; this.State = WorkerExecutionState.Cancelled;
CancellationTokenSource.Cancel(); CancellationTokenSource?.Cancel();
} }
/// <summary> /// <summary>
@@ -43,7 +43,7 @@ public abstract class BaseWorker : Identifiable
{ {
Log.Debug($"Failed {this}"); Log.Debug($"Failed {this}");
this.State = WorkerExecutionState.Failed; this.State = WorkerExecutionState.Failed;
CancellationTokenSource.Cancel(); CancellationTokenSource?.Cancel();
} }
public BaseWorker(IEnumerable<BaseWorker>? dependsOn = null) public BaseWorker(IEnumerable<BaseWorker>? dependsOn = null)
@@ -70,9 +70,12 @@ public abstract class BaseWorker : Identifiable
/// </returns> /// </returns>
public Task<BaseWorker[]> DoWork(Action? callback = null) public Task<BaseWorker[]> DoWork(Action? callback = null)
{ {
// Start the worker
Log.Debug($"Checking {this}"); Log.Debug($"Checking {this}");
this.CancellationTokenSource = new(TimeSpan.FromMinutes(10));
this.State = WorkerExecutionState.Waiting; this.State = WorkerExecutionState.Waiting;
// Wait for dependencies, start them if necessary
BaseWorker[] missingDependenciesThatNeedStarting = MissingDependencies.Where(d => d.State < WorkerExecutionState.Waiting).ToArray(); BaseWorker[] missingDependenciesThatNeedStarting = MissingDependencies.Where(d => d.State < WorkerExecutionState.Waiting).ToArray();
if(missingDependenciesThatNeedStarting.Any()) if(missingDependenciesThatNeedStarting.Any())
return new Task<BaseWorker[]>(() => missingDependenciesThatNeedStarting); return new Task<BaseWorker[]>(() => missingDependenciesThatNeedStarting);
@@ -80,29 +83,32 @@ public abstract class BaseWorker : Identifiable
if (MissingDependencies.Any()) if (MissingDependencies.Any())
return new Task<BaseWorker[]>(WaitForDependencies); return new Task<BaseWorker[]>(WaitForDependencies);
// Run the actual work
Log.Info($"Running {this}"); Log.Info($"Running {this}");
DateTime startTime = DateTime.UtcNow; DateTime startTime = DateTime.UtcNow;
Task<BaseWorker[]> task = new (DoWorkInternal, CancellationTokenSource.Token); Task<BaseWorker[]> task = new (DoWorkInternal, CancellationTokenSource.Token);
task.GetAwaiter().OnCompleted(() => task.GetAwaiter().OnCompleted(Finish(startTime, callback));
{
DateTime endTime = DateTime.UtcNow;
Log.Info($"Completed {this}\n\t{endTime.Subtract(startTime).TotalMilliseconds} ms");
this.State = WorkerExecutionState.Completed;
if(this is IPeriodic periodic)
periodic.LastExecution = DateTime.UtcNow;
});
task.Start();
this.State = WorkerExecutionState.Running; this.State = WorkerExecutionState.Running;
callback?.Invoke(); task.Start();
return task; return task;
} }
private Action Finish(DateTime startTime, Action? callback = null) => () =>
{
DateTime endTime = DateTime.UtcNow;
Log.Info($"Completed {this}\n\t{endTime.Subtract(startTime).TotalMilliseconds} ms");
this.State = WorkerExecutionState.Completed;
if(this is IPeriodic periodic)
periodic.LastExecution = DateTime.UtcNow;
callback?.Invoke();
};
protected abstract BaseWorker[] DoWorkInternal(); protected abstract BaseWorker[] DoWorkInternal();
private BaseWorker[] WaitForDependencies() private BaseWorker[] WaitForDependencies()
{ {
Log.Info($"Waiting for {MissingDependencies.Count()} Dependencies {this}:\n\t{string.Join("\n\t", MissingDependencies.Select(d => d.ToString()))}"); Log.Info($"Waiting for {MissingDependencies.Count()} Dependencies {this}:\n\t{string.Join("\n\t", MissingDependencies.Select(d => d.ToString()))}");
while (CancellationTokenSource.IsCancellationRequested == false && MissingDependencies.Any()) while (CancellationTokenSource?.IsCancellationRequested == false && MissingDependencies.Any())
{ {
Thread.Sleep(Tranga.Settings.WorkCycleTimeoutMs); Thread.Sleep(Tranga.Settings.WorkCycleTimeoutMs);
} }