using Misaki.HighPerformance.Collections;
using Misaki.HighPerformance.LowLevel.Buffer;
using Misaki.HighPerformance.LowLevel.Helpers;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

namespace Misaki.HighPerformance.Jobs;

/// <summary>
/// Provides a mechanism for scheduling and executing jobs across multiple worker threads.
/// </summary>
/// <remarks>
/// The <see cref="JobScheduler"/> class is designed to manage the execution of jobs, including support
/// for dependencies, parallel execution, and thread-specific job assignment. It allows developers to schedule jobs that
/// implement the <see cref="IJob"/> or <see cref="IJobParallelFor"/> interfaces, and it ensures efficient utilization
/// of worker threads through job batching and work-stealing mechanisms. This class is thread-safe and can be used in
/// multi-threaded environments. However, it must be disposed when no longer needed to release resources and terminate
/// worker threads.
/// </remarks>
public sealed unsafe class JobScheduler : IDisposable
{
    // Not readonly: the allocator is mutated through Allocate/Free/Dispose.
    private FreeList _jobDataAllocator;
    private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool;
    private readonly ConcurrentQueue<JobHandle> _jobQueue;
    private readonly WorkerThread[] _workerThreads;

    // Guards the dependents lists stored inside JobInfo entries.
    private readonly Lock _lock;
    private readonly SemaphoreSlim _workSignal;
    private readonly CancellationTokenSource _cts;

    private bool _disposed = false;

    /// <summary>
    /// Gets the number of worker threads owned by this scheduler.
    /// </summary>
    public int WorkerCount => _workerThreads.Length;

    /// <summary>
    /// Gets a value indicating whether cancellation of the scheduler has been requested.
    /// </summary>
    internal bool IsCancellationRequested => _cts.IsCancellationRequested;

    /// <summary>
    /// Initializes a new instance of the <see cref="JobScheduler"/> class with the specified number of worker threads.
    /// </summary>
    /// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param>
    public JobScheduler(int threadCount)
    {
        _jobDataAllocator = new(8);
        _jobInfoPool = new();
        _jobQueue = new();
        _lock = new();
        _workSignal = new(0);
        _cts = new();

        var workerCount = Math.Max(1, threadCount);
        _workerThreads = new WorkerThread[workerCount];

        // Create every worker before starting any of them, so a running worker
        // never observes a partially filled array.
        for (var i = 0; i < workerCount; i++)
        {
            _workerThreads[i] = new WorkerThread(i, this);
        }

        foreach (var worker in _workerThreads)
        {
            worker.Start();
        }
    }

    /// <summary>
    /// Safety net that releases the scheduler's resources if <see cref="Dispose"/> was never called.
    /// </summary>
    ~JobScheduler()
    {
        try
        {
            Dispose();
        }
        catch
        {
            // Deliberately swallowed: an exception escaping a finalizer terminates the
            // process, and there is no caller to report it to (e.g. when the constructor
            // threw and left the instance partially initialized).
        }
    }

    /// <summary>
    /// Moves a job from <see cref="JobStatus.Created"/> to <see cref="JobStatus.Scheduled"/> and enqueues it
    /// once it has no outstanding dependencies. Does nothing if the job no longer exists, still has
    /// dependencies, or has already left the <see cref="JobStatus.Created"/> state.
    /// </summary>
    /// <param name="handle">The handle of the job to enqueue.</param>
    private void EnqueueJobIfReady(JobHandle handle)
    {
        ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle._id, handle._generation, out var exist);
        if (!exist || Volatile.Read(ref jobInfo.dependencyCount) != 0)
        {
            return;
        }

        // Only the caller that wins the Created -> Scheduled transition may enqueue the job.
        if (Interlocked.CompareExchange(ref jobInfo.status, JobStatus.Scheduled, JobStatus.Created) != JobStatus.Created)
        {
            return;
        }

        // A valid thread index pins the job to that worker's local queue; anything else goes to the shared queue.
        var isPinned = jobInfo.threadIndex >= 0 && jobInfo.threadIndex < _workerThreads.Length;
        var jobQueue = isPinned ? _workerThreads[jobInfo.threadIndex].LocalQueue : _jobQueue;

        // Ensure the count of this job handle won't exceed the number of worker threads.
        // Worker threads will steal parallel iteration ranges from each other.
        var handleCount = Math.Min(jobInfo.remainingBatches, _workerThreads.Length);
        for (var i = 0; i < handleCount; i++)
        {
            jobQueue.Enqueue(handle);
        }

