using Misaki.HighPerformance.Collections;
using Misaki.HighPerformance.LowLevel.Buffer;
using Misaki.HighPerformance.LowLevel.Utilities;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

namespace Misaki.HighPerformance.Jobs;

public interface IJobScheduler
{
    /// <summary>
    /// Gets the number of worker threads managed by the job scheduler.
    /// </summary>
    int WorkerCount { get; }

    /// <summary>
    /// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJob;

    /// <summary>
    /// Schedules a single job for execution on a specified thread without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle Schedule<T>(ref readonly T job, int threadIndex)
        where T : unmanaged, IJob;

    /// <summary>
    /// Schedules a single job for execution on any thread, with an optional dependency on another job.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle Schedule<T>(ref readonly T job, JobHandle dependency)
        where T : unmanaged, IJob;

    /// <summary>
    /// Schedules a single job for execution on any thread without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle Schedule<T>(ref readonly T job)
        where T : unmanaged, IJob;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJobParallelFor;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads,
    /// with a preferred thread and without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
        where T : unmanaged, IJobParallelFor;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads
    /// on any thread, with an optional dependency on another job.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
        where T : unmanaged, IJobParallelFor;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads
    /// on any thread without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize)
        where T : unmanaged, IJobParallelFor;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallel"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJobParallel;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads,
    /// with a preferred thread and without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallel"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
        where T : unmanaged, IJobParallel;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads
    /// on any thread, with an optional dependency on another job.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallel"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
        where T : unmanaged, IJobParallel;

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads
    /// on any thread without dependency.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallel"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize)
        where T : unmanaged, IJobParallel;

    /// <summary>
    /// Combines multiple job dependencies into a single <see cref="JobHandle"/>.
    /// </summary>
    /// <param name="dependencies">A collection of <see cref="JobHandle"/> instances representing the dependencies to combine.</param>
    /// <returns>A <see cref="JobHandle"/> that represents the combined dependencies. The returned handle can be used to ensure
    /// that all specified dependencies are completed before proceeding.</returns>
    JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies);

    /// <summary>
    /// Retrieves the current status of a job identified by the specified handle.
    /// </summary>
    /// <param name="handle">The handle representing the job whose status is to be retrieved. The handle must be valid.</param>
    /// <returns>The current status of the job as a <see cref="JobState"/> value.
    /// Returns <see cref="JobState.Invalid"/> if the handle is invalid.</returns>
    JobState GetJobStatus(JobHandle handle);

    /// <summary>
    /// Blocks the calling thread until the specified job is completed.
    /// </summary>
    /// <param name="handle">The handle of the job to wait for.</param>
    void WaitComplete(JobHandle handle);

    /// <summary>
    /// Blocks the calling thread until all specified job handles have completed.
    /// </summary>
    /// <remarks>This method waits for all jobs referenced by the provided handles to complete before
    /// returning. The calling thread will be blocked until every job has finished. If any handle is invalid or does not
    /// correspond to an active job, it is considered completed. This method is not thread-safe and should not be called
    /// concurrently from multiple threads.</remarks>
    /// <param name="handles">A collection of job handles to wait for. Each handle represents an asynchronous job whose completion is awaited.
    /// The collection must not be empty.</param>
    void WaitAll(params ReadOnlySpan<JobHandle> handles);

    /// <summary>
    /// Waits until any of the specified job handles has completed and returns the first completed handle.
    /// </summary>
    /// <remarks>This method blocks the calling thread until at least one of the specified jobs has finished.
    /// The returned handle corresponds to the job that completed first among those provided. The order of handles in
    /// the span may affect which handle is returned if multiple jobs complete simultaneously.</remarks>
    /// <param name="handles">A read-only span containing the job handles to monitor for completion. Each handle represents a job whose
    /// completion status will be checked.</param>
    /// <returns>The first job handle from the provided collection that has completed.</returns>
    JobHandle WaitAny(params ReadOnlySpan<JobHandle> handles);
}

public unsafe partial class JobScheduler
{
    public static int MainThreadIndex => -1;

    public static TempJobAllocator* pTempAllocator;

    /// <summary>
    /// Gets the allocation handle for the temporary job allocator.
    /// </summary>
    /// <remarks>
    /// You must dispose the allocation before the fourth time you call the referenced method after obtaining this handle.
    /// TODO: the name of that method is missing from this comment; restore the reference.
    /// Dereferences <see cref="pTempAllocator"/>, so <see cref="InitTempAllocator"/> must have been called first.
    /// </remarks>
    public static AllocationHandle TempAllocatorHandle => pTempAllocator->Handle;

