diff --git a/Misaki.HighPerformance.Jobs/AssemblyInfo.cs.cs b/Misaki.HighPerformance.Jobs/AssemblyInfo.cs.cs deleted file mode 100644 index f7070ce..0000000 --- a/Misaki.HighPerformance.Jobs/AssemblyInfo.cs.cs +++ /dev/null @@ -1,5 +0,0 @@ - -global using unsafe JobExecutionFunc = delegate*; -#if MHP_SUPPORT_MANAGED_JOB -global using unsafe ManagedJobExecutionFunc = delegate*; -#endif diff --git a/Misaki.HighPerformance.Jobs/JobDataPool.cs b/Misaki.HighPerformance.Jobs/JobDataPool.cs new file mode 100644 index 0000000..a078fdc --- /dev/null +++ b/Misaki.HighPerformance.Jobs/JobDataPool.cs @@ -0,0 +1,52 @@ +using Misaki.HighPerformance.Collections; +using System.Runtime.CompilerServices; + +namespace Misaki.HighPerformance.Jobs; + +/// +/// This class manages pools of job data for different types. It allows allocating, retrieving, and freeing job data instances using unique IDs and generations to ensure safe access and reuse of resources. +/// +public static class JobDataPool + where T : struct +{ + private static readonly ConcurrentSlotMap s_slots = new ConcurrentSlotMap(8); + + /// + /// Allocates a new instance of type T in the pool and returns its ID and generation. + /// + /// The type of the data to allocate. + /// The data to allocate. + /// The generation of the allocated data. + /// The ID of the allocated data. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int Allocate(ref readonly T data, out int generation) + { + return s_slots.Add(data, out generation); + } + + /// + /// Gets a reference to the data of type T associated with the given ID and generation. The 'exists' output parameter indicates whether the data exists in the pool. + /// + /// The type of the data to retrieve. + /// The ID of the data to retrieve. + /// The generation of the data to retrieve. + /// A value indicating whether the data exists in the pool. + /// A reference to the requested data. Undefined if 'exists' is false. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ref T GetReference(int id, int generation, out bool exists) + { + return ref s_slots.GetElementReferenceAt(id, generation, out exists); + } + + /// + /// Frees the data of type T associated with the given ID and generation, making it available for future allocations. After calling this method, the ID and generation can be reused for new data. + /// + /// The type of the data to free. + /// The ID of the data to free. + /// The generation of the data to free. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void Free(int id, int generation) + { + s_slots.Remove(id, generation); + } +} \ No newline at end of file diff --git a/Misaki.HighPerformance.Jobs/JobExecutor.cs b/Misaki.HighPerformance.Jobs/JobExecutor.cs index f8ad87f..fe56b48 100644 --- a/Misaki.HighPerformance.Jobs/JobExecutor.cs +++ b/Misaki.HighPerformance.Jobs/JobExecutor.cs @@ -1,16 +1,22 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; + namespace Misaki.HighPerformance.Jobs; -internal static unsafe class JobExecutor +internal static class JobExecutor { - public static bool Execute(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) - where T : unmanaged, IJob + public static bool Execute(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) + where T : struct, IJob { - var pJob = (T*)pJobData; - pJob->Execute(in ctx); + ref var job = ref JobDataPool.GetReference(dataID, dataGeneration, out var exists); + Debug.Assert(exists, "Job data not found in the pool."); + + job.Execute(in ctx); return Interlocked.Decrement(ref remainingBatches) == 0; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private static bool GetWorkerStealingRange(ref JobRanges jobRanges, out int start, out int end) { start = Interlocked.Add(ref jobRanges.currentIndex, jobRanges.batchSize) - jobRanges.batchSize; @@ -25,10 +31,12 @@ internal static unsafe class JobExecutor return true; } - public static bool ExecuteParallelFor(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) - where T : unmanaged, IJobParallelFor + public static bool ExecuteParallelFor(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) + where T : struct, IJobParallelFor { - var pJob = (T*)pJobData; + ref var job = ref JobDataPool.GetReference(dataID, dataGeneration, out var exists); + Debug.Assert(exists, "Job data not found in the pool."); + var wasTheLastBatch = false; while (true) @@ -40,7 +48,7 @@ internal static unsafe class JobExecutor for (var i = start; i < end; i++) { - pJob->Execute(i, in ctx); + job.Execute(i, in ctx); } if (Interlocked.Decrement(ref remainingBatches) == 0) @@ -52,12 +60,13 @@ internal static unsafe class JobExecutor return wasTheLastBatch; } - public static bool ExecuteParallel(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) - where T : unmanaged, IJobParallel + public static bool ExecuteParallel(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) + where T : struct, IJobParallel { - var pJob = (T*)pJobData; - var wasTheLastBatch = false; + ref var job = ref JobDataPool.GetReference(dataID, dataGeneration, out var exists); + Debug.Assert(exists, "Job data not found in the pool."); + var wasTheLastBatch = false; while (true) { if (!GetWorkerStealingRange(ref jobRanges, out var start, out var end)) @@ -65,7 +74,7 @@ internal static unsafe class JobExecutor break; } - pJob->Execute(start, end, in ctx); + job.Execute(start, end, in ctx); if (Interlocked.Decrement(ref remainingBatches) == 0) { wasTheLastBatch = true; diff --git a/Misaki.HighPerformance.Jobs/JobInfo.cs b/Misaki.HighPerformance.Jobs/JobInfo.cs index 86a014a..1ec6dba 100644 --- a/Misaki.HighPerformance.Jobs/JobInfo.cs +++ b/Misaki.HighPerformance.Jobs/JobInfo.cs @@ -38,7 +38,7 @@ public enum JobPriority Low = 2 } -internal unsafe struct JobInfo +public unsafe struct JobInfo { public ref struct DependentIterator { @@ -63,13 +63,13 @@ internal unsafe struct JobInfo [MethodImpl(MethodImplOptions.AggressiveInlining)] get { - if (_index < MAX_DEPENDENTS) + if (_index < MAX_LOCAL_DEPENDENTS) { return new JobHandle(_jobInfo.dependentsID[_index], _jobInfo.dependentsGeneration[_index]); } else { - return _jobInfo.additionalDependents[_index - MAX_DEPENDENTS]; + return _jobInfo.additionalDependents[_index - MAX_LOCAL_DEPENDENTS]; } } } @@ -81,14 +81,17 @@ internal unsafe struct JobInfo } } - public const int MAX_DEPENDENTS = 8; + public const int MAX_LOCAL_DEPENDENTS = 8; - public void* pJobData; - public JobExecutionFunc pExecutionFunc; + public delegate* pExecutionFunc; + public delegate* pFreeFunc; + + public int dataID; + public int dataGeneration; // The list of jobs that are waiting for THIS job to complete. - public fixed int dependentsID[MAX_DEPENDENTS]; // The actual list of IDs - public fixed int dependentsGeneration[MAX_DEPENDENTS]; // The actual list of generations + public fixed int dependentsID[MAX_LOCAL_DEPENDENTS]; // The actual list of IDs + public fixed int dependentsGeneration[MAX_LOCAL_DEPENDENTS]; // The actual list of generations public UnsafeList additionalDependents; public int dependentCount; @@ -110,7 +113,7 @@ internal unsafe struct JobInfo } } -internal struct JobRanges +public struct JobRanges { public int batchSize; public int totalIteration; diff --git a/Misaki.HighPerformance.Jobs/JobScheduler.cs b/Misaki.HighPerformance.Jobs/JobScheduler.cs index 5e7173b..df0af17 100644 --- a/Misaki.HighPerformance.Jobs/JobScheduler.cs +++ b/Misaki.HighPerformance.Jobs/JobScheduler.cs @@ -1,14 +1,13 @@ using Misaki.HighPerformance.Collections; using Misaki.HighPerformance.LowLevel.Buffer; using Misaki.HighPerformance.LowLevel.Collections; -using Misaki.HighPerformance.LowLevel.Utilities; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.CompilerServices; namespace Misaki.HighPerformance.Jobs; -public struct JobSchedulerDesc() +public struct JobSchedulerDesc { /// /// Gets or sets the number of worker threads to be created and managed by the job scheduler. If set to less than 1, at least one worker thread will be created. @@ -21,10 +20,10 @@ public struct JobSchedulerDesc() /// /// Gets or sets the priority of the worker threads. This can be used to influence the scheduling of the threads by the operating system. The default value is . /// - public ThreadPriority ThreadPriority + public required ThreadPriority ThreadPriority { get; set; - } = ThreadPriority.Normal; + } /// /// Gets or sets the state object for the job scheduler. This can be used to store any user-defined data or context that may be needed by the jobs or worker threads. The job scheduler does not interpret or manage this state in any way; it is simply provided as a convenience for users of the job scheduler. The default value is null. @@ -118,8 +117,6 @@ public sealed unsafe partial class JobScheduler : IDisposable // Don't sleep indefinitely because that causes our 1ms job to become 15ms. private const int _SLEEP_THRESHOLD = -1; - private FreeList _freeList; - private readonly ConcurrentSlotMap _jobInfoPool; private readonly ConcurrentQueue[] _jobQueues; private readonly WorkerThread[] _workerThreads; @@ -147,8 +144,6 @@ public sealed unsafe partial class JobScheduler : IDisposable { var workerCount = Math.Max(1, desc.ThreadCount); - _freeList = new FreeList(MemoryUtility.AlignOf(), maxConcurrencyLevel: workerCount); - _jobInfoPool = new ConcurrentSlotMap(128); _jobQueues = new ConcurrentQueue[3]; @@ -181,14 +176,11 @@ public sealed unsafe partial class JobScheduler : IDisposable /// The number of worker threads to create. If less than 1, at least one thread will be created. /// The priority of the worker threads. /// The state object for the job scheduler. - /// A value indicating whether managed jobs are allowed. [Obsolete("Use JobScheduler(JobSchedulerDesc) instead.")] - public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null, bool allowManagedJobs = false) + public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null) { var workerCount = Math.Max(1, threadCount); - _freeList = new FreeList(MemoryUtility.AlignOf(), maxConcurrencyLevel: workerCount); - _jobInfoPool = new ConcurrentSlotMap(128); _jobQueues = new ConcurrentQueue[3]; @@ -263,17 +255,8 @@ public sealed unsafe partial class JobScheduler : IDisposable private JobHandle CreateJobHandle(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan dependencies) { - var validDepCount = 0; - for (var i = 0; i < dependencies.Length; i++) - { - if (dependencies[i].IsValid) - { - validDepCount++; - } - } - // Advance count to account for all dependencies upfront + 1 guard lock - Interlocked.Increment(ref jobInfo.dependencyCount); + Interlocked.Add(ref jobInfo.dependencyCount, dependencies.Length + 1); var id = _jobInfoPool.Add(jobInfo, out var generation); ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _); @@ -316,7 +299,7 @@ public sealed unsafe partial class JobScheduler : IDisposable { // RC acquired. We are safe from "Remove" and state change. var count = Interlocked.Increment(ref depJobInfo.dependentCount); - if (count <= JobInfo.MAX_DEPENDENTS) + if (count <= JobInfo.MAX_LOCAL_DEPENDENTS) { // Safely write to the fixed buffer depJobInfo.dependentsID[count - 1] = id; @@ -475,39 +458,41 @@ public sealed unsafe partial class JobScheduler : IDisposable info.additionalDependents.Dispose(); - _freeList.Free(info.pJobData); + if (info.pFreeFunc != null) + { + info.pFreeFunc(info.dataID, info.dataGeneration); + } + _jobInfoPool.Remove(handle.ID, handle.Generation); } /// /// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle Schedule(ref readonly T job, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJob + where T : struct, IJob { - var pJobData = _freeList.Allocate(MemoryUtility.SizeOf(), MemoryUtility.AlignOf()); - if (pJobData == null) - { - return JobHandle.Invalid; - } - - *(T*)pJobData = job; + var id = JobDataPool.Allocate(in job, out var generation); var jobInfo = new JobInfo { - pJobData = pJobData, + dataID = id, + dataGeneration = generation, + pExecutionFunc = &JobExecutor.Execute, + pFreeFunc = &JobDataPool.Free, remainingBatches = 1, threadIndex = threadIndex, + priority = priority, jobRanges = JobRanges.Single, }; @@ -517,65 +502,75 @@ public sealed unsafe partial class JobScheduler : IDisposable /// /// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle Schedule(ref readonly T job, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJob + where T : struct, IJob => Schedule(in job, -1, priority, dependencies); /// /// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// A collection of representing the dependencies that must be completed before this job can begin. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle Schedule(ref readonly T job, params ReadOnlySpan dependencies) - where T : unmanaged, IJob + where T : struct, IJob => Schedule(in job, -1, JobPriority.Normal, dependencies); + /// + /// Schedules a single job for execution on a specified thread, with an optional dependency on another job. + /// + /// The type of the job to execute. Must implement and be struct. + /// The job instance to be executed. The job data will be copied internally. + /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. + /// A collection of representing the dependencies that must be completed before this job can begin. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public JobHandle Schedule(ref readonly T job, int threadIndex, params ReadOnlySpan dependencies) + where T : struct, IJob + => Schedule(in job, threadIndex, JobPriority.Normal, dependencies); + /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallelFor(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallelFor + where T : struct, IJobParallelFor { - var pJobData = _freeList.Allocate(MemoryUtility.SizeOf(), MemoryUtility.AlignOf()); - if (pJobData == null) - { - return JobHandle.Invalid; - } - - *(T*)pJobData = job; + var id = JobDataPool.Allocate(in job, out var generation); var optimalBatchSize = Math.Max(1, batchSize); var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize; var jobInfo = new JobInfo { - pJobData = pJobData, + dataID = id, + dataGeneration = generation, + pExecutionFunc = &JobExecutor.ExecuteParallelFor, + pFreeFunc = &JobDataPool.Free, remainingBatches = totalBatches, threadIndex = threadIndex, - jobRanges = new JobRanges() + priority = priority, + jobRanges = new JobRanges { - currentIndex = 0, batchSize = optimalBatchSize, totalIteration = totalIteration, }, @@ -587,69 +582,81 @@ public sealed unsafe partial class JobScheduler : IDisposable /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallelFor(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallelFor + where T : struct, IJobParallelFor => ScheduleParallelFor(in job, totalIteration, batchSize, -1, priority, dependencies); /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// A collection of representing the dependencies that must be completed before this job can begin. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallelFor(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallelFor + where T : struct, IJobParallelFor => ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies); /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. + /// The job instance to be executed. The job data will be copied internally. + /// The total number of iterations to be processed by the job. + /// The number of iterations to include in each batch. + /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. + /// A collection of representing the dependencies that must be completed before this job can begin. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public JobHandle ScheduleParallelFor(ref readonly T job, int totalIteration, int batchSize, int threadIndex, params ReadOnlySpan dependencies) + where T : struct, IJobParallelFor + => ScheduleParallelFor(in job, totalIteration, batchSize, threadIndex, JobPriority.Normal, dependencies); + + /// + /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. + /// + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallel(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallel + where T : struct, IJobParallel { - var pJobData = _freeList.Allocate(MemoryUtility.SizeOf(), MemoryUtility.AlignOf()); - if (pJobData == null) - { - return JobHandle.Invalid; - } - - *(T*)pJobData = job; + var id = JobDataPool.Allocate(in job, out var generation); var optimalBatchSize = Math.Max(1, batchSize); var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize; var jobInfo = new JobInfo { - pJobData = pJobData, + dataID = id, + dataGeneration = generation, + pExecutionFunc = &JobExecutor.ExecuteParallel, + pFreeFunc = &JobDataPool.Free, remainingBatches = totalBatches, threadIndex = threadIndex, - jobRanges = new JobRanges() + priority = priority, + jobRanges = new JobRanges { - currentIndex = 0, batchSize = optimalBatchSize, totalIteration = totalIteration, }, @@ -661,44 +668,72 @@ public sealed unsafe partial class JobScheduler : IDisposable /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// A collection of representing the dependencies that must be completed before this job can begin. /// The priority of the job. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallel(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallel + where T : struct, IJobParallel => ScheduleParallel(in job, totalIteration, batchSize, -1, priority, dependencies); /// /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// - /// The type of the job to execute. Must implement and be unmanaged. + /// The type of the job to execute. Must implement and be struct. /// The job instance to be executed. The job data will be copied internally. /// The total number of iterations to be processed by the job. /// The number of iterations to include in each batch. /// A collection of representing the dependencies that must be completed before this job can begin. - /// A that can be used to track the completion of the scheduled job. - /// Returns if the job data allocation fails. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle ScheduleParallel(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan dependencies) - where T : unmanaged, IJobParallel + where T : struct, IJobParallel => ScheduleParallel(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies); + /// + /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. + /// + /// The type of the job to execute. Must implement and be struct. + /// The job instance to be executed. The job data will be copied internally. + /// The total number of iterations to be processed by the job. + /// The number of iterations to include in each batch. + /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. + /// A collection of representing the dependencies that must be completed before this job can begin. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public JobHandle ScheduleParallel(ref readonly T job, int totalIteration, int batchSize, int threadIndex, params ReadOnlySpan dependencies) + where T : struct, IJobParallel + => ScheduleParallel(in job, totalIteration, batchSize, threadIndex, JobPriority.Normal, dependencies); + + /// + /// Schedules a custom job for execution with user-defined . + /// + /// The information about the job to be scheduled. + /// The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job. + /// A collection of representing the dependencies that must be completed before this job can begin. + /// A that can be used to track the completion of the scheduled job. Returns if the job data allocation fails. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public JobHandle ScheduleCustom(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan dependencies) + { + return CreateJobHandle(ref jobInfo, threadIndex, dependencies); + } + /// /// Combines multiple job dependencies into a single . /// /// A collection of instances representing the dependencies to combine. - /// A that represents the combined dependencies. The returned handle can be used to ensure - /// that all specified dependencies are completed before proceeding. + /// A that represents the combined dependencies. The returned handle can be used to ensure that all specified dependencies are completed before proceeding. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobHandle CombineDependencies(params ReadOnlySpan dependencies) { var jobInfo = new JobInfo { - pJobData = null, pExecutionFunc = null, + pFreeFunc = null, remainingBatches = 1, threadIndex = -1, @@ -713,8 +748,8 @@ public sealed unsafe partial class JobScheduler : IDisposable /// Retrieves the current status of a job identified by the specified handle. /// /// The handle representing the job whose status is to be retrieved. The handle must be valid. - /// The current status of the job as a value. - /// Returns if the handle is invalid or the job does not exist. + /// The current status of the job as a value. Returns if the handle is invalid or the job does not exist. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public JobState GetJobStatus(JobHandle handle) { if (!handle.IsValid) @@ -905,15 +940,6 @@ public sealed unsafe partial class JobScheduler : IDisposable worker.Dispose(); } - foreach (var info in _jobInfoPool) - { - if (info.pJobData != null) - { - _freeList.Free(info.pJobData); - } - } - - _freeList.Dispose(); _workSignal.Dispose(); _cts.Dispose(); diff --git a/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj b/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj index 971ffea..caafbae 100644 --- a/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj +++ b/Misaki.HighPerformance.Jobs/Misaki.HighPerformance.Jobs.csproj @@ -6,7 +6,7 @@ enable True True - 2.0.0 + 3.0.0 $(AssemblyVersion) Misaki https://git.personalnas.com/Misaki/Misaki.HighPerformance.git diff --git a/Misaki.HighPerformance.Jobs/WorkerThread.cs b/Misaki.HighPerformance.Jobs/WorkerThread.cs index fdd5e83..06963fe 100644 --- a/Misaki.HighPerformance.Jobs/WorkerThread.cs +++ b/Misaki.HighPerformance.Jobs/WorkerThread.cs @@ -110,6 +110,7 @@ internal class WorkerThread : IDisposable spin.SpinOnce(-1); } + // If we didn't find a job after spinning, wait for a signal if (!found) { try @@ -130,12 +131,58 @@ internal class WorkerThread : IDisposable ref var jobInfo = ref _scheduler.GetJobInfoReference(handle, out var exist); if (exist) { - var priorState = Interlocked.CompareExchange(ref jobInfo.state, JobUtility.JOBSTATE_RUNNING, JobUtility.JOBSTATE_SCHEDULED); - if (priorState != JobUtility.JOBSTATE_SCHEDULED && priorState != JobUtility.JOBSTATE_RUNNING) + // Try to acquire a reference count for the job. This ensures that the job won't be removed while we're processing it. + // This is critical that if thread A reads the job, but suddenly os scheduler delay the thread for just a moment, and thread B completes the job and removes it from the system, when thread A resumes, it might be accessing invalid memory. + // By acquiring a reference count, we ensure that even if the job is completed while we're processing it, it won't be removed until we're done. + + var rcSpin = new SpinWait(); + var rcAcquired = false; + + while (true) + { + _scheduler.GetJobInfoReference(handle, out var currentExist); + if (!currentExist) + { + break; + } + + var stateVal = Volatile.Read(ref jobInfo.state); + var state = JobUtility.GetState(stateVal); + + if (state == JobState.Completed || state == JobState.Invalid) + { + break; + } + + var newState = stateVal + JobUtility.RC_ONE; + if (state == JobState.Scheduled) + { + newState = (newState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING; + } + + // Attempt to acquire a reference count by incrementing the state value. If the state has changed since we read it, we need to retry. + if (Interlocked.CompareExchange(ref jobInfo.state, newState, stateVal) == stateVal) + { + _scheduler.GetJobInfoReference(handle, out currentExist); + if (!currentExist) + { + JobUtility.ReleaseRC(ref jobInfo.state); + break; + } + + rcAcquired = true; + break; + } + + rcSpin.SpinOnce(-1); + } + + if (!rcAcquired) { continue; } + var isLastBatch = true; if (jobInfo.pExecutionFunc != null) { var ctx = new JobExecutionContext @@ -146,14 +193,15 @@ internal class WorkerThread : IDisposable SelfHandle = handle, }; - if (!jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx)) - { - // If the job returns false, it means it we are not the last worker to process this job, so we should not mark it as complete yet. - continue; - } + isLastBatch = jobInfo.pExecutionFunc(jobInfo.dataID, jobInfo.dataGeneration, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx); } - _scheduler.MarkJobComplete(handle, _index); + JobUtility.ReleaseRC(ref jobInfo.state); + + if (isLastBatch) + { + _scheduler.MarkJobComplete(handle, _index); + } } } } diff --git a/Misaki.HighPerformance.LowLevel/Buffer/FreeList.cs b/Misaki.HighPerformance.LowLevel/Buffer/FreeList.cs index 5a712bf..87ee0bf 100644 --- a/Misaki.HighPerformance.LowLevel/Buffer/FreeList.cs +++ b/Misaki.HighPerformance.LowLevel/Buffer/FreeList.cs @@ -249,6 +249,11 @@ public unsafe struct FreeList : IMemoryAllocatorremoteFreeHead) == 0) + { + return; + } + var head = (FreeNode*)Interlocked.Exchange(ref cache->remoteFreeHead, 0); while (head != null) { diff --git a/Misaki.HighPerformance.Mathematics.SPMD/IJobSPMD.cs b/Misaki.HighPerformance.Mathematics.SPMD/IJobSPMD.cs index 3540677..07c598a 100644 --- a/Misaki.HighPerformance.Mathematics.SPMD/IJobSPMD.cs +++ b/Misaki.HighPerformance.Mathematics.SPMD/IJobSPMD.cs @@ -85,7 +85,7 @@ public static class IJobParallelForSPMDExtensions } } - public static JobHandle ScheduleParallelSPDM(this JobScheduler jobScheduler, ref T job, int totalCount, int batchSize, int threadIndex, JobHandle dependency) + public static JobHandle ScheduleParallelSPDM(this JobScheduler jobScheduler, ref T job, int totalCount, int batchSize, int threadIndex, params ReadOnlySpan dependencies) where T : unmanaged, IJobSPMD where TNumber : unmanaged, INumber, IBinaryNumber, IMinMaxValue, IBitwiseOperators { @@ -98,7 +98,7 @@ public static class IJobParallelForSPMDExtensions }; var iterations = (totalCount + WideLane.LaneWidth - 1) / WideLane.LaneWidth; - return jobScheduler.ScheduleParallelFor(ref warper, iterations, batchSize, threadIndex, dependency); + return jobScheduler.ScheduleParallelFor(ref warper, iterations, batchSize, threadIndex, dependencies); } else { @@ -108,7 +108,7 @@ public static class IJobParallelForSPMDExtensions totalCount = totalCount, }; - return jobScheduler.ScheduleParallelFor(ref warper, totalCount, batchSize, threadIndex, dependency); + return jobScheduler.ScheduleParallelFor(ref warper, totalCount, batchSize, threadIndex, dependencies); } } } diff --git a/Misaki.HighPerformance.Test/Benchmark/ParallelNoiseBenchmark.cs b/Misaki.HighPerformance.Test/Benchmark/ParallelNoiseBenchmark.cs index 0820c85..7b6ba71 100644 --- a/Misaki.HighPerformance.Test/Benchmark/ParallelNoiseBenchmark.cs +++ b/Misaki.HighPerformance.Test/Benchmark/ParallelNoiseBenchmark.cs @@ -10,8 +10,8 @@ namespace Misaki.HighPerformance.Test.Benchmark; [MemoryDiagnoser] public class ParallelNoiseBenchmark { - private const int _WIDTH = 2048; - private const int _HEIGHT = 2048; + private const int _WIDTH = 256; + private const int _HEIGHT = 256; private const int _LENGTH = _WIDTH * _HEIGHT; internal JobScheduler _jobScheduler = null!; @@ -20,6 +20,8 @@ public class ParallelNoiseBenchmark [GlobalSetup] public void Setup() { + AllocationManager.Initialize(AllocationManagerDesc.Default); + _jobScheduler = new JobScheduler(Environment.ProcessorCount); _buffers = new UnsafeArray(_LENGTH, AllocationHandle.Persistent); } @@ -29,9 +31,11 @@ public class ParallelNoiseBenchmark { _jobScheduler.Dispose(); _buffers.Dispose(); + + AllocationManager.Dispose(); } - [Benchmark] + [Benchmark(Baseline = true)] public unsafe void JobSystem() { var job = new NoiseJobVector() @@ -41,7 +45,7 @@ public class ParallelNoiseBenchmark height = _HEIGHT }; - var handle = _jobScheduler.ScheduleParallel(ref job, _LENGTH, 64, -1, JobHandle.Invalid); + var handle = _jobScheduler.ScheduleParallel(ref job, _LENGTH, 64, -1); _jobScheduler.Wait(handle); } @@ -57,7 +61,7 @@ public class ParallelNoiseBenchmark }); } - [Benchmark(Baseline = true)] + [Benchmark] public void For() { for (var i = 0; i < _LENGTH; i++) diff --git a/Misaki.HighPerformance.Test/Benchmark/SPMDBenchmark.cs b/Misaki.HighPerformance.Test/Benchmark/SPMDBenchmark.cs index 72a245a..644a8f2 100644 --- a/Misaki.HighPerformance.Test/Benchmark/SPMDBenchmark.cs +++ b/Misaki.HighPerformance.Test/Benchmark/SPMDBenchmark.cs @@ -36,7 +36,7 @@ public unsafe class SPMDBenchmark height = _SIZE, }; - var handle = _scheduler.ScheduleParallelSPDM(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); + var handle = _scheduler.ScheduleParallelSPDM(ref job, _SIZE * _SIZE, 64, -1); _scheduler.Wait(handle); } @@ -50,7 +50,7 @@ public unsafe class SPMDBenchmark height = _SIZE, }; - var handle = _scheduler.ScheduleParallelFor(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); + var handle = _scheduler.ScheduleParallelFor(ref job, _SIZE * _SIZE, 64, -1); _scheduler.Wait(handle); } @@ -64,7 +64,7 @@ public unsafe class SPMDBenchmark height = _SIZE, }; - var handle = _scheduler.ScheduleParallel(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); + var handle = _scheduler.ScheduleParallel(ref job, _SIZE * _SIZE, 64, -1); _scheduler.Wait(handle); } diff --git a/Misaki.HighPerformance.Test/Program.cs b/Misaki.HighPerformance.Test/Program.cs index b7e693a..35c5a13 100644 --- a/Misaki.HighPerformance.Test/Program.cs +++ b/Misaki.HighPerformance.Test/Program.cs @@ -6,7 +6,35 @@ using Misaki.HighPerformance.Test.Benchmark; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -BenchmarkRunner.Run(); +BenchmarkRunner.Run(); + +//var bench = new ParallelNoiseBenchmark(); +//bench.Setup(); + +//for (int i = 0; i < 4096 * 5; i++) +//{ +// bench.JobSystem(); +//} + +//bench.Cleanup(); + +//bench.Setup(); + +//for (int i = 0; i < 4096 * 5; i++) +//{ +// bench.JobSystem(); +//} + +//bench.Cleanup(); + +//bench.Setup(); + +//for (int i = 0; i < 4096 * 5; i++) +//{ +// bench.JobSystem(); +//} + +//bench.Cleanup(); //AllocationManager.Initialize(AllocationManagerInitOpts.Default); //var set = new UnsafeBitSet(100, AllocationHandle.Persistent, AllocationOption.Clear); diff --git a/Misaki.HighPerformance.Test/UnitTest/Jobs/TestJobSystem.cs b/Misaki.HighPerformance.Test/UnitTest/Jobs/TestJobSystem.cs index 7115b02..95f119f 100644 --- a/Misaki.HighPerformance.Test/UnitTest/Jobs/TestJobSystem.cs +++ b/Misaki.HighPerformance.Test/UnitTest/Jobs/TestJobSystem.cs @@ -126,8 +126,8 @@ public class TestJobSystem result = result }; - var combinedHandle = s_jobScheduler.CombineDependencies(handle1, handle2); - var handle3 = s_jobScheduler.Schedule(ref job3, combinedHandle); + //var combinedHandle = s_jobScheduler.CombineDependencies(handle1, handle2); + var handle3 = s_jobScheduler.Schedule(ref job3, handle1, handle2); s_jobScheduler.Wait(handle3); diff --git a/Misaki.HighPerformance/Collections/ConcurrentSlotMap.cs b/Misaki.HighPerformance/Collections/ConcurrentSlotMap.cs index 6cd1b63..2c18c45 100644 --- a/Misaki.HighPerformance/Collections/ConcurrentSlotMap.cs +++ b/Misaki.HighPerformance/Collections/ConcurrentSlotMap.cs @@ -158,7 +158,7 @@ public class ConcurrentSlotMap : IEnumerable } } - public int Add(T item, out int generation) + public int Add(scoped in T item, out int generation) { while (true) {