mirror of
https://github.com/C9Glax/tranga.git
synced 2025-09-10 11:58:19 +02:00
Do not use a Thread to Periodically check for Due workers.
Each Periodic Worker has it's own Thread that waits for execution.
This commit is contained in:
@@ -20,7 +20,7 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType<string[]>(Status200OK, "application/json")]
|
[ProducesResponseType<string[]>(Status200OK, "application/json")]
|
||||||
public IActionResult GetAllWorkers()
|
public IActionResult GetAllWorkers()
|
||||||
{
|
{
|
||||||
return Ok(Tranga.AllWorkers.Select(w => w.Key).ToArray());
|
return Ok(Tranga.GetRunningWorkers().Select(w => w.Key).ToArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -32,7 +32,7 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType<BaseWorker[]>(Status200OK, "application/json")]
|
[ProducesResponseType<BaseWorker[]>(Status200OK, "application/json")]
|
||||||
public IActionResult GetWorkers([FromBody]string[] WorkerIds)
|
public IActionResult GetWorkers([FromBody]string[] WorkerIds)
|
||||||
{
|
{
|
||||||
return Ok(Tranga.AllWorkers.Where(worker => WorkerIds.Contains(worker.Key)).ToArray());
|
return Ok(Tranga.GetRunningWorkers().Where(worker => WorkerIds.Contains(worker.Key)).ToArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -44,7 +44,7 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType<BaseWorker[]>(Status200OK, "application/json")]
|
[ProducesResponseType<BaseWorker[]>(Status200OK, "application/json")]
|
||||||
public IActionResult GetWorkersInState(WorkerExecutionState State)
|
public IActionResult GetWorkersInState(WorkerExecutionState State)
|
||||||
{
|
{
|
||||||
return Ok(Tranga.AllWorkers.Where(worker => worker.State == State).ToArray());
|
return Ok(Tranga.GetRunningWorkers().Where(worker => worker.State == State).ToArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -58,7 +58,7 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType(Status404NotFound)]
|
[ProducesResponseType(Status404NotFound)]
|
||||||
public IActionResult GetWorker(string WorkerId)
|
public IActionResult GetWorker(string WorkerId)
|
||||||
{
|
{
|
||||||
if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
||||||
return NotFound(nameof(WorkerId));
|
return NotFound(nameof(WorkerId));
|
||||||
return Ok(worker);
|
return Ok(worker);
|
||||||
}
|
}
|
||||||
@@ -74,39 +74,12 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType(Status404NotFound)]
|
[ProducesResponseType(Status404NotFound)]
|
||||||
public IActionResult DeleteWorker(string WorkerId)
|
public IActionResult DeleteWorker(string WorkerId)
|
||||||
{
|
{
|
||||||
if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
||||||
return NotFound(nameof(WorkerId));
|
return NotFound(nameof(WorkerId));
|
||||||
Tranga.RemoveWorker(worker);
|
Tranga.StopWorker(worker);
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Modify <see cref="BaseWorker"/> with <paramref name="WorkerId"/>
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="WorkerId"><see cref="BaseWorker"/>.Key</param>
|
|
||||||
/// <param name="modifyWorkerRecord">Fields to modify, set to null to keep previous value</param>
|
|
||||||
/// <response code="202"></response>
|
|
||||||
/// <response code="400"></response>
|
|
||||||
/// <response code="404"><see cref="BaseWorker"/> with <paramref name="WorkerId"/> could not be found</response>
|
|
||||||
/// <response code="409"><see cref="BaseWorker"/> is not <see cref="IPeriodic"/>, can not modify <paramref name="modifyWorkerRecord.IntervalMs"/></response>
|
|
||||||
[HttpPatch("{WorkerId}")]
|
|
||||||
[ProducesResponseType<BaseWorker>(Status202Accepted, "application/json")]
|
|
||||||
[ProducesResponseType(Status400BadRequest)]
|
|
||||||
[ProducesResponseType(Status404NotFound)]
|
|
||||||
[ProducesResponseType<string>(Status409Conflict, "text/plain")]
|
|
||||||
public IActionResult ModifyWorker(string WorkerId, [FromBody]ModifyWorkerRecord modifyWorkerRecord)
|
|
||||||
{
|
|
||||||
if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
|
||||||
return NotFound(nameof(WorkerId));
|
|
||||||
|
|
||||||
if(modifyWorkerRecord.IntervalMs is not null && worker is not IPeriodic)
|
|
||||||
return Conflict("Can not modify Interval of non-Periodic worker");
|
|
||||||
else if(modifyWorkerRecord.IntervalMs is not null && worker is IPeriodic periodic)
|
|
||||||
periodic.Interval = TimeSpan.FromMilliseconds((long)modifyWorkerRecord.IntervalMs);
|
|
||||||
|
|
||||||
return Accepted(worker);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Starts <see cref="BaseWorker"/> with <paramref name="WorkerId"/>
|
/// Starts <see cref="BaseWorker"/> with <paramref name="WorkerId"/>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -120,13 +93,13 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType<string>(Status412PreconditionFailed, "text/plain")]
|
[ProducesResponseType<string>(Status412PreconditionFailed, "text/plain")]
|
||||||
public IActionResult StartWorker(string WorkerId)
|
public IActionResult StartWorker(string WorkerId)
|
||||||
{
|
{
|
||||||
if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
||||||
return NotFound(nameof(WorkerId));
|
return NotFound(nameof(WorkerId));
|
||||||
|
|
||||||
if (worker.State >= WorkerExecutionState.Waiting)
|
if (worker.State >= WorkerExecutionState.Waiting)
|
||||||
return StatusCode(Status412PreconditionFailed, "Already running");
|
return StatusCode(Status412PreconditionFailed, "Already running");
|
||||||
|
|
||||||
Tranga.MarkWorkerForStart(worker);
|
Tranga.StartWorker(worker);
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -141,7 +114,7 @@ public class WorkerController() : Controller
|
|||||||
[ProducesResponseType(Status501NotImplemented)]
|
[ProducesResponseType(Status501NotImplemented)]
|
||||||
public IActionResult StopWorker(string WorkerId)
|
public IActionResult StopWorker(string WorkerId)
|
||||||
{
|
{
|
||||||
if(Tranga.AllWorkers.FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
if(Tranga.GetRunningWorkers().FirstOrDefault(w => w.Key == WorkerId) is not { } worker)
|
||||||
return NotFound(nameof(WorkerId));
|
return NotFound(nameof(WorkerId));
|
||||||
|
|
||||||
if(worker.State is < WorkerExecutionState.Running or >= WorkerExecutionState.Completed)
|
if(worker.State is < WorkerExecutionState.Running or >= WorkerExecutionState.Completed)
|
||||||
|
@@ -135,12 +135,10 @@ using (IServiceScope scope = app.Services.CreateScope())
|
|||||||
context.Sync();
|
context.Sync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Tranga.SetServiceProvider(app.Services);
|
||||||
Tranga.StartLogger();
|
Tranga.StartLogger();
|
||||||
|
|
||||||
Tranga.AddDefaultWorkers();
|
Tranga.AddDefaultWorkers();
|
||||||
|
|
||||||
Tranga.PeriodicWorkerStarterThread.Start(app.Services);
|
|
||||||
|
|
||||||
app.UseCors("AllowAll");
|
app.UseCors("AllowAll");
|
||||||
|
|
||||||
app.Run();
|
app.Run();
|
152
API/Tranga.cs
152
API/Tranga.cs
@@ -1,4 +1,5 @@
|
|||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Collections.Concurrent;
|
||||||
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using API.MangaConnectors;
|
using API.MangaConnectors;
|
||||||
using API.Schema.LibraryContext;
|
using API.Schema.LibraryContext;
|
||||||
using API.Schema.MangaContext;
|
using API.Schema.MangaContext;
|
||||||
@@ -23,7 +24,8 @@ public static class Tranga
|
|||||||
" |___| |__| |___._||__|__||___ ||___._|\n" +
|
" |___| |__| |___._||__|__||___ ||___._|\n" +
|
||||||
" |_____| \n\n";
|
" |_____| \n\n";
|
||||||
|
|
||||||
public static Thread PeriodicWorkerStarterThread { get; } = new (WorkerStarter);
|
private static IServiceProvider? ServiceProvider;
|
||||||
|
|
||||||
private static readonly ILog Log = LogManager.GetLogger(typeof(Tranga));
|
private static readonly ILog Log = LogManager.GetLogger(typeof(Tranga));
|
||||||
internal static readonly MetadataFetcher[] MetadataFetchers = [new MyAnimeList()];
|
internal static readonly MetadataFetcher[] MetadataFetchers = [new MyAnimeList()];
|
||||||
internal static readonly MangaConnector[] MangaConnectors = [new Global(), new MangaDex(), new ComickIo()];
|
internal static readonly MangaConnector[] MangaConnectors = [new Global(), new MangaDex(), new ComickIo()];
|
||||||
@@ -57,6 +59,11 @@ public static class Tranga
|
|||||||
AddWorker(UpdateCoversWorker);
|
AddWorker(UpdateCoversWorker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal static void SetServiceProvider(IServiceProvider serviceProvider)
|
||||||
|
{
|
||||||
|
ServiceProvider = serviceProvider;
|
||||||
|
}
|
||||||
|
|
||||||
internal static bool TryGetMangaConnector(string name, [NotNullWhen(true)]out MangaConnector? mangaConnector)
|
internal static bool TryGetMangaConnector(string name, [NotNullWhen(true)]out MangaConnector? mangaConnector)
|
||||||
{
|
{
|
||||||
mangaConnector =
|
mangaConnector =
|
||||||
@@ -64,115 +71,84 @@ public static class Tranga
|
|||||||
return mangaConnector != null;
|
return mangaConnector != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal static HashSet<BaseWorker> AllWorkers { get; private set; } = new ();
|
internal static readonly Dictionary<IPeriodic, Task> PeriodicWorkers = new ();
|
||||||
|
|
||||||
public static void AddWorker(BaseWorker worker)
|
public static void AddWorker(BaseWorker worker)
|
||||||
{
|
{
|
||||||
Log.Debug($"Adding {worker} to AllWorkers.");
|
Log.Debug($"Adding {worker}");
|
||||||
AllWorkers.Add(worker);
|
StartWorker(worker);
|
||||||
|
if(worker is IPeriodic periodic)
|
||||||
|
AddPeriodicWorker(worker, periodic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void AddPeriodicWorker(BaseWorker worker, IPeriodic periodic)
|
||||||
|
{
|
||||||
|
Log.Debug($"Adding Periodic {worker}");
|
||||||
|
PeriodicWorkers.Add((worker as IPeriodic)!, PeriodicTask(worker, periodic));
|
||||||
|
PeriodicWorkers[(worker as IPeriodic)!].Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task PeriodicTask(BaseWorker worker, IPeriodic periodic) => new (() =>
|
||||||
|
{
|
||||||
|
Log.Debug($"Waiting {periodic.Interval} for {worker}");
|
||||||
|
Thread.Sleep(periodic.Interval);
|
||||||
|
StartWorker(worker, RefreshTask(worker, periodic));
|
||||||
|
});
|
||||||
|
|
||||||
|
private static Action RefreshTask(BaseWorker worker, IPeriodic periodic) => () =>
|
||||||
|
{
|
||||||
|
if (worker.State < WorkerExecutionState.Created) //Failed
|
||||||
|
return;
|
||||||
|
PeriodicWorkers[(worker as IPeriodic)!] = PeriodicTask(worker, periodic);
|
||||||
|
PeriodicWorkers[(worker as IPeriodic)!].Start();
|
||||||
|
};
|
||||||
|
|
||||||
public static void AddWorkers(IEnumerable<BaseWorker> workers)
|
public static void AddWorkers(IEnumerable<BaseWorker> workers)
|
||||||
{
|
{
|
||||||
foreach (BaseWorker baseWorker in workers)
|
foreach (BaseWorker baseWorker in workers)
|
||||||
{
|
|
||||||
AddWorker(baseWorker);
|
AddWorker(baseWorker);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void RemoveWorker(BaseWorker removeWorker)
|
|
||||||
{
|
|
||||||
IEnumerable<BaseWorker> baseWorkers = AllWorkers.Where(w => w.DependenciesAndSelf.Any(worker => worker == removeWorker));
|
|
||||||
|
|
||||||
foreach (BaseWorker worker in baseWorkers)
|
|
||||||
{
|
|
||||||
StopWorker(worker);
|
|
||||||
Log.Debug($"Removing {removeWorker} from AllWorkers.");
|
|
||||||
AllWorkers.Remove(worker);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static readonly Dictionary<BaseWorker, Task<BaseWorker[]>> RunningWorkers = new();
|
private static readonly ConcurrentDictionary<BaseWorker, Task<BaseWorker[]>> RunningWorkers = new();
|
||||||
public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray();
|
public static BaseWorker[] GetRunningWorkers() => RunningWorkers.Keys.ToArray();
|
||||||
private static readonly HashSet<BaseWorker> StartWorkers = new();
|
|
||||||
private static void WorkerStarter(object? serviceProviderObj)
|
internal static void StartWorker(BaseWorker worker, Action? callback = null)
|
||||||
{
|
{
|
||||||
Log.Info("WorkerStarter Thread running.");
|
Log.Debug($"Starting {worker}");
|
||||||
if (serviceProviderObj is null)
|
if (ServiceProvider is null)
|
||||||
{
|
{
|
||||||
Log.Error("serviceProviderObj is null");
|
Log.Fatal("ServiceProvider is null");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
IServiceProvider serviceProvider = (IServiceProvider)serviceProviderObj;
|
Action callBack = AfterWork(worker, callback);
|
||||||
|
if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker)
|
||||||
while (true)
|
|
||||||
{
|
{
|
||||||
CheckRunningWorkers();
|
mangaContextWorker.SetScope(ServiceProvider.CreateScope());
|
||||||
|
RunningWorkers.TryAdd(mangaContextWorker, mangaContextWorker.DoWork(callBack));
|
||||||
foreach (BaseWorker baseWorker in AllWorkers.DueWorkers().ToArray())
|
}else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker)
|
||||||
StartWorkers.Add(baseWorker);
|
{
|
||||||
|
notificationContextWorker.SetScope(ServiceProvider.CreateScope());
|
||||||
foreach (BaseWorker worker in StartWorkers.ToArray())
|
RunningWorkers.TryAdd(notificationContextWorker, notificationContextWorker.DoWork(callBack));
|
||||||
{
|
}else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker)
|
||||||
if(RunningWorkers.ContainsKey(worker))
|
{
|
||||||
continue;
|
libraryContextWorker.SetScope(ServiceProvider.CreateScope());
|
||||||
if (worker is BaseWorkerWithContext<MangaContext> mangaContextWorker)
|
RunningWorkers.TryAdd(libraryContextWorker, libraryContextWorker.DoWork(callBack));
|
||||||
{
|
}else
|
||||||
mangaContextWorker.SetScope(serviceProvider.CreateScope());
|
RunningWorkers.TryAdd(worker, worker.DoWork(callBack));
|
||||||
RunningWorkers.Add(mangaContextWorker, mangaContextWorker.DoWork());
|
|
||||||
}else if (worker is BaseWorkerWithContext<NotificationsContext> notificationContextWorker)
|
|
||||||
{
|
|
||||||
notificationContextWorker.SetScope(serviceProvider.CreateScope());
|
|
||||||
RunningWorkers.Add(notificationContextWorker, notificationContextWorker.DoWork());
|
|
||||||
}else if (worker is BaseWorkerWithContext<LibraryContext> libraryContextWorker)
|
|
||||||
{
|
|
||||||
libraryContextWorker.SetScope(serviceProvider.CreateScope());
|
|
||||||
RunningWorkers.Add(libraryContextWorker, libraryContextWorker.DoWork());
|
|
||||||
}else
|
|
||||||
RunningWorkers.Add(worker, worker.DoWork());
|
|
||||||
|
|
||||||
StartWorkers.Remove(worker);
|
|
||||||
}
|
|
||||||
Thread.Sleep(Settings.WorkCycleTimeoutMs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void CheckRunningWorkers()
|
private static Action AfterWork(BaseWorker worker, Action? callback) => () =>
|
||||||
{
|
{
|
||||||
KeyValuePair<BaseWorker, Task<BaseWorker[]>>[] done = RunningWorkers.Where(kv => kv.Value.IsCompleted).ToArray();
|
RunningWorkers.Remove(worker, out _);
|
||||||
if (done.Length < 1)
|
callback?.Invoke();
|
||||||
return;
|
};
|
||||||
Log.Info($"Done: {done.Length}");
|
|
||||||
Log.Debug(string.Join("\n", done.Select(d => d.Key.ToString())));
|
|
||||||
foreach ((BaseWorker worker, Task<BaseWorker[]> task) in done)
|
|
||||||
{
|
|
||||||
RunningWorkers.Remove(worker);
|
|
||||||
foreach (BaseWorker newWorker in task.Result)
|
|
||||||
AllWorkers.Add(newWorker);
|
|
||||||
if (worker is not IPeriodic)
|
|
||||||
AllWorkers.Remove(worker);
|
|
||||||
task.Dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static IEnumerable<BaseWorker> DueWorkers(this IEnumerable<BaseWorker> workers)
|
|
||||||
{
|
|
||||||
return workers.Where(worker =>
|
|
||||||
{
|
|
||||||
if (worker.State is >= WorkerExecutionState.Running and < WorkerExecutionState.Completed)
|
|
||||||
return false;
|
|
||||||
if (worker is IPeriodic periodicWorker)
|
|
||||||
return periodicWorker.IsDue;
|
|
||||||
return true;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
internal static void MarkWorkerForStart(BaseWorker worker) => StartWorkers.Add(worker);
|
|
||||||
|
|
||||||
internal static void StopWorker(BaseWorker worker)
|
internal static void StopWorker(BaseWorker worker)
|
||||||
{
|
{
|
||||||
StartWorkers.Remove(worker);
|
if(worker is IPeriodic periodicWorker)
|
||||||
|
PeriodicWorkers.Remove(periodicWorker);
|
||||||
worker.Cancel();
|
worker.Cancel();
|
||||||
RunningWorkers.Remove(worker);
|
RunningWorkers.Remove(worker, out _);
|
||||||
}
|
}
|
||||||
|
|
||||||
internal static bool AddMangaToContext((Manga, MangaConnectorId<Manga>) addManga, MangaContext context, [NotNullWhen(true)]out Manga? manga) => AddMangaToContext(addManga.Item1, addManga.Item2, context, out manga);
|
internal static bool AddMangaToContext((Manga, MangaConnectorId<Manga>) addManga, MangaContext context, [NotNullWhen(true)]out Manga? manga) => AddMangaToContext(addManga.Item1, addManga.Item2, context, out manga);
|
||||||
|
@@ -68,7 +68,7 @@ public abstract class BaseWorker : Identifiable
|
|||||||
/// <item>If <see cref="BaseWorker"/> has run, additional <see cref="BaseWorker"/>.</item>
|
/// <item>If <see cref="BaseWorker"/> has run, additional <see cref="BaseWorker"/>.</item>
|
||||||
/// </list>
|
/// </list>
|
||||||
/// </returns>
|
/// </returns>
|
||||||
public Task<BaseWorker[]> DoWork()
|
public Task<BaseWorker[]> DoWork(Action? callback = null)
|
||||||
{
|
{
|
||||||
Log.Debug($"Checking {this}");
|
Log.Debug($"Checking {this}");
|
||||||
this.State = WorkerExecutionState.Waiting;
|
this.State = WorkerExecutionState.Waiting;
|
||||||
@@ -93,6 +93,7 @@ public abstract class BaseWorker : Identifiable
|
|||||||
});
|
});
|
||||||
task.Start();
|
task.Start();
|
||||||
this.State = WorkerExecutionState.Running;
|
this.State = WorkerExecutionState.Running;
|
||||||
|
callback?.Invoke();
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user