Do not use a Thread to Periodically check for Due workers.

Each Periodic Worker has it's own Thread that waits for execution.
This commit is contained in:
2025-07-21 13:45:39 +02:00
parent fab70501ce
commit 64b89d4537
4 changed files with 76 additions and 128 deletions

View File

@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using API.MangaConnectors;
using API.Schema.LibraryContext;
using API.Schema.MangaContext;
@@ -23,7 +24,8 @@ public static class Tranga
" |___| |__| |___._||__|__||___ ||___._|\n" +
" |_____| \n\n";
public static Thread PeriodicWorkerStarterThread { get; } = new (WorkerStarter);
private static IServiceProvider? ServiceProvider;
private static readonly ILog Log = LogManager.GetLogger(typeof(Tranga));
internal static readonly MetadataFetcher[] MetadataFetchers = [new MyAnimeList()];
internal static readonly MangaConnector[] MangaConnectors = [new Global(), new MangaDex(), new ComickIo()];
@@ -57,6 +59,11 @@ public static class Tranga
AddWorker(UpdateCoversWorker);
}
internal static void SetServiceProvider(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
internal static bool TryGetMangaConnector(string name, [NotNullWhen(true)]out MangaConnector? mangaConnector)
{
mangaConnector =
@@ -64,115 +71,84 @@ public static class Tranga
return mangaConnector != null;
}
internal static HashSet<BaseWorker> AllWorkers { get; private set; } = new ();
internal static readonly Dictionary<IPeriodic, Task> PeriodicWorkers = new ();
public static void AddWorker(BaseWorker worker)
{
Log.Debug($"Adding {worker} to AllWorkers.");
AllWorkers.Add(worker);
Log.Debug($"Adding {worker}");
StartWorker(worker);
if(worker is IPeriodic periodic)
AddPeriodicWorker(worker, periodic);
}
private static void AddPeriodicWorker(BaseWorker worker, IPeriodic periodic)
{
Log.Debug($"Adding Periodic {worker}");
PeriodicWorkers.Add((worker as IPeriodic)!, PeriodicTask(worker, periodic));
PeriodicWorkers[(worker as IPeriodic)!].Start();
}
private static Task PeriodicTask(BaseWorker worker, IPeriodic periodic) => new (() =>
{
Log.Debug($"Waiting {periodic.Interval} for {worker}");
Thread.Sleep(periodic.Interval);
StartWorker(worker, RefreshTask(worker, periodic));
});
private static Action RefreshTask(BaseWorker worker, IPeriodic periodic) => () =>
{
if (worker.State < WorkerExecutionState.Created) //Failed
return;
PeriodicWorkers[(worker as IPeriodic)!] = PeriodicTask(worker, periodic);
PeriodicWorkers[(worker as IPeriodic)!].Start();
};
public static void AddWorkers(IEnumerable<BaseWorker> workers)
{
foreach (BaseWorker baseWorker in workers)
{
AddWorker(baseWorker);
}
}
public static void RemoveWorker(BaseWorker removeWorker)
{
IEnumerable<BaseWorker> baseWorkers = AllWorkers.Where(w => w.DependenciesAndSelf.Any(worker => worker == removeWorker));
foreach (BaseWorker worker in baseWorkers)
{
StopWorker(worker);
Log.Debug($"Removing {removeWorker} from AllWorkers.");
AllWorkers.Remove(worker);
}
}
private static readonly Dictionary<BaseWorker, Task<BaseWorker[]>> RunningWorkers = new();
private static readonly ConcurrentDictionary<BaseWorker, Task<BaseWorker[]>> RunningWorkers = new();
public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray();
private static readonly HashSet<BaseWorker> StartWorkers = new();
private static void WorkerStarter(object? serviceProviderObj)
internal static void StartWorker(BaseWorker worker, Action? callback = null)
{
Log.Info("WorkerStarter Thread running.");
if (serviceProviderObj is null)
Log.Debug($"Starting {worker}");
if (ServiceProvider is null)
{
Log.Error("serviceProviderObj is null");
Log.Fatal("ServiceProvider is null");
return;
}
IServiceProvider serviceProvider = (IServiceProvider)serviceProviderObj;
while (true)
Action callBack = AfterWork(worker, callback);
if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker)
{
CheckRunningWorkers();
foreach (BaseWorker baseWorker in AllWorkers.DueWorkers().ToArray())
StartWorkers.Add(baseWorker);
foreach (BaseWorker worker in StartWorkers.ToArray())
{
if(RunningWorkers.ContainsKey(worker))
continue;
if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker)
{
mangaContextWorker.SetScope(serviceProvider.CreateScope());
RunningWorkers.Add(mangaContextWorker, mangaContextWorker.DoWork());
}else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker)
{
notificationContextWorker.SetScope(serviceProvider.CreateScope());
RunningWorkers.Add(notificationContextWorker, notificationContextWorker.DoWork());
}else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker)
{
libraryContextWorker.SetScope(serviceProvider.CreateScope());
RunningWorkers.Add(libraryContextWorker, libraryContextWorker.DoWork());
}else
RunningWorkers.Add(worker, worker.DoWork());
StartWorkers.Remove(worker);
}
Thread.Sleep(Settings.WorkCycleTimeoutMs);
}
mangaContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(mangaContextWorker, mangaContextWorker.DoWork(callBack));
}else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker)
{
notificationContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(notificationContextWorker, notificationContextWorker.DoWork(callBack));
}else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker)
{
libraryContextWorker.SetScope(ServiceProvider.CreateScope());
RunningWorkers.TryAdd(libraryContextWorker, libraryContextWorker.DoWork(callBack));
}else
RunningWorkers.TryAdd(worker, worker.DoWork(callBack));
}
private static void CheckRunningWorkers()
private static Action AfterWork(BaseWorker worker, Action? callback) => () =>
{
KeyValuePair<BaseWorker, Task<BaseWorker[]>>[] done = RunningWorkers.Where(kv => kv.Value.IsCompleted).ToArray();
if (done.Length < 1)
return;
Log.Info($"Done: {done.Length}");
Log.Debug(string.Join("\n", done.Select(d => d.Key.ToString())));
foreach ((BaseWorker worker, Task<BaseWorker[]> task) in done)
{
RunningWorkers.Remove(worker);
foreach (BaseWorker newWorker in task.Result)
AllWorkers.Add(newWorker);
if (worker is not IPeriodic)
AllWorkers.Remove(worker);
task.Dispose();
}
}
private static IEnumerable<BaseWorker> DueWorkers(this IEnumerable<BaseWorker> workers)
{
return workers.Where(worker =>
{
if (worker.State is >= WorkerExecutionState.Running and < WorkerExecutionState.Completed)
return false;
if (worker is IPeriodic periodicWorker)
return periodicWorker.IsDue;
return true;
});
}
internal static void MarkWorkerForStart(BaseWorker worker) => StartWorkers.Add(worker);
RunningWorkers.Remove(worker, out _);
callback?.Invoke();
};
internal static void StopWorker(BaseWorker worker)
{
StartWorkers.Remove(worker);
if(worker is IPeriodic periodicWorker)
PeriodicWorkers.Remove(periodicWorker);
worker.Cancel();
RunningWorkers.Remove(worker);
RunningWorkers.Remove(worker, out _);
}
internal static bool AddMangaToContext((Manga, MangaConnectorId<Manga>) addManga, MangaContext context, [NotNullWhen(true)]out Manga? manga) => AddMangaToContext(addManga.Item1, addManga.Item2, context, out manga);