        _workSignal.Release(handleCount);
    }

    /// <summary>
    /// Adds <paramref name="jobInfo"/> to the pool, registers it as a dependent of every live dependency,
    /// and enqueues it immediately when nothing is left to wait for.
    /// </summary>
    /// <param name="jobInfo">The job description to store. It is copied into the pool.</param>
    /// <param name="dependencies">Handles that must complete first. Invalid, unknown, or completed handles are ignored.</param>
    /// <returns>
    /// The handle of the new job, or <see cref="JobHandle.Invalid"/> when a dependency already has
    /// <c>JobInfo.MAX_DEPENDENTS</c> dependents. In that case the job data is released and the job is removed again.
    /// </returns>
    private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan<JobHandle> dependencies)
    {
        var id = _jobInfoPool.Add(jobInfo, out var generation);
        ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _);
        var handle = new JobHandle(id, generation);

        // Registration guard: hold one extra count while dependencies are being registered.
        // Without it, an early dependency that completes mid-registration could drop the
        // counter to zero and schedule the job before the remaining dependencies are attached.
        Interlocked.Increment(ref infoInPool.dependencyCount);

        for (var i = 0; i < dependencies.Length; i++)
        {
            var dependency = dependencies[i];
            if (!dependency.IsValid)
            {
                continue;
            }

            lock (_lock)
            {
                ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(dependency._id, dependency._generation, out var exist);
                if (!exist || Volatile.Read(ref Unsafe.As<JobStatus, int>(ref depJobInfo.status)) == (int)JobStatus.Completed)
                {
                    // Already finished (or gone): nothing to wait for.
                    continue;
                }

                if (depJobInfo.dependentCount >= JobInfo.MAX_DEPENDENTS)
                {
                    // Too many dependents
                    // TODO: Handle this case properly
                    // Roll back completely. Dependencies registered earlier still hold this
                    // id/generation, so the slot must be removed; otherwise their completion
                    // would schedule a job whose data has already been freed.
                    if (jobInfo.pJobData != null)
                    {
                        _jobDataAllocator.Free(jobInfo.pJobData);
                    }

                    _jobInfoPool.Remove(id, generation);
                    return JobHandle.Invalid;
                }

                // Count the dependency before publishing this job in its dependents list.
                Interlocked.Increment(ref infoInPool.dependencyCount);

                depJobInfo.dependentsID[depJobInfo.dependentCount] = id;
                depJobInfo.dependentsGeneration[depJobInfo.dependentCount] = generation;
                depJobInfo.dependentCount++;
            }
        }

        // Drop the registration guard. Whoever brings the counter to zero (this thread or the
        // last completing dependency) is responsible for enqueueing the job.
        if (Interlocked.Decrement(ref infoInPool.dependencyCount) == 0)
        {
            EnqueueJobIfReady(handle);
        }

        return handle;
    }

    /// <summary>
    /// Determines whether the shared queue or any worker's local queue currently holds a job.
    /// </summary>
    /// <returns><see langword="true"/> if at least one queue is non-empty; otherwise <see langword="false"/>.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal bool HasWork()
    {
        if (!_jobQueue.IsEmpty)
        {
            return true;
        }

        // Plain loop instead of LINQ: avoids a delegate invocation and enumerator allocation per call.
        foreach (var worker in _workerThreads)
        {
            if (!worker.LocalQueue.IsEmpty)
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Blocks the calling thread until work is signaled or the scheduler is cancelled.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal void WaitForWork()
    {
        _workSignal.Wait(_cts.Token);
    }

    /// <summary>
    /// Attempts to dequeue a job, first from the local queue of the given worker and then from the shared queue.
    /// </summary>
    /// <param name="threadIndex">Index of the worker whose local queue is tried first. Out-of-range values skip the local queue.</param>
    /// <param name="outHandle">The dequeued job handle, or <see cref="JobHandle.Invalid"/> when no job was available.</param>
    /// <returns><see langword="true"/> if a job was dequeued; otherwise <see langword="false"/>.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal bool TryStealJob(int threadIndex, out JobHandle outHandle)
    {
        if (threadIndex >= 0
            && threadIndex < _workerThreads.Length
            && _workerThreads[threadIndex].LocalQueue.TryDequeue(out outHandle))
        {
            return true;
        }

        if (_jobQueue.TryDequeue(out outHandle))
        {
            return true;
        }

        outHandle = JobHandle.Invalid;
        return false;
    }

    /// <summary>
    /// Gets a reference to the pooled <see cref="JobInfo"/> identified by <paramref name="handle"/>.
    /// </summary>
    /// <param name="handle">The handle of the job to look up.</param>
    /// <param name="exist">Set to <see langword="true"/> when the job was found; otherwise <see langword="false"/>.</param>
    /// <returns>A reference to the job info, or a null reference when <paramref name="handle"/> is invalid.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal ref JobInfo GetJobInfoReference(JobHandle handle, out bool exist)
    {
        if (!handle.IsValid)
        {
            exist = false;
            return ref Unsafe.NullRef<JobInfo>();
        }

        return ref _jobInfoPool.GetElementReferenceAt(handle._id, handle._generation, out exist);
    }

    /// <summary>
    /// Transitions a running job to <see cref="JobStatus.Completed"/>, releases its data and pool slot,
    /// and enqueues every dependent whose last outstanding dependency this was.
    /// </summary>
    /// <param name="handle">The handle of the job that finished. Invalid or unknown handles are ignored.</param>
    internal void MarkJobComplete(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return;
        }

        ref var info = ref _jobInfoPool.GetElementReferenceAt(handle._id, handle._generation, out var exist);
        if (!exist)
        {
            return;
        }

        // Only one caller may perform the Running -> Completed transition and the cleanup below.
        if (Interlocked.CompareExchange(ref info.status, JobStatus.Completed, JobStatus.Running) != JobStatus.Running)
        {
            return;
        }

        // Snapshot the dependents under the lock so no registration can slip in half-way;
        // registrations that take the lock afterwards observe the Completed status and skip this job.
        var dependentsToNotify = stackalloc JobHandle[JobInfo.MAX_DEPENDENTS];
        var dependentCount = 0;

        lock (_lock)
        {
            dependentCount = info.dependentCount;
            for (var i = 0; i < dependentCount; i++)
            {
                dependentsToNotify[i] = new JobHandle(info.dependentsID[i], info.dependentsGeneration[i]);
            }
        }

        // Dependency-only jobs carry no data (pJobData is null).
        if (info.pJobData != null)
        {
            _jobDataAllocator.Free(info.pJobData);
        }

        _jobInfoPool.Remove(handle._id, handle._generation);

        for (var i = 0; i < dependentCount; i++)
        {
            var depHandle = dependentsToNotify[i];
            ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(depHandle._id, depHandle._generation, out var depExist);
            if (depExist && Interlocked.Decrement(ref depJobInfo.dependencyCount) == 0)
            {
                EnqueueJobIfReady(depHandle);
            }
        }
    }

    /// <summary>
    /// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="threadIndex">The index of the thread that will execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    public JobHandle Schedule<T>(ref T job, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJob
    {
        var jobData = _jobDataAllocator.Allocate(MemoryUtilities.SizeOf<T>(), MemoryUtilities.AlignOf<T>());
        if (jobData == null)
        {
            return JobHandle.Invalid;
        }

        // Copy the caller's job so it may go out of scope before the job runs.
        fixed (T* pJob = &job)
        {
            MemoryUtilities.MemCpy(pJob, jobData, MemoryUtilities.SizeOf<T>());
        }

        var jobInfo = new JobInfo
        {
            pJobData = jobData,
            // NOTE(review): generic arguments were lost in the extracted source;
            // Execute<T> is reconstructed from usage — confirm against JobExecutor.
            executeDelegate = &JobExecutor.Execute<T>,
            remainingBatches = 1,
            threadIndex = threadIndex,
            jobRanges = JobRanges.Single,
        };

        return CreateJobHandle(ref jobInfo, dependency);
    }

    /// <summary>
    /// Schedules a single job for execution on a specified thread, without dependencies.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="threadIndex">The index of the thread that will execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    public JobHandle Schedule<T>(ref T job, int threadIndex)
        where T : unmanaged, IJob
        => Schedule(ref job, threadIndex, JobHandle.Invalid);

    /// <summary>
    /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job. When zero or negative, no
    /// iteration is executed and the returned handle only tracks <paramref name="dependency"/>.</param>
    /// <param name="batchSize">The number of iterations to include in each batch. Values below 1 are treated as 1.</param>
    /// <param name="threadIndex">The index of the thread that will execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
    /// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    public JobHandle ScheduleParallel<T>(ref T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
        where T : unmanaged, IJobParallelFor
    {
        if (totalIteration <= 0)
        {
            // Zero batches would make the scheduler call SemaphoreSlim.Release(0), which throws
            // and leaves the job stuck in the Scheduled state. Hand back a dependency-only
            // handle instead, so callers can still chain on or wait for the result.
            return CombineDependencies(dependency);
        }

        var jobData = _jobDataAllocator.Allocate(MemoryUtilities.SizeOf<T>(), MemoryUtilities.AlignOf<T>());
        if (jobData == null)
        {
            return JobHandle.Invalid;
        }

        // Copy the caller's job so it may go out of scope before the job runs.
        fixed (T* pJob = &job)
        {
            MemoryUtilities.MemCpy(pJob, jobData, MemoryUtilities.SizeOf<T>());
        }

        var optimalBatchSize = Math.Max(1, batchSize);

        // Ceiling division in 64-bit so that totalIteration + batchSize cannot overflow int.
        var totalBatches = (int)(((long)totalIteration + optimalBatchSize - 1) / optimalBatchSize);

        var jobInfo = new JobInfo
        {
            pJobData = jobData,
            // NOTE(review): generic arguments were lost in the extracted source;
            // ExecuteParallel<T> is reconstructed from usage — confirm against JobExecutor.
            executeDelegate = &JobExecutor.ExecuteParallel<T>,
            remainingBatches = totalBatches,
            threadIndex = threadIndex,
            jobRanges = new()
            {
                currentIndex = 0,
                batchSize = optimalBatchSize,
                totalIteration = totalIteration,
            },
        };

        return CreateJobHandle(ref jobInfo, dependency);
    }

    /// <summary>
    /// Schedules a parallel job for execution without dependencies, dividing the workload into batches and distributing it across threads.
    /// </summary>
    /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam>
    /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
    /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
    /// <param name="batchSize">The number of iterations to include in each batch.</param>
    /// <param name="threadIndex">The index of the thread that will execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
    /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
    /// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
    public JobHandle ScheduleParallel<T>(ref T job, int totalIteration, int batchSize, int threadIndex)
        where T : unmanaged, IJobParallelFor
        => ScheduleParallel(ref job, totalIteration, batchSize, threadIndex, JobHandle.Invalid);

    /// <summary>
    /// Combines multiple job dependencies into a single <see cref="JobHandle"/>.
    /// </summary>
    /// <param name="dependencies">A collection of <see cref="JobHandle"/> instances representing the dependencies to combine.</param>
    /// <returns>A <see cref="JobHandle"/> that represents the combined dependencies. The returned handle can be used to ensure
    /// that all specified dependencies are completed before proceeding.</returns>
    public JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies)
    {
        // A job without data or delegate: it exists only to be depended on.
        var jobInfo = new JobInfo
        {
            pJobData = null,
            executeDelegate = null,
            remainingBatches = 1,
            threadIndex = -1,
            jobRanges = JobRanges.Single,
        };

        return CreateJobHandle(ref jobInfo, dependencies);
    }

    /// <summary>
    /// Retrieves the current status of a job identified by the specified handle.
    /// </summary>
    /// <param name="handle">The handle representing the job whose status is to be retrieved. The handle must be valid.</param>
    /// <returns>The current status of the job as a <see cref="JobStatus"/> value.
    /// Returns <see cref="JobStatus.Invalid"/> if the handle is invalid or the job does not exist.</returns>
    public JobStatus GetJobStatus(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return JobStatus.Invalid;
        }

        ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle._id, handle._generation, out var exist);
        if (!exist)
        {
            return JobStatus.Invalid;
        }

        return (JobStatus)Volatile.Read(ref Unsafe.As<JobStatus, int>(ref jobInfo.status));
    }

    /// <summary>
    /// Blocks the calling thread until the specified job is completed.
    /// </summary>
    /// <param name="handle">The handle of the job to wait for.</param>
    public void WaitComplete(JobHandle handle)
    {
        if (!handle.IsValid)
        {
            return;
        }

        // A completed job is removed from the pool, so a failed lookup also means "done".
        var spin = new SpinWait();
        while (_jobInfoPool.TryGetElement(handle._id, handle._generation, out var jobInfo))
        {
            if (jobInfo.status == JobStatus.Completed)
            {
                return;
            }

            spin.SpinOnce(-1);
        }
    }

    /// <summary>
    /// Cancels the scheduler, disposes the worker threads and releases all pooled job data.
    /// Calling this method more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        // Flag first so a repeated call during teardown cannot release resources twice.
        _disposed = true;

        _cts.Cancel();

        foreach (var worker in _workerThreads)
        {
            worker.Dispose();
        }

        _jobInfoPool.Clear();
        _jobQueue.Clear();
        _jobDataAllocator.Dispose();

        // Released only after the workers are disposed, mirroring the token source below.
        _workSignal.Dispose();
        _cts.Dispose();

        GC.SuppressFinalize(this);
    }
}