feat(jobs): add IJobScheduler interface and job scheduling improvements\n\nIntroduce IJobScheduler interface and enhance JobScheduler, WorkerThread, JobInfo and related collections. Add ConcurrentSlotMap tests and codegen generator updates.\n\nSee changed files for details.
This commit is contained in:
@@ -1,212 +1,14 @@
|
||||
using Misaki.HighPerformance.Collections;
|
||||
using Misaki.HighPerformance.LowLevel.Buffer;
|
||||
using Misaki.HighPerformance.LowLevel.Collections;
|
||||
using Misaki.HighPerformance.LowLevel.Utilities;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Runtime.InteropServices;
|
||||
|
||||
namespace Misaki.HighPerformance.Jobs;
|
||||
|
||||
public interface IJobScheduler
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets the number of worker threads managed by the job scheduler.
|
||||
/// </summary>
|
||||
int WorkerCount
|
||||
{
|
||||
get;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
|
||||
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJob;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a single job for execution on a specified thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle Schedule<T>(ref readonly T job, int threadIndex)
|
||||
where T : unmanaged, IJob;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a single job for execution on any thread, with an optional dependency on another job.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle Schedule<T>(ref readonly T job, JobHandle dependency)
|
||||
where T : unmanaged, IJob;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a single job for execution on any thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle Schedule<T>(ref readonly T job)
|
||||
where T : unmanaged, IJob;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
|
||||
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallelFor;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on a specified thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
|
||||
where T : unmanaged, IJobParallelFor;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on any thread, with an optional dependency on another job..
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallelFor;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on any thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize)
|
||||
where T : unmanaged, IJobParallelFor;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
|
||||
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallel;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on a specified thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
|
||||
where T : unmanaged, IJobParallel;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on any thread, with an optional dependency on another job..
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallel;
|
||||
|
||||
/// <summary>
|
||||
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads on any thread without dependency.
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
|
||||
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
|
||||
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
|
||||
/// <param name="batchSize">The number of iterations to include in each batch.</param>
|
||||
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
|
||||
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
|
||||
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize)
|
||||
where T : unmanaged, IJobParallel;
|
||||
|
||||
/// <summary>
|
||||
/// Combines multiple job dependencies into a single <see cref="JobHandle"/>.
|
||||
/// </summary>
|
||||
/// <param name="dependencies">A collection of <see cref="JobHandle"/> instances representing the dependencies to combine.</param>
|
||||
/// <returns>A <see cref="JobHandle"/> that represents the combined dependencies. The returned handle can be used to ensure
|
||||
/// that all specified dependencies are completed before proceeding.</returns>
|
||||
JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies);
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves the current status of a job identified by the specified handle.
|
||||
/// </summary>
|
||||
/// <param name="handle">The handle representing the job whose status is to be retrieved. The handle must be valid.</param>
|
||||
/// <returns>The current status of the job as a <see cref="JobState"/> value.
|
||||
/// Returns <see cref="JobState.Invalid"/> if the handle is invalid or the job does not exist.</returns>
|
||||
JobState GetJobStatus(JobHandle handle);
|
||||
|
||||
/// <summary>
|
||||
/// Blocks the calling thread until the specified job is completed.
|
||||
/// </summary>
|
||||
/// <param name="handle">The handle of the job to wait for.</param>
|
||||
void Wait(JobHandle handle);
|
||||
|
||||
/// <summary>
|
||||
/// Blocks the calling thread until all specified job handles have completed.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The collection handles will be reordered in-place to move completed handles to the front.
|
||||
/// </remarks>
|
||||
/// <param name="handles">A collection of job handles to wait for.</param>
|
||||
void WaitAll(params Span<JobHandle> handles);
|
||||
|
||||
/// <summary>
|
||||
/// Waits until any of the specified job handles has completed and returns the first completed handle.
|
||||
/// </summary>
|
||||
/// <param name="handles">A read-only span containing the job handles to monitor for completion.</param>
|
||||
/// <returns>The first job handle from the provided collection that has completed.</returns>
|
||||
JobHandle WaitAny(params ReadOnlySpan<JobHandle> handles);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Provides a mechanism for scheduling and executing jobs across multiple worker threads.
|
||||
/// </summary>
|
||||
@@ -215,9 +17,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
// Don't sleep indefinitely because that causes our 1ms job to become 15ms.
|
||||
private const int _SLEEP_THRESHOLD = -1;
|
||||
|
||||
// Lock-Free constants: State mask (low 16 bits) and RC unit (1 << 16)
|
||||
private const int _STATE_MASK = 0xFFFF;
|
||||
private const int _RC_ONE = 0x10000;
|
||||
private FreeList _freeList;
|
||||
|
||||
private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool;
|
||||
private readonly ConcurrentQueue<JobHandle> _jobQueue;
|
||||
@@ -236,11 +36,14 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
/// Initializes a new instance of the <see cref="JobScheduler"/> class with the specified number of worker threads.
|
||||
/// </summary>
|
||||
/// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param>
|
||||
public JobScheduler(int threadCount)
|
||||
/// <param name="priority">The priority of the worker threads.</param>
|
||||
public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal)
|
||||
{
|
||||
var workerCount = Math.Max(1, threadCount);
|
||||
|
||||
_jobInfoPool = new ConcurrentSlotMap<JobInfo>();
|
||||
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: threadCount);
|
||||
|
||||
_jobInfoPool = new ConcurrentSlotMap<JobInfo>(128);
|
||||
_jobQueue = new ConcurrentQueue<JobHandle>();
|
||||
|
||||
_workSignal = new SemaphoreSlim(0);
|
||||
@@ -250,7 +53,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
|
||||
for (var i = 0; i < workerCount; i++)
|
||||
{
|
||||
_workerThreads[i] = new WorkerThread(i, this);
|
||||
_workerThreads[i] = new WorkerThread(i, this, priority);
|
||||
}
|
||||
|
||||
foreach (var worker in _workerThreads)
|
||||
@@ -271,7 +74,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
if (exist && Volatile.Read(ref jobInfo.dependencyCount) == 0)
|
||||
{
|
||||
// Note: JobState.Created is 0, JobState.Scheduled is 1. We assume RC logic doesn't touch initial state (RC=0).
|
||||
if (Interlocked.CompareExchange(ref jobInfo.state, JobState.Scheduled, JobState.Created) != JobState.Created)
|
||||
if (Interlocked.CompareExchange(ref jobInfo.state, JobUtility.JOBSTATE_SCHEDULED, JobUtility.JOBSTATE_CREATED) != JobUtility.JOBSTATE_CREATED)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -341,8 +144,8 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
|
||||
while (true)
|
||||
{
|
||||
var stateVal = Volatile.Read(ref Unsafe.As<JobState, int>(ref depJobInfo.state));
|
||||
var state = (JobState)(stateVal & _STATE_MASK);
|
||||
var stateVal = Volatile.Read(ref depJobInfo.state);
|
||||
var state = JobUtility.GetState(stateVal);
|
||||
|
||||
if (state == JobState.Completed)
|
||||
{
|
||||
@@ -350,7 +153,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
}
|
||||
|
||||
// Attempt to increment RC (Reader Count)
|
||||
if (Interlocked.CompareExchange(ref Unsafe.As<JobState, int>(ref depJobInfo.state), stateVal + _RC_ONE, stateVal) == stateVal)
|
||||
if (Interlocked.CompareExchange(ref depJobInfo.state, stateVal + JobUtility.RC_ONE, stateVal) == stateVal)
|
||||
{
|
||||
// RC acquired. We are safe from "Remove" and state change.
|
||||
var count = Interlocked.Increment(ref depJobInfo.dependentCount);
|
||||
@@ -359,22 +162,21 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
// Safely write to the fixed buffer
|
||||
depJobInfo.dependentsID[count - 1] = id;
|
||||
depJobInfo.dependentsGeneration[count - 1] = generation;
|
||||
registered = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!depJobInfo.additionalDependents.IsCreated)
|
||||
{
|
||||
depJobInfo.additionalDependents = new UnsafeList<JobHandle>(4, AllocationHandle.Persistent);
|
||||
}
|
||||
|
||||
depJobInfo.additionalDependents.Add(handle);
|
||||
}
|
||||
|
||||
registered = true;
|
||||
|
||||
// Release RC
|
||||
Interlocked.Add(ref Unsafe.As<JobState, int>(ref depJobInfo.state), -_RC_ONE);
|
||||
|
||||
if (!registered)
|
||||
{
|
||||
// Failed to register because MAX_DEPENDENTS reached.
|
||||
// Backtrack the counter increment.
|
||||
Interlocked.Decrement(ref depJobInfo.dependentCount);
|
||||
|
||||
// Cleanup and fail
|
||||
NativeMemory.Free(jobInfo.pJobData);
|
||||
return JobHandle.Invalid;
|
||||
}
|
||||
Interlocked.Add(ref depJobInfo.state, -JobUtility.RC_ONE);
|
||||
|
||||
break;
|
||||
}
|
||||
@@ -449,10 +251,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
|
||||
internal void MarkJobComplete(JobHandle handle)
|
||||
{
|
||||
if (!handle.IsValid)
|
||||
{
|
||||
return;
|
||||
}
|
||||
Debug.Assert(handle.IsValid);
|
||||
|
||||
ref var info = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
|
||||
if (!exist)
|
||||
@@ -467,31 +266,23 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
var spin = new SpinWait();
|
||||
while (true)
|
||||
{
|
||||
var stateVal = Volatile.Read(ref Unsafe.As<JobState, int>(ref info.state));
|
||||
var state = (JobState)(stateVal & _STATE_MASK);
|
||||
var stateVal = Volatile.Read(ref info.state);
|
||||
var state = JobUtility.GetState(stateVal);
|
||||
|
||||
if (state == JobState.Completed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
//if (state != JobState.Running)
|
||||
//{
|
||||
// // If in valid state (e.g. Scheduled?), we still assume we can complete it.
|
||||
// // Usually it should be Running.
|
||||
//}
|
||||
|
||||
// Construct new value: State=Completed, preserve RC (temporarily) or strictly replace only low bits?
|
||||
// We set low bits to Completed. High bits (RC) remain.
|
||||
var newState = (stateVal & ~_STATE_MASK) | (int)JobState.Completed;
|
||||
|
||||
if (Interlocked.CompareExchange(ref Unsafe.As<JobState, int>(ref info.state), newState, stateVal) == stateVal)
|
||||
// Preserve upper bits (RC) and set state to Completed. This blocks new Readers.
|
||||
var newState = (stateVal & ~JobUtility.STATE_MASK) | (int)JobState.Completed;
|
||||
if (Interlocked.CompareExchange(ref info.state, newState, stateVal) == stateVal)
|
||||
{
|
||||
// Successfully set State to Completed. New readers will see Completed and back off.
|
||||
// Now we must wait for existing readers to finish (RC to become 0).
|
||||
while (true)
|
||||
{
|
||||
var current = Volatile.Read(ref Unsafe.As<JobState, int>(ref info.state));
|
||||
var current = Volatile.Read(ref info.state);
|
||||
if (((uint)current >> 16) == 0)
|
||||
{
|
||||
break; // RC is 0. Safe to proceed.
|
||||
@@ -505,20 +296,10 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
spin.SpinOnce(-1);
|
||||
}
|
||||
|
||||
// We now have exclusive access to dependentsID (no new readers, old readers finished).
|
||||
var dependentCount = info.dependentCount;
|
||||
var dependentsToNotify = stackalloc JobHandle[dependentCount];
|
||||
for (var i = 0; i < dependentCount; i++)
|
||||
var it = info.GetDependentIterator();
|
||||
while (it.MoveNext())
|
||||
{
|
||||
dependentsToNotify[i] = new JobHandle(info.dependentsID[i], info.dependentsGeneration[i]);
|
||||
}
|
||||
|
||||
NativeMemory.Free(info.pJobData);
|
||||
_jobInfoPool.Remove(handle.ID, handle.Generation);
|
||||
|
||||
for (var i = 0; i < dependentCount; i++)
|
||||
{
|
||||
var depHandle = dependentsToNotify[i];
|
||||
var depHandle = it.Current;
|
||||
|
||||
ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(depHandle.ID, depHandle.Generation, out var depExist);
|
||||
if (depExist && Interlocked.Decrement(ref depJobInfo.dependencyCount) == 0)
|
||||
@@ -526,17 +307,20 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
EnqueueJobIfReady(depHandle);
|
||||
}
|
||||
}
|
||||
|
||||
_freeList.Free(info.pJobData);
|
||||
_jobInfoPool.Remove(handle.ID, handle.Generation);
|
||||
}
|
||||
|
||||
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJob
|
||||
{
|
||||
var pJobData = NativeMemory.Alloc((nuint)sizeof(T));
|
||||
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
|
||||
if (pJobData == null)
|
||||
{
|
||||
return JobHandle.Invalid;
|
||||
}
|
||||
|
||||
|
||||
*(T*)pJobData = job;
|
||||
|
||||
var jobInfo = new JobInfo
|
||||
@@ -568,7 +352,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallelFor
|
||||
{
|
||||
var pJobData = NativeMemory.Alloc((nuint)sizeof(T));
|
||||
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
|
||||
if (pJobData == null)
|
||||
{
|
||||
return JobHandle.Invalid;
|
||||
@@ -587,7 +371,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
remainingBatches = totalBatches,
|
||||
threadIndex = threadIndex,
|
||||
|
||||
jobRanges = new()
|
||||
jobRanges = new JobRanges()
|
||||
{
|
||||
currentIndex = 0,
|
||||
batchSize = optimalBatchSize,
|
||||
@@ -613,7 +397,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
|
||||
where T : unmanaged, IJobParallel
|
||||
{
|
||||
var pJobData = NativeMemory.Alloc((nuint)sizeof(T));
|
||||
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
|
||||
if (pJobData == null)
|
||||
{
|
||||
return JobHandle.Invalid;
|
||||
@@ -632,7 +416,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
remainingBatches = totalBatches,
|
||||
threadIndex = threadIndex,
|
||||
|
||||
jobRanges = new()
|
||||
jobRanges = new JobRanges()
|
||||
{
|
||||
currentIndex = 0,
|
||||
batchSize = optimalBatchSize,
|
||||
@@ -685,7 +469,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
}
|
||||
|
||||
// Mask out the Reader Count (upper 16 bits) to return the actual State
|
||||
return (JobState)(Volatile.Read(ref Unsafe.As<JobState, int>(ref jobInfo.state)) & _STATE_MASK);
|
||||
return JobUtility.GetState(Volatile.Read(ref jobInfo.state));
|
||||
}
|
||||
|
||||
public void Wait(JobHandle handle)
|
||||
@@ -701,14 +485,14 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
|
||||
var spin = new SpinWait();
|
||||
while (true)
|
||||
{
|
||||
ref readonly var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
|
||||
ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
|
||||
if (!exist)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Mask out RC
|
||||
if ((jobInfo.state & (JobState)_STATE_MASK) == JobState.Completed)
|
||||
if (JobUtility.ReadState(ref jobInfo) == JobState.Completed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user