feat: implement multi-threaded JobScheduler with worker threads and dependency management

This commit is contained in:
2026-05-02 17:53:45 +09:00
parent 0265a386ba
commit b0220a2350
4 changed files with 27 additions and 100 deletions

View File

@@ -236,15 +236,15 @@ public sealed unsafe partial class JobScheduler : IDisposable
if (exist && Volatile.Read(ref jobInfo.dependencyCount) == 0) if (exist && Volatile.Read(ref jobInfo.dependencyCount) == 0)
{ {
// Note: JobState.Created is 0, JobState.Scheduled is 1. We assume RC logic doesn't touch initial state (RC=0).
if (Interlocked.CompareExchange(ref jobInfo.state, JobUtility.JOBSTATE_SCHEDULED, JobUtility.JOBSTATE_CREATED) != JobUtility.JOBSTATE_CREATED)
{
return;
}
// Ensure the count of this job handle won't exceed the number of worker threads. // Ensure the count of this job handle won't exceed the number of worker threads.
// Worker threads will steal parallel iteration ranges from each other. // Worker threads will steal parallel iteration ranges from each other.
var handleCount = Math.Min(jobInfo.jobRanges.TotalBatches, _workerThreads.Length); var handleCount = Math.Min(jobInfo.jobRanges.TotalBatches, _workerThreads.Length);
var initialState = JobUtility.JOBSTATE_SCHEDULED | (handleCount * JobUtility.RC_ONE);
if (Interlocked.CompareExchange(ref jobInfo.state, initialState, JobUtility.JOBSTATE_CREATED) != JobUtility.JOBSTATE_CREATED)
{
return;
}
var tier = (int)jobInfo.priority; var tier = (int)jobInfo.priority;
var i = 0; var i = 0;
@@ -447,45 +447,14 @@ public sealed unsafe partial class JobScheduler : IDisposable
return; return;
} }
// NOTE: We are the last handle of the same job. Because we call this on the thread that get rc = 0, not the last one to complete.
// So we can directly set state to Completed without caring about RC. This also means we don't need to preserve upper bits.
Debug.Assert(JobUtility.ReadRefCount(ref info) == 0);
// Still worth it to use spin and Interlocked.CompareExchange here?
// Theoretically there shouldn't be much contention here because only one thread can get into this block for a specific job.
#if false #if false
// Lock-free Completion:
// 1. Transition State to Completed (preserving or setting upper bits?).
// Actually, we want to block new Readers. Setting state to Completed blocks new Readers.
// 2. Wait for existing Readers (RC == 0).
var spin = new SpinWait();
while (true)
{
var stateVal = Volatile.Read(ref info.state);
var state = JobUtility.GetState(stateVal);
if (state == JobState.Completed)
{
return;
}
// Preserve upper bits (RC) and set state to Completed. This blocks new Readers.
var newState = (stateVal & ~JobUtility.STATE_MASK) | (int)JobState.Completed;
if (Interlocked.CompareExchange(ref info.state, newState, stateVal) == stateVal)
{
// Successfully set State to Completed. New readers will see Completed and back off.
// Now we must wait for existing readers to finish (RC to become 0).
while (true)
{
var current = Volatile.Read(ref info.state);
if (((uint)current >> 16) == 0)
{
break; // RC is 0. Safe to proceed.
}
spin.SpinOnce(-1);
}
break;
}
spin.SpinOnce(-1);
}
#else
// NOTE: We are the last one to complete. Because we call this on the thread that get rc = 0, not the last one to complete. So we can directly set state to Completed without caring about RC. This also means we don't need to preserve upper bits.
var spin = new SpinWait(); var spin = new SpinWait();
while (Interlocked.CompareExchange(ref info.state, JobUtility.JOBSTATE_COMPLETED, JobUtility.JOBSTATE_RUNNING) != JobUtility.JOBSTATE_RUNNING) while (Interlocked.CompareExchange(ref info.state, JobUtility.JOBSTATE_COMPLETED, JobUtility.JOBSTATE_RUNNING) != JobUtility.JOBSTATE_RUNNING)
{ {
@@ -496,6 +465,10 @@ public sealed unsafe partial class JobScheduler : IDisposable
spin.SpinOnce(-1); spin.SpinOnce(-1);
} }
#else
// We can skip the CAS loop and directly set to Completed, because we are guaranteed to be the only thread that can transition this state from Running to Completed.
// Other threads may see an intermediate state where it's still Running, but that's fine because they will just spin until it's Completed.
Volatile.Write(ref info.state, JobUtility.JOBSTATE_COMPLETED);
#endif #endif
var it = info.GetDependentIterator(_jobEdges); var it = info.GetDependentIterator(_jobEdges);

