fixe the locking issue in job scheduler

This commit is contained in:
2026-05-02 21:58:12 +09:00
parent 7bf63c0521
commit 54d0941e62
2 changed files with 101 additions and 48 deletions

View File

@@ -6,8 +6,6 @@ internal class WorkerThread : IDisposable
{
[ThreadStatic]
private static int t_threadIndex;
[ThreadStatic]
private static bool t_isWorkerThread;
private readonly SPMCQueue<JobHandle>[] _localQueue;
private readonly Thread _thread;
@@ -19,7 +17,6 @@ internal class WorkerThread : IDisposable
private uint _priorityTick;
public static int ThreadIndex => t_threadIndex;
public static bool IsWorkerThread => t_isWorkerThread;
public ReadOnlySpan<SPMCQueue<JobHandle>> LocalQueues => _localQueue;
@@ -103,7 +100,6 @@ internal class WorkerThread : IDisposable
Debug.Assert(index != null);
t_threadIndex = (int)index;
t_isWorkerThread = true;
while (!_scheduler.IsCancellationRequested)
{
@@ -148,12 +144,61 @@ internal class WorkerThread : IDisposable
continue;
}
var currentState = Volatile.Read(ref jobInfo.state);
if (JobUtility.GetState(currentState) == JobState.Scheduled)
// Try to acquire a reference count for the job. This ensures that the job won't be removed while we're processing it.
// This is critical that if thread A reads the job, but suddenly os scheduler delay the thread for just a moment, and thread B completes the job and removes it from the system, when thread A resumes, it might be accessing invalid memory.
// By acquiring a reference count, we ensure that even if the job is completed while we're processing it, it won't be removed until we're done.
var rcSpin = new SpinWait();
var rcAcquired = false;
int rc;
while (true)
{
// Try to flag as running, but don't loop if we fail. Someone else might have already flagged it.
var newState = (currentState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING;
Interlocked.CompareExchange(ref jobInfo.state, newState, currentState);
_scheduler.GetJobInfoReference(handle, out var currentExist);
if (!currentExist)
{
break;
}
var stateVal = Volatile.Read(ref jobInfo.state);
var state = JobUtility.GetState(stateVal);
if (state == JobState.Completed || state == JobState.Invalid)
{
break;
}
var newState = stateVal + JobUtility.RC_ONE;
if (state == JobState.Scheduled)
{
newState = (newState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING;
}
// Attempt to acquire a reference count by incrementing the state value. If the state has changed since we read it, we need to retry.
if (Interlocked.CompareExchange(ref jobInfo.state, newState, stateVal) == stateVal)
{
_scheduler.GetJobInfoReference(handle, out currentExist);
if (!currentExist)
{
rc = JobUtility.ReleaseRC(ref jobInfo.state);
if (rc == 0)
{
_scheduler.MarkJobComplete(handle);
}
break;
}
rcAcquired = true;
break;
}
rcSpin.SpinOnce(-1);
}
if (!rcAcquired)
{
continue;
}
if (jobInfo.pExecutionFunc != null)
@@ -169,7 +214,7 @@ internal class WorkerThread : IDisposable
jobInfo.pExecutionFunc(jobInfo.dataID, jobInfo.dataGeneration, ref jobInfo.jobRanges, in ctx);
}
var rc = JobUtility.ReleaseRC(ref jobInfo.state);
rc = JobUtility.ReleaseRC(ref jobInfo.state);
if (rc == 0)
{
_scheduler.MarkJobComplete(handle);