feat(JobScheduler): improve dependency handling logic
Updated `JobScheduler` to enhance dependency tracking by counting valid dependencies upfront and dynamically adjusting counts using `Interlocked` operations. Improved job enqueueing logic to ensure jobs are only enqueued when all dependencies are met. Replaced `Interlocked.Increment` with `Interlocked.Add` for batch updates to `_totalJobCount`, improving performance. Adjusted `VirtualStack` cleanup to use the correct size variable for memory deallocation. Simplified `JobDispatchingJob` API by removing `ctx.ThreadIndex` parameter. Updated `TestJobSystem` to pass job handles as dependencies for proper execution order. Incremented assembly version to 1.5.9 to reflect these changes.
This commit is contained in:
@@ -228,8 +228,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
|
|
||||||
private bool _disposed = false;
|
private bool _disposed = false;
|
||||||
|
|
||||||
internal volatile int _totalJobCount;
|
|
||||||
|
|
||||||
internal bool IsCancellationRequested => _cts.IsCancellationRequested;
|
internal bool IsCancellationRequested => _cts.IsCancellationRequested;
|
||||||
|
|
||||||
public int WorkerCount => _workerThreads.Length;
|
public int WorkerCount => _workerThreads.Length;
|
||||||
@@ -242,11 +240,11 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
{
|
{
|
||||||
var workerCount = Math.Max(1, threadCount);
|
var workerCount = Math.Max(1, threadCount);
|
||||||
|
|
||||||
_jobInfoPool = new();
|
_jobInfoPool = new ConcurrentSlotMap<JobInfo>();
|
||||||
_jobQueue = new();
|
_jobQueue = new ConcurrentQueue<JobHandle>();
|
||||||
|
|
||||||
_workSignal = new(0);
|
_workSignal = new SemaphoreSlim(0);
|
||||||
_cts = new();
|
_cts = new CancellationTokenSource();
|
||||||
|
|
||||||
_workerThreads = new WorkerThread[workerCount];
|
_workerThreads = new WorkerThread[workerCount];
|
||||||
|
|
||||||
@@ -297,13 +295,24 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
jobQueue.Enqueue(handle);
|
jobQueue.Enqueue(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
Interlocked.Increment(ref _totalJobCount);
|
|
||||||
_workSignal.Release(handleCount);
|
_workSignal.Release(handleCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan<JobHandle> dependencies)
|
private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan<JobHandle> dependencies)
|
||||||
{
|
{
|
||||||
|
var validDepCount = 0;
|
||||||
|
for (var i = 0; i < dependencies.Length; i++)
|
||||||
|
{
|
||||||
|
if (dependencies[i].IsValid)
|
||||||
|
{
|
||||||
|
validDepCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance count to account for all dependencies upfront + 1 guard lock
|
||||||
|
jobInfo.dependencyCount = validDepCount + 1;
|
||||||
|
|
||||||
var id = _jobInfoPool.Add(jobInfo, out var generation);
|
var id = _jobInfoPool.Add(jobInfo, out var generation);
|
||||||
ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _);
|
ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _);
|
||||||
|
|
||||||
@@ -321,13 +330,13 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
if (!exist)
|
if (!exist)
|
||||||
{
|
{
|
||||||
// Dependency does not exist (likely completed already)
|
// Dependency does not exist (likely completed already)
|
||||||
|
Interlocked.Decrement(ref infoInPool.dependencyCount);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock-free registration: Try to acquire "Reader Lock" by incrementing RC in high bits.
|
// Lock-free registration: Try to acquire "Reader Lock" by incrementing RC in high bits.
|
||||||
// If state is already Completed, we skip (dependency met).
|
// If state is already Completed, we skip (dependency met).
|
||||||
var registered = false;
|
var registered = false;
|
||||||
var completed = false;
|
|
||||||
var spin = new SpinWait();
|
var spin = new SpinWait();
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
@@ -337,7 +346,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
|
|
||||||
if (state == JobState.Completed)
|
if (state == JobState.Completed)
|
||||||
{
|
{
|
||||||
completed = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -374,20 +382,18 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
spin.SpinOnce(-1);
|
spin.SpinOnce(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!registered && !completed)
|
// If we didn't successfully register (completed fast), drop it from the advanced counter
|
||||||
|
if (!registered)
|
||||||
{
|
{
|
||||||
// Should not happen if logic is correct, unless loop logic changed
|
Interlocked.Decrement(ref infoInPool.dependencyCount);
|
||||||
Interlocked.Increment(ref infoInPool.dependencyCount);
|
|
||||||
}
|
}
|
||||||
else if (registered)
|
|
||||||
{
|
|
||||||
// Successfully added dependency
|
|
||||||
Interlocked.Increment(ref infoInPool.dependencyCount);
|
|
||||||
}
|
|
||||||
// else: completed is true, registered is false -> Dependency is already done, so we don't increment our dependencyCount.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
EnqueueJobIfReady(handle);
|
// Lower the initial 1 guard lock; Enqueue if met
|
||||||
|
if (Interlocked.Decrement(ref infoInPool.dependencyCount) == 0)
|
||||||
|
{
|
||||||
|
EnqueueJobIfReady(handle);
|
||||||
|
}
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
@@ -509,7 +515,6 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
|||||||
|
|
||||||
NativeMemory.Free(info.pJobData);
|
NativeMemory.Free(info.pJobData);
|
||||||
_jobInfoPool.Remove(handle.ID, handle.Generation);
|
_jobInfoPool.Remove(handle.ID, handle.Generation);
|
||||||
Interlocked.Decrement(ref _totalJobCount);
|
|
||||||
|
|
||||||
for (var i = 0; i < dependentCount; i++)
|
for (var i = 0; i < dependentCount; i++)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
<Nullable>enable</Nullable>
|
<Nullable>enable</Nullable>
|
||||||
<AllowUnsafeBlocks>True</AllowUnsafeBlocks>
|
<AllowUnsafeBlocks>True</AllowUnsafeBlocks>
|
||||||
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
|
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
|
||||||
<AssemblyVersion>1.5.8</AssemblyVersion>
|
<AssemblyVersion>1.5.9</AssemblyVersion>
|
||||||
<Version>$(AssemblyVersion)</Version>
|
<Version>$(AssemblyVersion)</Version>
|
||||||
<Authors>Misaki</Authors>
|
<Authors>Misaki</Authors>
|
||||||
<PackageProjectUrl>https://git.personalnas.com/Misaki/Misaki.HighPerformance.git</PackageProjectUrl>
|
<PackageProjectUrl>https://git.personalnas.com/Misaki/Misaki.HighPerformance.git</PackageProjectUrl>
|
||||||
|
|||||||
@@ -33,12 +33,6 @@ internal class WorkerThread : IDisposable
|
|||||||
|
|
||||||
private bool TryFindJob(out JobHandle handle)
|
private bool TryFindJob(out JobHandle handle)
|
||||||
{
|
{
|
||||||
if (Interlocked.CompareExchange(ref _scheduler._totalJobCount, 0, 0) == 0)
|
|
||||||
{
|
|
||||||
handle = JobHandle.Invalid;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_localQueue.TryDequeue(out handle))
|
if (_localQueue.TryDequeue(out handle))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
@@ -112,14 +106,12 @@ internal class WorkerThread : IDisposable
|
|||||||
if (jobInfo.pExecutionFunc != null)
|
if (jobInfo.pExecutionFunc != null)
|
||||||
{
|
{
|
||||||
var ctx = new JobExecutionContext(_index, _scheduler);
|
var ctx = new JobExecutionContext(_index, _scheduler);
|
||||||
if (!jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx))
|
if (jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx))
|
||||||
{
|
{
|
||||||
// If the job returns false, it means it we are not the last worker to process this job, so we should not mark it as complete yet.
|
// If the job returns true, it means we are the last worker to process this job.
|
||||||
continue;
|
_scheduler.MarkJobComplete(handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_scheduler.MarkJobComplete(handle);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -216,12 +216,13 @@ public unsafe struct VirtualStack : IMemoryAllocator<VirtualStack, VirtualStack.
|
|||||||
}
|
}
|
||||||
|
|
||||||
var ptr = _baseAddress;
|
var ptr = _baseAddress;
|
||||||
|
var size = _reserveCapacity;
|
||||||
|
|
||||||
_baseAddress = null;
|
_baseAddress = null;
|
||||||
_allocatedOffset = 0;
|
_allocatedOffset = 0;
|
||||||
_committedSize = 0;
|
_committedSize = 0;
|
||||||
_reserveCapacity = 0;
|
_reserveCapacity = 0;
|
||||||
|
|
||||||
Munmap(ptr, _reserveCapacity);
|
Munmap(ptr, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ internal struct JobDispatchingJob : IJobParallelFor
|
|||||||
data = data[loopIndex]
|
data = data[loopIndex]
|
||||||
};
|
};
|
||||||
|
|
||||||
var handle = ctx.JobScheduler.ScheduleParallelFor(in innerJob, data[loopIndex].Length, 64, ctx.ThreadIndex);
|
var handle = ctx.JobScheduler.ScheduleParallelFor(in innerJob, data[loopIndex].Length, 64);
|
||||||
handles.AddNoResize(handle);
|
handles.AddNoResize(handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -199,7 +199,7 @@ public unsafe class TestJobSystem
|
|||||||
};
|
};
|
||||||
|
|
||||||
var handle1 = s_jobScheduler.ScheduleParallel(ref addJob, arraySize, 64);
|
var handle1 = s_jobScheduler.ScheduleParallel(ref addJob, arraySize, 64);
|
||||||
var handle2 = s_jobScheduler.ScheduleParallel(ref multiplyJob, arraySize, 64);
|
var handle2 = s_jobScheduler.ScheduleParallel(ref multiplyJob, arraySize, 64, handle1);
|
||||||
var handle3 = s_jobScheduler.Schedule(ref sumJob, handle2);
|
var handle3 = s_jobScheduler.Schedule(ref sumJob, handle2);
|
||||||
|
|
||||||
s_jobScheduler.Wait(handle3);
|
s_jobScheduler.Wait(handle3);
|
||||||
|
|||||||
Reference in New Issue
Block a user