    /// <summary>
    /// Allocates the shared temporary job allocator with <c>MemoryUtility.Malloc</c> and initializes it.
    /// </summary>
    /// <remarks>
    /// The previous pointer is overwritten without being released; call <see cref="ReleaseTempAllocator"/>
    /// before initializing again.
    /// </remarks>
    public static void InitTempAllocator()
    {
        pTempAllocator = (TempJobAllocator*)MemoryUtility.Malloc((nuint)sizeof(TempJobAllocator));
        pTempAllocator->Init();
    }

    /// <summary>
    /// Disposes and frees the shared temporary job allocator if it has been initialized. Safe to call repeatedly.
    /// </summary>
    public static void ReleaseTempAllocator()
    {
        if (pTempAllocator != null)
        {
            pTempAllocator->Dispose();
            MemoryUtility.Free(pTempAllocator);

            // Clear the pointer so a second call cannot double-free and later reads fail fast instead of
            // touching freed memory.
            pTempAllocator = null;
        }
    }
}

/// <summary>
/// Provides a mechanism for scheduling and executing jobs across multiple worker threads.
/// </summary>
public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
{
    // Don't sleep indefinitely because that causes our 1ms job to become 15ms.
    // Passed to SpinWait.SpinOnce, where -1 disables the Thread.Sleep(1) fallback.
    private const int _SLEEP_THRESHOLD = -1;

    // Lock-Free constants: State mask (low 16 bits) and RC unit (1 << 16)
    private const int _STATE_MASK = 0xFFFF;
    private const int _RC_ONE = 0x10000;

    private FreeList _jobDataAllocator;
    private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool;
    private readonly ConcurrentQueue<JobHandle> _jobQueue;
    private readonly WorkerThread[] _workerThreads;

    private readonly SemaphoreSlim _workSignal;
    private readonly CancellationTokenSource _cts;

    private bool _disposed = false;

    internal bool IsCancellationRequested => _cts.IsCancellationRequested;

    /// <inheritdoc/>
    public int WorkerCount => _workerThreads.Length;

    /// <summary>
    /// Initializes a new instance of the <see cref="JobScheduler"/> class with the specified number of worker threads.
    /// </summary>
    /// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param>
    public JobScheduler(int threadCount)
    {
        _jobDataAllocator = new(8);
        _jobInfoPool = new();
        _jobQueue = new();
        _workSignal = new(0);
        _cts = new();

        var workerCount = Math.Max(1, threadCount);
        _workerThreads = new WorkerThread[workerCount];

        // Construct every worker before starting any of them, so a running worker always sees the full array.
        for (var i = 0; i < workerCount; i++)
        {
            _workerThreads[i] = new WorkerThread(i, this);
        }

        foreach (var worker in _workerThreads)
        {
            worker.Start();
        }
    }

    ~JobScheduler()
    {
        Dispose();
    }

    /// <summary>
    /// Moves a job from Created to Scheduled and publishes it to a work queue, provided the job still exists and
    /// has no outstanding dependencies. Only the caller that wins the state transition enqueues the job.
    /// </summary>
    /// <param name="handle">The handle of the job to enqueue.</param>
    private void EnqueueJobIfReady(JobHandle handle)
    {
        ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
        if (!exist || Volatile.Read(ref jobInfo.dependencyCount) != 0)
        {
            return;
        }

        // Transition Created -> Scheduled on the low 16 bits only. The high bits hold the reader count that
        // CreateJobHandle takes while registering dependents; comparing the whole word against JobState.Created
        // would fail while a reader is active and the job would never be scheduled.
        ref var stateBits = ref Unsafe.As<JobState, int>(ref jobInfo.state);
        var spin = new SpinWait();
        while (true)
        {
            var current = Volatile.Read(ref stateBits);
            if ((JobState)(current & _STATE_MASK) != JobState.Created)
            {
                return; // Another caller already scheduled (or completed) this job.
            }

            var scheduled = (current & ~_STATE_MASK) | (int)JobState.Scheduled;
            if (Interlocked.CompareExchange(ref stateBits, scheduled, current) == current)
            {
                break;
            }

            spin.SpinOnce(-1);
        }

        // Ensure the count of this job handle won't exceed the number of worker threads.
        // Worker threads will steal parallel iteration ranges from each other.
        var handleCount = Math.Min(jobInfo.remainingBatches, _workerThreads.Length);
        if (handleCount <= 0)
        {
            // A parallel job with no batches has nothing to execute. SemaphoreSlim.Release rejects counts
            // below 1, and nothing would ever complete the job, so complete it here instead.
            MarkJobComplete(handle);
            return;
        }

        // Jobs with a valid preferred thread go to that worker's local queue; everything else goes to the shared queue.
        var hasPreferredWorker = jobInfo.threadIndex >= 0 && jobInfo.threadIndex < _workerThreads.Length;
        ConcurrentQueue<JobHandle> targetQueue = hasPreferredWorker
            ? _workerThreads[jobInfo.threadIndex].LocalQueue
            : _jobQueue;

        for (var i = 0; i < handleCount; i++)
        {
            targetQueue.Enqueue(handle);
        }

        _workSignal.Release(handleCount);
    }

    /// <summary>
    /// Adds <paramref name="jobInfo"/> to the job pool, registers the new job as a dependent of every live
    /// dependency, and enqueues it once no dependency remains outstanding.
    /// </summary>
    /// <param name="jobInfo">The job description to copy into the pool.</param>
    /// <param name="dependencies">Handles the new job must wait for. Invalid, missing, or already completed handles are skipped.</param>
    /// <returns>The handle of the new job, or <see cref="JobHandle.Invalid"/> when a dependency already has
    /// <c>JobInfo.MAX_DEPENDENTS</c> dependents. In that case the job data is freed and the pool entry removed.</returns>
    private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan<JobHandle> dependencies)
    {
        var id = _jobInfoPool.Add(jobInfo, out var generation);
        ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _);
        var handle = new JobHandle(id, generation);

        // Registration guard: hold one extra count while dependencies are being registered. Without it, an early
        // dependency finishing could drive the count to zero and enqueue the job before the later ones are registered.
        Interlocked.Increment(ref infoInPool.dependencyCount);

        for (var i = 0; i < dependencies.Length; i++)
        {
            var dependency = dependencies[i];
            if (!dependency.IsValid)
            {
                continue;
            }

            ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(dependency.ID, dependency.Generation, out var exist);
            if (!exist)
            {
                // Dependency does not exist (likely completed already)
                continue;
            }

            // Lock-free registration: acquire a "reader lock" by incrementing RC in the high bits of the state.
            // If the state is already Completed the dependency is met and nothing is registered.
            ref var depStateBits = ref Unsafe.As<JobState, int>(ref depJobInfo.state);
            var spin = new SpinWait();
            while (true)
            {
                var stateVal = Volatile.Read(ref depStateBits);
                if ((JobState)(stateVal & _STATE_MASK) == JobState.Completed)
                {
                    break;
                }

                if (Interlocked.CompareExchange(ref depStateBits, stateVal + _RC_ONE, stateVal) != stateVal)
                {
                    spin.SpinOnce(-1);
                    continue;
                }

                // RC acquired: MarkJobComplete waits for RC == 0 before it reads the dependents buffer.
                var slot = Interlocked.Increment(ref depJobInfo.dependentCount);
                var registered = slot <= JobInfo.MAX_DEPENDENTS;
                if (registered)
                {
                    // Count the dependency before RC is released, so the completion's decrement can never
                    // run ahead of this increment.
                    Interlocked.Increment(ref infoInPool.dependencyCount);
                    depJobInfo.dependentsID[slot - 1] = id;
                    depJobInfo.dependentsGeneration[slot - 1] = generation;
                }
                else
                {
                    // MAX_DEPENDENTS reached: backtrack the counter increment.
                    Interlocked.Decrement(ref depJobInfo.dependentCount);
                }

                // Release RC
                Interlocked.Add(ref depStateBits, -_RC_ONE);

                if (!registered)
                {
                    // Cleanup and fail. The guard kept the job out of every queue, so its data can be freed.
                    // Removing the pool entry makes dependencies registered earlier skip this job when they
                    // notify their dependents, instead of enqueueing a job whose data is gone.
                    _jobDataAllocator.Free(jobInfo.pJobData);
                    _jobInfoPool.Remove(id, generation);
                    return JobHandle.Invalid;
                }

                break;
            }
        }

        // Drop the registration guard. Whoever brings the count to zero (this thread or a completing
        // dependency) enqueues the job.
        if (Interlocked.Decrement(ref infoInPool.dependencyCount) == 0)
        {
            EnqueueJobIfReady(handle);
        }

        return handle;
    }

    /// <summary>
    /// Returns whether the shared queue or any worker's local queue currently holds a job handle.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal bool HasWork()
    {
        if (!_jobQueue.IsEmpty)
        {
            return true;
        }

        for (var i = 0; i < _workerThreads.Length; i++)
        {
            if (!_workerThreads[i].LocalQueue.IsEmpty)
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Waits on the work signal for up to <paramref name="timeout"/> milliseconds, observing the scheduler's cancellation token.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal void WaitForWork(int timeout)
    {
        _workSignal.Wait(timeout, _cts.Token);
    }

    /// <summary>
    /// Tries to dequeue a job handle from the shared queue. <paramref name="threadIndex"/> is not used.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal bool TryStealFromMain(int threadIndex, out JobHandle outHandle)
    {
        return _jobQueue.TryDequeue(out outHandle);
    }

    /// <summary>
    /// Tries to dequeue a job handle from the local queue of the worker at <paramref name="threadIndex"/>.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal bool TryStealFromWorker(int threadIndex, out JobHandle outHandle)
    {
        return _workerThreads[threadIndex].LocalQueue.TryDequeue(out outHandle);
    }

    /// <summary>
    /// Gets a reference to the pooled <see cref="JobInfo"/> for <paramref name="handle"/>.
    /// </summary>
    /// <param name="handle">The job handle to look up.</param>
    /// <param name="exist">Set to <see langword="false"/> for an invalid handle; otherwise reported by the pool lookup.</param>
    /// <returns>The pooled entry, or a null reference when the handle is invalid. Check <paramref name="exist"/> before use.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal ref JobInfo GetJobInfoReference(JobHandle handle, out bool exist)
    {
        if (!handle.IsValid)
        {
            exist = false;
            return ref Unsafe.NullRef<JobInfo>();
        }

        return ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out exist);
    }

    /// <summary>
    /// Marks a job as completed, frees its data, removes it from the pool, and enqueues any dependent whose
    /// last outstanding dependency this was. Does nothing for invalid, missing, or already completed jobs.
    /// </summary>
    /// <param name="handle">The handle of the job that finished.</param>
    internal void MarkJobComplete(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return;
        }

        ref var info = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
        if (!exist)
        {
            return;
        }

        // Lock-free completion:
        // 1. Set the low state bits to Completed. New readers see Completed and back off.
        // 2. Wait for existing readers to leave (RC == 0).
        var spin = new SpinWait();
        while (true)
        {
            var stateVal = Volatile.Read(ref Unsafe.As<JobState, int>(ref info.state));
            var state = (JobState)(stateVal & _STATE_MASK);

            if (state == JobState.Completed)
            {
                return; // Already completed (shouldn't happen for single-execution jobs)
            }

            // Replace only the low bits; the high bits (RC) are preserved so active readers stay accounted for.
            var newState = (stateVal & ~_STATE_MASK) | (int)JobState.Completed;

            if (Interlocked.CompareExchange(ref Unsafe.As<JobState, int>(ref info.state), newState, stateVal) == stateVal)
            {
                // Now wait for the existing readers to finish (RC to become 0).
                while (true)
                {
                    var current = Volatile.Read(ref Unsafe.As<JobState, int>(ref info.state));
                    if (((uint)current >> 16) == 0)
                    {
                        break; // RC is 0. Safe to proceed.
                    }

                    spin.SpinOnce(-1);
                }

                break;
            }

            spin.SpinOnce(-1);
        }

        // We now have exclusive access to the dependents buffers (no new readers, old readers finished).
        var dependentCount = info.dependentCount;
        dependentCount = Math.Min(dependentCount, JobInfo.MAX_DEPENDENTS); // Safety cap

        // Copy the dependents to the stack first: the pool entry is removed before they are notified.
        var dependentsToNotify = stackalloc JobHandle[dependentCount];
        for (var i = 0; i < dependentCount; i++)
        {
            dependentsToNotify[i] = new JobHandle(info.dependentsID[i], info.dependentsGeneration[i]);
        }

        _jobDataAllocator.Free(info.pJobData);
        _jobInfoPool.Remove(handle.ID, handle.Generation);

        for (var i = 0; i < dependentCount; i++)
        {
            var depHandle = dependentsToNotify[i];
            ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(depHandle.ID, depHandle.Generation, out var depExist);

            if (depExist && Interlocked.Decrement(ref depJobInfo.dependencyCount) == 0)
            {
                EnqueueJobIfReady(depHandle);
            }
        }
    }

    /// <inheritdoc/>
    public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJob
    {
        var pJobData = _jobDataAllocator.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
        if (pJobData == null)
        {
            return JobHandle.Invalid;
        }

        // Copy the job so the caller's instance does not need to outlive this call.
        fixed (T* pJob = &job)
        {
            MemoryUtility.MemCpy(pJobData, pJob, MemoryUtility.SizeOf<T>());
        }

        var jobInfo = new JobInfo
        {
            pJobData = pJobData,
            pExecutionFunc = &JobExecutor.Execute<T>,
            remainingBatches = 1,
            threadIndex = threadIndex,
            jobRanges = JobRanges.Single,
        };

        return CreateJobHandle(ref jobInfo, dependency);
    }

    /// <inheritdoc/>
    public JobHandle Schedule<T>(ref readonly T job, int threadIndex)
        where T : unmanaged, IJob
        => Schedule(in job, threadIndex, JobHandle.Invalid);

    /// <inheritdoc/>
    public JobHandle Schedule<T>(ref readonly T job, JobHandle dependency)
        where T : unmanaged, IJob
        => Schedule(in job, -1, dependency);

    /// <inheritdoc/>
    public JobHandle Schedule<T>(ref readonly T job)
        where T : unmanaged, IJob
        => Schedule(in job, -1, JobHandle.Invalid);

    /// <inheritdoc/>
    /// <remarks>
    /// A <paramref name="batchSize"/> below 1 is treated as 1. When <paramref name="totalIteration"/> yields no
    /// batches, the job is completed as soon as its dependencies are met, without being queued.
    /// </remarks>
    public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJobParallelFor
    {
        var pJobData = _jobDataAllocator.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
        if (pJobData == null)
        {
            return JobHandle.Invalid;
        }

        fixed (T* pJob = &job)
        {
            MemoryUtility.MemCpy(pJobData, pJob, MemoryUtility.SizeOf<T>());
        }

        var optimalBatchSize = Math.Max(1, batchSize);

        // Ceiling division in 64-bit: totalIteration + batchSize - 1 can exceed int.MaxValue.
        var totalBatches = (int)(((long)totalIteration + optimalBatchSize - 1) / optimalBatchSize);

        var jobInfo = new JobInfo
        {
            pJobData = pJobData,
            pExecutionFunc = &JobExecutor.ExecuteParallelFor<T>,
            remainingBatches = totalBatches,
            threadIndex = threadIndex,
            jobRanges = new()
            {
                currentIndex = 0,
                batchSize = optimalBatchSize,
                totalIteration = totalIteration,
            },
        };

        return CreateJobHandle(ref jobInfo, dependency);
    }

    /// <inheritdoc/>
    public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
        where T : unmanaged, IJobParallelFor
        => ScheduleParallelFor(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid);

    /// <inheritdoc/>
    public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
        where T : unmanaged, IJobParallelFor
        => ScheduleParallelFor(in job, totalIteration, batchSize, -1, dependency);

    /// <inheritdoc/>
    public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize)
        where T : unmanaged, IJobParallelFor
        => ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobHandle.Invalid);

    /// <inheritdoc/>
    /// <remarks>
    /// A <paramref name="batchSize"/> below 1 is treated as 1. When <paramref name="totalIteration"/> yields no
    /// batches, the job is completed as soon as its dependencies are met, without being queued.
    /// </remarks>
    public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJobParallel
    {
        var pJobData = _jobDataAllocator.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
        if (pJobData == null)
        {
            return JobHandle.Invalid;
        }

        fixed (T* pJob = &job)
        {
            MemoryUtility.MemCpy(pJobData, pJob, MemoryUtility.SizeOf<T>());
        }

        var optimalBatchSize = Math.Max(1, batchSize);

        // Ceiling division in 64-bit: totalIteration + batchSize - 1 can exceed int.MaxValue.
        var totalBatches = (int)(((long)totalIteration + optimalBatchSize - 1) / optimalBatchSize);

        var jobInfo = new JobInfo
        {
            pJobData = pJobData,
            pExecutionFunc = &JobExecutor.ExecuteParallel<T>,
            remainingBatches = totalBatches,
            threadIndex = threadIndex,
            jobRanges = new()
            {
                currentIndex = 0,
                batchSize = optimalBatchSize,
                totalIteration = totalIteration,
            },
        };

        return CreateJobHandle(ref jobInfo, dependency);
    }

    /// <inheritdoc/>
    public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
        where T : unmanaged, IJobParallel
        => ScheduleParallel(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid);

    /// <inheritdoc/>
    public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
        where T : unmanaged, IJobParallel
        => ScheduleParallel(in job, totalIteration, batchSize, -1, dependency);

    /// <inheritdoc/>
    public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize)
        where T : unmanaged, IJobParallel
        => ScheduleParallel(in job, totalIteration, batchSize, -1, JobHandle.Invalid);

    /// <inheritdoc/>
    /// <remarks>
    /// Implemented as a job with no data and no execution function that depends on every given handle.
    /// </remarks>
    public JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies)
    {
        var jobInfo = new JobInfo
        {
            pJobData = null,
            pExecutionFunc = null,
            remainingBatches = 1,
            threadIndex = -1,
            jobRanges = JobRanges.Single,
        };

        return CreateJobHandle(ref jobInfo, dependencies);
    }

    /// <inheritdoc/>
    /// <remarks>
    /// A valid handle that is no longer in the pool is reported as <see cref="JobState.Completed"/>, because
    /// completed jobs are removed from the pool.
    /// </remarks>
    public JobState GetJobStatus(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return JobState.Invalid;
        }

        ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
        if (!exist)
        {
            return JobState.Completed; // We assume completed if not found. Invalid state is reserved for error.
        }

        // Mask out the Reader Count (upper 16 bits) to return the actual State
        return (JobState)(Volatile.Read(ref Unsafe.As<JobState, int>(ref jobInfo.state)) & _STATE_MASK);
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Spins until the job is marked completed or is no longer in the pool. Returns immediately for an invalid handle.
    /// </remarks>
    public void WaitComplete(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return;
        }

        // TODO: We can steal a up stream job to execute while waiting.
        // For example, if we wait on job A which depends on job B, and both are not scheduled yet, we can steal and execute job B to speed up the completion of A.
        // And then maybe we can even execute A after B if we can guarantee the order and avoid deadlock. This is a common optimization in job systems called "helping" or "work stealing with dependencies".

        var spin = new SpinWait();
        while (_jobInfoPool.TryGetElement(handle.ID, handle.Generation, out var jobInfo))
        {
            // Mask out RC
            if ((jobInfo.state & (JobState)_STATE_MASK) == JobState.Completed)
            {
                return;
            }

            spin.SpinOnce(_SLEEP_THRESHOLD);
        }
    }

    /// <inheritdoc/>
    /// <remarks>
    /// A handle counts as completed once it is no longer in the pool. An empty span returns immediately.
    /// </remarks>
    public void WaitAll(params ReadOnlySpan<JobHandle> handles)
    {
        var spin = new SpinWait();
        while (true)
        {
            var completedCount = 0;
            foreach (var handle in handles)
            {
                if (!_jobInfoPool.Contains(handle.ID, handle.Generation))
                {
                    completedCount++;
                }
            }

            if (completedCount == handles.Length)
            {
                return;
            }

            spin.SpinOnce(_SLEEP_THRESHOLD);
        }
    }

    /// <inheritdoc/>
    /// <remarks>
    /// A handle counts as completed once it is no longer in the pool. Returns <see cref="JobHandle.Invalid"/>
    /// when <paramref name="handles"/> is empty.
    /// </remarks>
    public JobHandle WaitAny(params ReadOnlySpan<JobHandle> handles)
    {
        // With nothing to wait for, the loop below could never return.
        if (handles.IsEmpty)
        {
            return JobHandle.Invalid;
        }

        var spin = new SpinWait();
        while (true)
        {
            foreach (var handle in handles)
            {
                if (!_jobInfoPool.Contains(handle.ID, handle.Generation))
                {
                    return handle;
                }
            }

            spin.SpinOnce(_SLEEP_THRESHOLD);
        }
    }

    /// <summary>
    /// Cancels the scheduler, disposes the worker threads, and releases the job pool, queues, allocator, and
    /// synchronization primitives. Calls after the first have no effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        // Signal cancellation before disposing the workers so that waits on the work signal are interrupted.
        _cts.Cancel();

        foreach (var worker in _workerThreads)
        {
            worker.Dispose();
        }

        _jobInfoPool.Clear();
        _jobQueue.Clear();
        _jobDataAllocator.Dispose();

        _workSignal.Dispose();
        _cts.Dispose();

        _disposed = true;

        GC.SuppressFinalize(this);
    }
}