View File

@@ -61,6 +61,7 @@ public class SPMCQueue<T>
if (Interlocked.CompareExchange(ref _head, head + 1, head) == head) if (Interlocked.CompareExchange(ref _head, head + 1, head) == head)
{ {
Volatile.Write(ref _tail, head + 1);
return true; return true;
} }

View File

@@ -144,61 +144,12 @@ internal class WorkerThread : IDisposable
continue; continue;
} }
// Try to acquire a reference count for the job. This ensures that the job won't be removed while we're processing it. var currentState = Volatile.Read(ref jobInfo.state);
// This is critical that if thread A reads the job, but suddenly os scheduler delay the thread for just a moment, and thread B completes the job and removes it from the system, when thread A resumes, it might be accessing invalid memory. if (JobUtility.GetState(currentState) == JobState.Scheduled)
// By acquiring a reference count, we ensure that even if the job is completed while we're processing it, it won't be removed until we're done.
var rcSpin = new SpinWait();
var rcAcquired = false;
int rc;
while (true)
{ {
_scheduler.GetJobInfoReference(handle, out var currentExist); // Try to flag as running, but don't loop if we fail. Someone else might have already flagged it.
if (!currentExist) var newState = (currentState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING;
{ Interlocked.CompareExchange(ref jobInfo.state, newState, currentState);
break;
}
var stateVal = Volatile.Read(ref jobInfo.state);
var state = JobUtility.GetState(stateVal);
if (state == JobState.Completed || state == JobState.Invalid)
{
break;
}
var newState = stateVal + JobUtility.RC_ONE;
if (state == JobState.Scheduled)
{
newState = (newState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING;
}
// Attempt to acquire a reference count by incrementing the state value. If the state has changed since we read it, we need to retry.
if (Interlocked.CompareExchange(ref jobInfo.state, newState, stateVal) == stateVal)
{
_scheduler.GetJobInfoReference(handle, out currentExist);
if (!currentExist)
{
rc = JobUtility.ReleaseRC(ref jobInfo.state);
if (rc == 0)
{
_scheduler.MarkJobComplete(handle);
}
break;
}
rcAcquired = true;
break;
}
rcSpin.SpinOnce(-1);
}
if (!rcAcquired)
{
continue;
} }
if (jobInfo.pExecutionFunc != null) if (jobInfo.pExecutionFunc != null)
@@ -214,7 +165,7 @@ internal class WorkerThread : IDisposable
jobInfo.pExecutionFunc(jobInfo.dataID, jobInfo.dataGeneration, ref jobInfo.jobRanges, in ctx); jobInfo.pExecutionFunc(jobInfo.dataID, jobInfo.dataGeneration, ref jobInfo.jobRanges, in ctx);
} }
rc = JobUtility.ReleaseRC(ref jobInfo.state); var rc = JobUtility.ReleaseRC(ref jobInfo.state);
if (rc == 0) if (rc == 0)
{ {
_scheduler.MarkJobComplete(handle); _scheduler.MarkJobComplete(handle);

View File

@@ -3,6 +3,8 @@ using Misaki.HighPerformance.LowLevel.Buffer;
using Misaki.HighPerformance.LowLevel.Collections; using Misaki.HighPerformance.LowLevel.Collections;
using Misaki.HighPerformance.LowLevel.Utilities; using Misaki.HighPerformance.LowLevel.Utilities;
using Misaki.HighPerformance.Test.Benchmark; using Misaki.HighPerformance.Test.Benchmark;
using Misaki.HighPerformance.Test.UnitTest;
using Misaki.HighPerformance.Test.UnitTest.Jobs;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;