diff --git a/API/Controllers/WorkerController.cs b/API/Controllers/WorkerController.cs index 42aaa6b..3df961e 100644 --- a/API/Controllers/WorkerController.cs +++ b/API/Controllers/WorkerController.cs @@ -20,7 +20,7 @@ public class WorkerController() : Controller [ProducesResponseType(Status200OK, "application/json")] public IActionResult GetAllWorkers() { - return Ok(Tranga.AllWorkers.Select(w => w.Key).ToArray()); + return Ok(Tranga.GetRunningWorkers().Select(w => w.Key).ToArray()); } /// @@ -32,7 +32,7 @@ public class WorkerController() : Controller [ProducesResponseType(Status200OK, "application/json")] public IActionResult GetWorkers([FromBody]string[] WorkerIds) { - return Ok(Tranga.AllWorkers.Where(worker => WorkerIds.Contains(worker.Key)).ToArray()); + return Ok(Tranga.GetRunningWorkers().Where(worker => WorkerIds.Contains(worker.Key)).ToArray()); } /// @@ -44,7 +44,7 @@ public class WorkerController() : Controller [ProducesResponseType(Status200OK, "application/json")] public IActionResult GetWorkersInState(WorkerExecutionState State) { - return Ok(Tranga.AllWorkers.Where(worker => worker.State == State).ToArray()); + return Ok(Tranga.GetRunningWorkers().Where(worker => worker.State == State).ToArray()); } /// @@ -58,7 +58,7 @@ public class WorkerController() : Controller [ProducesResponseType(Status404NotFound)] public IActionResult GetWorker(string WorkerId) { - if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker) + if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker) return NotFound(nameof(WorkerId)); return Ok(worker); } @@ -74,39 +74,12 @@ public class WorkerController() : Controller [ProducesResponseType(Status404NotFound)] public IActionResult DeleteWorker(string WorkerId) { - if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker) + if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker) return NotFound(nameof(WorkerId)); - Tranga.RemoveWorker(worker); + Tranga.StopWorker(worker); return Ok(); } - /// - /// Modify with - /// - /// .Key - /// Fields to modify, set to null to keep previous value - /// - /// - /// with could not be found - /// is not , can not modify - [HttpPatch("{WorkerId}")] - [ProducesResponseType(Status202Accepted, "application/json")] - [ProducesResponseType(Status400BadRequest)] - [ProducesResponseType(Status404NotFound)] - [ProducesResponseType(Status409Conflict, "text/plain")] - public IActionResult ModifyWorker(string WorkerId, [FromBody]ModifyWorkerRecord modifyWorkerRecord) - { - if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker) - return NotFound(nameof(WorkerId)); - - if(modifyWorkerRecord.IntervalMs is not null && worker is not IPeriodic) - return Conflict("Can not modify Interval of non-Periodic worker"); - else if(modifyWorkerRecord.IntervalMs is not null && worker is IPeriodic periodic) - periodic.Interval = TimeSpan.FromMilliseconds((long)modifyWorkerRecord.IntervalMs); - - return Accepted(worker); - } - /// /// Starts with /// @@ -120,13 +93,13 @@ public class WorkerController() : Controller [ProducesResponseType(Status412PreconditionFailed, "text/plain")] public IActionResult StartWorker(string WorkerId) { - if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker) + if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker) return NotFound(nameof(WorkerId)); if (worker.State >= WorkerExecutionState.Waiting) return StatusCode(Status412PreconditionFailed, "Already running"); - Tranga.MarkWorkerForStart(worker); + Tranga.StartWorker(worker); return Ok(); } @@ -141,7 +114,7 @@ public class WorkerController() : Controller [ProducesResponseType(Status501NotImplemented)] public IActionResult StopWorker(string WorkerId) { - if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker) + if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker) return NotFound(nameof(WorkerId)); if(worker.State is < WorkerExecutionState.Running or >= WorkerExecutionState.Completed) diff --git a/API/Program.cs b/API/Program.cs index 8a4301d..4a53e16 100644 --- a/API/Program.cs +++ b/API/Program.cs @@ -135,12 +135,10 @@ using (IServiceScope scope = app.Services.CreateScope()) context.Sync(); } +Tranga.SetServiceProvider(app.Services); Tranga.StartLogger(); - Tranga.AddDefaultWorkers(); -Tranga.PeriodicWorkerStarterThread.Start(app.Services); - app.UseCors("AllowAll"); app.Run(); \ No newline at end of file diff --git a/API/Tranga.cs b/API/Tranga.cs index 1fae851..721e2ba 100644 --- a/API/Tranga.cs +++ b/API/Tranga.cs @@ -1,4 +1,5 @@ -using System.Diagnostics.CodeAnalysis; +using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; using API.MangaConnectors; using API.Schema.LibraryContext; using API.Schema.MangaContext; @@ -23,7 +24,8 @@ public static class Tranga " |___| |__| |___._||__|__||___ ||___._|\n" + " |_____| \n\n"; - public static Thread PeriodicWorkerStarterThread { get; } = new (WorkerStarter); + private static IServiceProvider? ServiceProvider; + private static readonly ILog Log = LogManager.GetLogger(typeof(Tranga)); internal static readonly MetadataFetcher[] MetadataFetchers = [new MyAnimeList()]; internal static readonly MangaConnector[] MangaConnectors = [new Global(), new MangaDex(), new ComickIo()]; @@ -57,6 +59,11 @@ public static class Tranga AddWorker(UpdateCoversWorker); } + internal static void SetServiceProvider(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + internal static bool TryGetMangaConnector(string name, [NotNullWhen(true)]out MangaConnector? mangaConnector) { mangaConnector = @@ -64,115 +71,84 @@ public static class Tranga return mangaConnector != null; } - internal static HashSet AllWorkers { get; private set; } = new (); + internal static readonly Dictionary PeriodicWorkers = new (); public static void AddWorker(BaseWorker worker) { - Log.Debug($"Adding {worker} to AllWorkers."); - AllWorkers.Add(worker); + Log.Debug($"Adding {worker}"); + StartWorker(worker); + if(worker is IPeriodic periodic) + AddPeriodicWorker(worker, periodic); } + + private static void AddPeriodicWorker(BaseWorker worker, IPeriodic periodic) + { + Log.Debug($"Adding Periodic {worker}"); + PeriodicWorkers.Add((worker as IPeriodic)!, PeriodicTask(worker, periodic)); + PeriodicWorkers[(worker as IPeriodic)!].Start(); + } + + private static Task PeriodicTask(BaseWorker worker, IPeriodic periodic) => new (() => + { + Log.Debug($"Waiting {periodic.Interval} for {worker}"); + Thread.Sleep(periodic.Interval); + StartWorker(worker, RefreshTask(worker, periodic)); + }); + + private static Action RefreshTask(BaseWorker worker, IPeriodic periodic) => () => + { + if (worker.State < WorkerExecutionState.Created) //Failed + return; + PeriodicWorkers[(worker as IPeriodic)!] = PeriodicTask(worker, periodic); + PeriodicWorkers[(worker as IPeriodic)!].Start(); + }; + public static void AddWorkers(IEnumerable workers) { foreach (BaseWorker baseWorker in workers) - { AddWorker(baseWorker); - } - } - - public static void RemoveWorker(BaseWorker removeWorker) - { - IEnumerable baseWorkers = AllWorkers.Where(w => w.DependenciesAndSelf.Any(worker => worker == removeWorker)); - - foreach (BaseWorker worker in baseWorkers) - { - StopWorker(worker); - Log.Debug($"Removing {removeWorker} from AllWorkers."); - AllWorkers.Remove(worker); - } } - private static readonly Dictionary> RunningWorkers = new(); + private static readonly ConcurrentDictionary> RunningWorkers = new(); public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray(); - private static readonly HashSet StartWorkers = new(); - private static void WorkerStarter(object? serviceProviderObj) + + internal static void StartWorker(BaseWorker worker, Action? callback = null) { - Log.Info("WorkerStarter Thread running."); - if (serviceProviderObj is null) + Log.Debug($"Starting {worker}"); + if (ServiceProvider is null) { - Log.Error("serviceProviderObj is null"); + Log.Fatal("ServiceProvider is null"); return; } - IServiceProvider serviceProvider = (IServiceProvider)serviceProviderObj; - - while (true) + Action callBack = AfterWork(worker, callback); + if (worker is BaseWorkerWithContext mangaContextWorker) { - CheckRunningWorkers(); - - foreach (BaseWorker baseWorker in AllWorkers.DueWorkers().ToArray()) - StartWorkers.Add(baseWorker); - - foreach (BaseWorker worker in StartWorkers.ToArray()) - { - if(RunningWorkers.ContainsKey(worker)) - continue; - if (worker is BaseWorkerWithContext mangaContextWorker) - { - mangaContextWorker.SetScope(serviceProvider.CreateScope()); - RunningWorkers.Add(mangaContextWorker, mangaContextWorker.DoWork()); - }else if (worker is BaseWorkerWithContext notificationContextWorker) - { - notificationContextWorker.SetScope(serviceProvider.CreateScope()); - RunningWorkers.Add(notificationContextWorker, notificationContextWorker.DoWork()); - }else if (worker is BaseWorkerWithContext libraryContextWorker) - { - libraryContextWorker.SetScope(serviceProvider.CreateScope()); - RunningWorkers.Add(libraryContextWorker, libraryContextWorker.DoWork()); - }else - RunningWorkers.Add(worker, worker.DoWork()); - - StartWorkers.Remove(worker); - } - Thread.Sleep(Settings.WorkCycleTimeoutMs); - } + mangaContextWorker.SetScope(ServiceProvider.CreateScope()); + RunningWorkers.TryAdd(mangaContextWorker, mangaContextWorker.DoWork(callBack)); + }else if (worker is BaseWorkerWithContext notificationContextWorker) + { + notificationContextWorker.SetScope(ServiceProvider.CreateScope()); + RunningWorkers.TryAdd(notificationContextWorker, notificationContextWorker.DoWork(callBack)); + }else if (worker is BaseWorkerWithContext libraryContextWorker) + { + libraryContextWorker.SetScope(ServiceProvider.CreateScope()); + RunningWorkers.TryAdd(libraryContextWorker, libraryContextWorker.DoWork(callBack)); + }else + RunningWorkers.TryAdd(worker, worker.DoWork(callBack)); } - private static void CheckRunningWorkers() + private static Action AfterWork(BaseWorker worker, Action? callback) => () => { - KeyValuePair>[] done = RunningWorkers.Where(kv => kv.Value.IsCompleted).ToArray(); - if (done.Length < 1) - return; - Log.Info($"Done: {done.Length}"); - Log.Debug(string.Join("\n", done.Select(d => d.Key.ToString()))); - foreach ((BaseWorker worker, Task task) in done) - { - RunningWorkers.Remove(worker); - foreach (BaseWorker newWorker in task.Result) - AllWorkers.Add(newWorker); - if (worker is not IPeriodic) - AllWorkers.Remove(worker); - task.Dispose(); - } - } - - private static IEnumerable DueWorkers(this IEnumerable workers) - { - return workers.Where(worker => - { - if (worker.State is >= WorkerExecutionState.Running and < WorkerExecutionState.Completed) - return false; - if (worker is IPeriodic periodicWorker) - return periodicWorker.IsDue; - return true; - }); - } - - internal static void MarkWorkerForStart(BaseWorker worker) => StartWorkers.Add(worker); + RunningWorkers.Remove(worker, out _); + callback?.Invoke(); + }; internal static void StopWorker(BaseWorker worker) { - StartWorkers.Remove(worker); + if(worker is IPeriodic periodicWorker) + PeriodicWorkers.Remove(periodicWorker); worker.Cancel(); - RunningWorkers.Remove(worker); + RunningWorkers.Remove(worker, out _); } internal static bool AddMangaToContext((Manga, MangaConnectorId) addManga, MangaContext context, [NotNullWhen(true)]out Manga? manga) => AddMangaToContext(addManga.Item1, addManga.Item2, context, out manga); diff --git a/API/Workers/BaseWorker.cs b/API/Workers/BaseWorker.cs index 2be3601..68f43db 100644 --- a/API/Workers/BaseWorker.cs +++ b/API/Workers/BaseWorker.cs @@ -68,7 +68,7 @@ public abstract class BaseWorker : Identifiable /// If has run, additional . /// /// - public Task DoWork() + public Task DoWork(Action? callback = null) { Log.Debug($"Checking {this}"); this.State = WorkerExecutionState.Waiting; @@ -93,6 +93,7 @@ public abstract class BaseWorker : Identifiable }); task.Start(); this.State = WorkerExecutionState.Running; + callback?.Invoke(); return task; }