From 8b7f773d2951ab8c7764f2cb1ed381a0005a7ce9 Mon Sep 17 00:00:00 2001 From: Misaki Date: Sun, 12 Apr 2026 22:09:28 +0900 Subject: [PATCH] feat(JobScheduler): improve dependency handling logic Updated `JobScheduler` to enhance dependency tracking by counting valid dependencies upfront and dynamically adjusting counts using `Interlocked` operations. Improved job enqueueing logic to ensure jobs are only enqueued when all dependencies are met. Replaced `Interlocked.Increment` with `Interlocked.Add` for batch updates to `_totalJobCount`, improving performance. Adjusted `VirtualStack` cleanup to use the correct size variable for memory deallocation. Simplified `JobDispatchingJob` API by removing `ctx.ThreadIndex` parameter. Updated `TestJobSystem` to pass job handles as dependencies for proper execution order. Incremented assembly version to 1.5.9 to reflect these changes. --- Misaki.HighPerformance.Jobs/JobScheduler.cs | 45 ++++++++++--------- .../Misaki.HighPerformance.Jobs.csproj | 2 +- Misaki.HighPerformance.Jobs/WorkerThread.cs | 14 ++---- .../Buffer/VirtualStack.cs | 3 +- .../Jobs/JobDispatchingJob.cs | 2 +- .../UnitTest/Jobs/TestJobSystem.cs | 2 +- 6 files changed, 33 insertions(+), 35 deletions(-) diff --git a/Misaki.HighPerformance.Jobs/JobScheduler.cs b/Misaki.HighPerformance.Jobs/JobScheduler.cs index 0520cf7..9646751 100644 --- a/Misaki.HighPerformance.Jobs/JobScheduler.cs +++ b/Misaki.HighPerformance.Jobs/JobScheduler.cs @@ -228,8 +228,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable private bool _disposed = false; - internal volatile int _totalJobCount; - internal bool IsCancellationRequested => _cts.IsCancellationRequested; public int WorkerCount => _workerThreads.Length; @@ -242,11 +240,11 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable { var workerCount = Math.Max(1, threadCount); - _jobInfoPool = new(); - _jobQueue = new(); + _jobInfoPool = new ConcurrentSlotMap(); + _jobQueue = new ConcurrentQueue(); - _workSignal = new(0); - _cts = new(); + _workSignal = new SemaphoreSlim(0); + _cts = new CancellationTokenSource(); _workerThreads = new WorkerThread[workerCount]; @@ -297,13 +295,24 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable jobQueue.Enqueue(handle); } - Interlocked.Increment(ref _totalJobCount); _workSignal.Release(handleCount); } } private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan dependencies) { + var validDepCount = 0; + for (var i = 0; i < dependencies.Length; i++) + { + if (dependencies[i].IsValid) + { + validDepCount++; + } + } + + // Advance count to account for all dependencies upfront + 1 guard lock + jobInfo.dependencyCount = validDepCount + 1; + var id = _jobInfoPool.Add(jobInfo, out var generation); ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _); @@ -321,13 +330,13 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable if (!exist) { // Dependency does not exist (likely completed already) + Interlocked.Decrement(ref infoInPool.dependencyCount); continue; } // Lock-free registration: Try to acquire "Reader Lock" by incrementing RC in high bits. // If state is already Completed, we skip (dependency met). var registered = false; - var completed = false; var spin = new SpinWait(); while (true) @@ -337,7 +346,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable if (state == JobState.Completed) { - completed = true; break; } @@ -374,20 +382,18 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable spin.SpinOnce(-1); } - if (!registered && !completed) + // If we didn't successfully register (completed fast), drop it from the advanced counter + if (!registered) { - // Should not happen if logic is correct, unless loop logic changed - Interlocked.Increment(ref infoInPool.dependencyCount); + Interlocked.Decrement(ref infoInPool.dependencyCount); } - else if (registered) - { - // Successfully added dependency - Interlocked.Increment(ref infoInPool.dependencyCount); - } - // else: completed is true, registered is false -> Dependency is already done, so we don't increment our dependencyCount. } - EnqueueJobIfReady(handle); + // Lower the initial 1 guard lock; Enqueue if met + if (Interlocked.Decrement(ref infoInPool.dependencyCount) == 0) + { + EnqueueJobIfReady(handle); + } return handle; } @@ -509,7 +515,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable NativeMemory.Free(info.pJobData); _jobInfoPool.Remove(handle.ID, handle.Generation); - Interlocked.Decrement(ref _totalJobCount); for (var i = 0; i < dependentCount; i++) { diff --git a/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj b/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj index f1db093..1529692 100644 --- a/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj +++ b/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj @@ -6,7 +6,7 @@ enable True True - 1.5.8 + 1.5.9 $(AssemblyVersion) Misaki https://git.personalnas.com/Misaki/Misaki.HighPerformance.git diff --git a/Misaki.HighPerformance.Jobs/WorkerThread.cs b/Misaki.HighPerformance.Jobs/WorkerThread.cs index 36bda34..38b7fcd 100644 --- a/Misaki.HighPerformance.Jobs/WorkerThread.cs +++ b/Misaki.HighPerformance.Jobs/WorkerThread.cs @@ -33,12 +33,6 @@ internal class WorkerThread : IDisposable private bool TryFindJob(out JobHandle handle) { - if (Interlocked.CompareExchange(ref _scheduler._totalJobCount, 0, 0) == 0) - { - handle = JobHandle.Invalid; - return false; - } - if (_localQueue.TryDequeue(out handle)) { return true; @@ -112,14 +106,12 @@ internal class WorkerThread : IDisposable if (jobInfo.pExecutionFunc != null) { var ctx = new JobExecutionContext(_index, _scheduler); - if (!jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx)) + if (jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx)) { - // If the job returns false, it means it we are not the last worker to process this job, so we should not mark it as complete yet. - continue; + // If the job returns true, it means we are the last worker to process this job. + _scheduler.MarkJobComplete(handle); } } - - _scheduler.MarkJobComplete(handle); } } } diff --git a/Misaki.HighPerformance.LowLevel/Buffer/VirtualStack.cs b/Misaki.HighPerformance.LowLevel/Buffer/VirtualStack.cs index fbcbbfb..e854ff2 100644 --- a/Misaki.HighPerformance.LowLevel/Buffer/VirtualStack.cs +++ b/Misaki.HighPerformance.LowLevel/Buffer/VirtualStack.cs @@ -216,12 +216,13 @@ public unsafe struct VirtualStack : IMemoryAllocator