Refactor job system to use type-safe pooled job data

Replaces unsafe pointer-based job data with JobDataPool<T> and ConcurrentSlotMap<T> for safer, type-safe management. JobInfo now references job data by (dataID, dataGeneration). JobExecutor and JobScheduler updated to use the new pool-based approach, requiring T : struct. Removed FreeList and pointer logic. WorkerThread now uses reference counting to prevent use-after-free. Updated all scheduling APIs and benchmarks to match new signatures. Improved documentation and inlining. Bumped assembly version to 3.0.0 due to breaking changes.
This commit is contained in:
2026-04-21 17:22:20 +09:00
parent f4bbef0be3
commit b7d61488bb
14 changed files with 324 additions and 154 deletions

View File

@@ -1,5 +0,0 @@
global using unsafe JobExecutionFunc = delegate*<void*, ref Misaki.HighPerformance.Jobs.JobRanges, ref int, ref readonly Misaki.HighPerformance.Jobs.JobExecutionContext, bool>;
#if MHP_SUPPORT_MANAGED_JOB
global using unsafe ManagedJobExecutionFunc = delegate*<object, ref Misaki.HighPerformance.Jobs.JobRanges, ref int, ref readonly Misaki.HighPerformance.Jobs.JobExecutionContext, bool>;
#endif

View File

@@ -0,0 +1,52 @@
using Misaki.HighPerformance.Collections;
using System.Runtime.CompilerServices;
namespace Misaki.HighPerformance.Jobs;
/// <summary>
/// This class manages pools of job data for different types. It allows allocating, retrieving, and freeing job data instances using unique IDs and generations to ensure safe access and reuse of resources.
/// </summary>
public static class JobDataPool<T>
where T : struct
{
private static readonly ConcurrentSlotMap<T> s_slots = new ConcurrentSlotMap<T>(8);
/// <summary>
/// Allocates a new instance of type T in the pool and returns its ID and generation.
/// </summary>
/// <typeparam name="T">The type of the data to allocate.</typeparam>
/// <param name="data">The data to allocate.</param>
/// <param name="generation">The generation of the allocated data.</param>
/// <returns>The ID of the allocated data.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int Allocate(ref readonly T data, out int generation)
{
return s_slots.Add(data, out generation);
}
/// <summary>
/// Gets a reference to the data of type T associated with the given ID and generation. The 'exists' output parameter indicates whether the data exists in the pool.
/// </summary>
/// <typeparam name="T">The type of the data to retrieve.</typeparam>
/// <param name="id">The ID of the data to retrieve.</param>
/// <param name="generation">The generation of the data to retrieve.</param>
/// <param name="exists">A value indicating whether the data exists in the pool.</param>
/// <returns>A reference to the requested data. Undefined if 'exists' is false.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ref T GetReference(int id, int generation, out bool exists)
{
return ref s_slots.GetElementReferenceAt(id, generation, out exists);
}
/// <summary>
/// Frees the data of type T associated with the given ID and generation, making it available for future allocations. After calling this method, the ID and generation can be reused for new data.
/// </summary>
/// <typeparam name="T">The type of the data to free.</typeparam>
/// <param name="id">The ID of the data to free.</param>
/// <param name="generation">The generation of the data to free.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Free(int id, int generation)
{
s_slots.Remove(id, generation);
}
}

View File

@@ -1,16 +1,22 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
namespace Misaki.HighPerformance.Jobs; namespace Misaki.HighPerformance.Jobs;
internal static unsafe class JobExecutor internal static class JobExecutor
{ {
public static bool Execute<T>(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) public static bool Execute<T>(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx)
where T : unmanaged, IJob where T : struct, IJob
{ {
var pJob = (T*)pJobData; ref var job = ref JobDataPool<T>.GetReference(dataID, dataGeneration, out var exists);
pJob->Execute(in ctx); Debug.Assert(exists, "Job data not found in the pool.");
job.Execute(in ctx);
return Interlocked.Decrement(ref remainingBatches) == 0; return Interlocked.Decrement(ref remainingBatches) == 0;
} }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool GetWorkerStealingRange(ref JobRanges jobRanges, out int start, out int end) private static bool GetWorkerStealingRange(ref JobRanges jobRanges, out int start, out int end)
{ {
start = Interlocked.Add(ref jobRanges.currentIndex, jobRanges.batchSize) - jobRanges.batchSize; start = Interlocked.Add(ref jobRanges.currentIndex, jobRanges.batchSize) - jobRanges.batchSize;
@@ -25,10 +31,12 @@ internal static unsafe class JobExecutor
return true; return true;
} }
public static bool ExecuteParallelFor<T>(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) public static bool ExecuteParallelFor<T>(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx)
where T : unmanaged, IJobParallelFor where T : struct, IJobParallelFor
{ {
var pJob = (T*)pJobData; ref var job = ref JobDataPool<T>.GetReference(dataID, dataGeneration, out var exists);
Debug.Assert(exists, "Job data not found in the pool.");
var wasTheLastBatch = false; var wasTheLastBatch = false;
while (true) while (true)
@@ -40,7 +48,7 @@ internal static unsafe class JobExecutor
for (var i = start; i < end; i++) for (var i = start; i < end; i++)
{ {
pJob->Execute(i, in ctx); job.Execute(i, in ctx);
} }
if (Interlocked.Decrement(ref remainingBatches) == 0) if (Interlocked.Decrement(ref remainingBatches) == 0)
@@ -52,12 +60,13 @@ internal static unsafe class JobExecutor
return wasTheLastBatch; return wasTheLastBatch;
} }
public static bool ExecuteParallel<T>(void* pJobData, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx) public static bool ExecuteParallel<T>(int dataID, int dataGeneration, ref JobRanges jobRanges, ref int remainingBatches, ref readonly JobExecutionContext ctx)
where T : unmanaged, IJobParallel where T : struct, IJobParallel
{ {
var pJob = (T*)pJobData; ref var job = ref JobDataPool<T>.GetReference(dataID, dataGeneration, out var exists);
var wasTheLastBatch = false; Debug.Assert(exists, "Job data not found in the pool.");
var wasTheLastBatch = false;
while (true) while (true)
{ {
if (!GetWorkerStealingRange(ref jobRanges, out var start, out var end)) if (!GetWorkerStealingRange(ref jobRanges, out var start, out var end))
@@ -65,7 +74,7 @@ internal static unsafe class JobExecutor
break; break;
} }
pJob->Execute(start, end, in ctx); job.Execute(start, end, in ctx);
if (Interlocked.Decrement(ref remainingBatches) == 0) if (Interlocked.Decrement(ref remainingBatches) == 0)
{ {
wasTheLastBatch = true; wasTheLastBatch = true;

View File

@@ -38,7 +38,7 @@ public enum JobPriority
Low = 2 Low = 2
} }
internal unsafe struct JobInfo public unsafe struct JobInfo
{ {
public ref struct DependentIterator public ref struct DependentIterator
{ {
@@ -63,13 +63,13 @@ internal unsafe struct JobInfo
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
get get
{ {
if (_index < MAX_DEPENDENTS) if (_index < MAX_LOCAL_DEPENDENTS)
{ {
return new JobHandle(_jobInfo.dependentsID[_index], _jobInfo.dependentsGeneration[_index]); return new JobHandle(_jobInfo.dependentsID[_index], _jobInfo.dependentsGeneration[_index]);
} }
else else
{ {
return _jobInfo.additionalDependents[_index - MAX_DEPENDENTS]; return _jobInfo.additionalDependents[_index - MAX_LOCAL_DEPENDENTS];
} }
} }
} }
@@ -81,14 +81,17 @@ internal unsafe struct JobInfo
} }
} }
public const int MAX_DEPENDENTS = 8; public const int MAX_LOCAL_DEPENDENTS = 8;
public void* pJobData; public delegate*<int, int, ref JobRanges, ref int, ref readonly JobExecutionContext, bool> pExecutionFunc;
public JobExecutionFunc pExecutionFunc; public delegate*<int, int, void> pFreeFunc;
public int dataID;
public int dataGeneration;
// The list of jobs that are waiting for THIS job to complete. // The list of jobs that are waiting for THIS job to complete.
public fixed int dependentsID[MAX_DEPENDENTS]; // The actual list of IDs public fixed int dependentsID[MAX_LOCAL_DEPENDENTS]; // The actual list of IDs
public fixed int dependentsGeneration[MAX_DEPENDENTS]; // The actual list of generations public fixed int dependentsGeneration[MAX_LOCAL_DEPENDENTS]; // The actual list of generations
public UnsafeList<JobHandle> additionalDependents; public UnsafeList<JobHandle> additionalDependents;
public int dependentCount; public int dependentCount;
@@ -110,7 +113,7 @@ internal unsafe struct JobInfo
} }
} }
internal struct JobRanges public struct JobRanges
{ {
public int batchSize; public int batchSize;
public int totalIteration; public int totalIteration;

View File

@@ -1,14 +1,13 @@
using Misaki.HighPerformance.Collections; using Misaki.HighPerformance.Collections;
using Misaki.HighPerformance.LowLevel.Buffer; using Misaki.HighPerformance.LowLevel.Buffer;
using Misaki.HighPerformance.LowLevel.Collections; using Misaki.HighPerformance.LowLevel.Collections;
using Misaki.HighPerformance.LowLevel.Utilities;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
namespace Misaki.HighPerformance.Jobs; namespace Misaki.HighPerformance.Jobs;
public struct JobSchedulerDesc() public struct JobSchedulerDesc
{ {
/// <summary> /// <summary>
/// Gets or sets the number of worker threads to be created and managed by the job scheduler. If set to less than 1, at least one worker thread will be created. /// Gets or sets the number of worker threads to be created and managed by the job scheduler. If set to less than 1, at least one worker thread will be created.
@@ -21,10 +20,10 @@ public struct JobSchedulerDesc()
/// <summary> /// <summary>
/// Gets or sets the priority of the worker threads. This can be used to influence the scheduling of the threads by the operating system. The default value is <see cref="ThreadPriority.Normal"/>. /// Gets or sets the priority of the worker threads. This can be used to influence the scheduling of the threads by the operating system. The default value is <see cref="ThreadPriority.Normal"/>.
/// </summary> /// </summary>
public ThreadPriority ThreadPriority public required ThreadPriority ThreadPriority
{ {
get; set; get; set;
} = ThreadPriority.Normal; }
/// <summary> /// <summary>
/// Gets or sets the state object for the job scheduler. This can be used to store any user-defined data or context that may be needed by the jobs or worker threads. The job scheduler does not interpret or manage this state in any way; it is simply provided as a convenience for users of the job scheduler. The default value is null. /// Gets or sets the state object for the job scheduler. This can be used to store any user-defined data or context that may be needed by the jobs or worker threads. The job scheduler does not interpret or manage this state in any way; it is simply provided as a convenience for users of the job scheduler. The default value is null.
@@ -118,8 +117,6 @@ public sealed unsafe partial class JobScheduler : IDisposable
// Don't sleep indefinitely because that causes our 1ms job to become 15ms. // Don't sleep indefinitely because that causes our 1ms job to become 15ms.
private const int _SLEEP_THRESHOLD = -1; private const int _SLEEP_THRESHOLD = -1;
private FreeList _freeList;
private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool; private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool;
private readonly ConcurrentQueue<JobHandle>[] _jobQueues; private readonly ConcurrentQueue<JobHandle>[] _jobQueues;
private readonly WorkerThread[] _workerThreads; private readonly WorkerThread[] _workerThreads;
@@ -147,8 +144,6 @@ public sealed unsafe partial class JobScheduler : IDisposable
{ {
var workerCount = Math.Max(1, desc.ThreadCount); var workerCount = Math.Max(1, desc.ThreadCount);
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: workerCount);
_jobInfoPool = new ConcurrentSlotMap<JobInfo>(128); _jobInfoPool = new ConcurrentSlotMap<JobInfo>(128);
_jobQueues = new ConcurrentQueue<JobHandle>[3]; _jobQueues = new ConcurrentQueue<JobHandle>[3];
@@ -181,14 +176,11 @@ public sealed unsafe partial class JobScheduler : IDisposable
/// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param> /// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param>
/// <param name="priority">The priority of the worker threads.</param> /// <param name="priority">The priority of the worker threads.</param>
/// <param name="state">The state object for the job scheduler.</param> /// <param name="state">The state object for the job scheduler.</param>
/// <param name="allowManagedJobs">A value indicating whether managed jobs are allowed.</param>
[Obsolete("Use JobScheduler(JobSchedulerDesc) instead.")] [Obsolete("Use JobScheduler(JobSchedulerDesc) instead.")]
public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null, bool allowManagedJobs = false) public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null)
{ {
var workerCount = Math.Max(1, threadCount); var workerCount = Math.Max(1, threadCount);
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: workerCount);
_jobInfoPool = new ConcurrentSlotMap<JobInfo>(128); _jobInfoPool = new ConcurrentSlotMap<JobInfo>(128);
_jobQueues = new ConcurrentQueue<JobHandle>[3]; _jobQueues = new ConcurrentQueue<JobHandle>[3];
@@ -263,17 +255,8 @@ public sealed unsafe partial class JobScheduler : IDisposable
private JobHandle CreateJobHandle(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan<JobHandle> dependencies) private JobHandle CreateJobHandle(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
{ {
var validDepCount = 0;
for (var i = 0; i < dependencies.Length; i++)
{
if (dependencies[i].IsValid)
{
validDepCount++;
}
}
// Advance count to account for all dependencies upfront + 1 guard lock // Advance count to account for all dependencies upfront + 1 guard lock
Interlocked.Increment(ref jobInfo.dependencyCount); Interlocked.Add(ref jobInfo.dependencyCount, dependencies.Length + 1);
var id = _jobInfoPool.Add(jobInfo, out var generation); var id = _jobInfoPool.Add(jobInfo, out var generation);
ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _); ref var infoInPool = ref _jobInfoPool.GetElementReferenceAt(id, generation, out _);
@@ -316,7 +299,7 @@ public sealed unsafe partial class JobScheduler : IDisposable
{ {
// RC acquired. We are safe from "Remove" and state change. // RC acquired. We are safe from "Remove" and state change.
var count = Interlocked.Increment(ref depJobInfo.dependentCount); var count = Interlocked.Increment(ref depJobInfo.dependentCount);
if (count <= JobInfo.MAX_DEPENDENTS) if (count <= JobInfo.MAX_LOCAL_DEPENDENTS)
{ {
// Safely write to the fixed buffer // Safely write to the fixed buffer
depJobInfo.dependentsID[count - 1] = id; depJobInfo.dependentsID[count - 1] = id;
@@ -475,39 +458,41 @@ public sealed unsafe partial class JobScheduler : IDisposable
info.additionalDependents.Dispose(); info.additionalDependents.Dispose();
_freeList.Free(info.pJobData); if (info.pFreeFunc != null)
{
info.pFreeFunc(info.dataID, info.dataGeneration);
}
_jobInfoPool.Remove(handle.ID, handle.Generation); _jobInfoPool.Remove(handle.ID, handle.Generation);
} }
/// <summary> /// <summary>
/// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param> /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJob where T : struct, IJob
{ {
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>()); var id = JobDataPool<T>.Allocate(in job, out var generation);
if (pJobData == null)
{
return JobHandle.Invalid;
}
*(T*)pJobData = job;
var jobInfo = new JobInfo var jobInfo = new JobInfo
{ {
pJobData = pJobData, dataID = id,
dataGeneration = generation,
pExecutionFunc = &JobExecutor.Execute<T>, pExecutionFunc = &JobExecutor.Execute<T>,
pFreeFunc = &JobDataPool<T>.Free,
remainingBatches = 1, remainingBatches = 1,
threadIndex = threadIndex, threadIndex = threadIndex,
priority = priority,
jobRanges = JobRanges.Single, jobRanges = JobRanges.Single,
}; };
@@ -517,65 +502,75 @@ public sealed unsafe partial class JobScheduler : IDisposable
/// <summary> /// <summary>
/// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle Schedule<T>(ref readonly T job, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle Schedule<T>(ref readonly T job, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJob where T : struct, IJob
=> Schedule(in job, -1, priority, dependencies); => Schedule(in job, -1, priority, dependencies);
/// <summary> /// <summary>
/// Schedules a single job for execution on a specified thread, with an optional dependency on another job. /// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle Schedule<T>(ref readonly T job, params ReadOnlySpan<JobHandle> dependencies) public JobHandle Schedule<T>(ref readonly T job, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJob where T : struct, IJob
=> Schedule(in job, -1, JobPriority.Normal, dependencies); => Schedule(in job, -1, JobPriority.Normal, dependencies);
/// <summary>
/// Schedules a single job for execution on a specified thread, with an optional dependency on another job.
/// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
where T : struct, IJob
=> Schedule(in job, threadIndex, JobPriority.Normal, dependencies);
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param> /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallelFor where T : struct, IJobParallelFor
{ {
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>()); var id = JobDataPool<T>.Allocate(in job, out var generation);
if (pJobData == null)
{
return JobHandle.Invalid;
}
*(T*)pJobData = job;
var optimalBatchSize = Math.Max(1, batchSize); var optimalBatchSize = Math.Max(1, batchSize);
var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize; var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize;
var jobInfo = new JobInfo var jobInfo = new JobInfo
{ {
pJobData = pJobData, dataID = id,
dataGeneration = generation,
pExecutionFunc = &JobExecutor.ExecuteParallelFor<T>, pExecutionFunc = &JobExecutor.ExecuteParallelFor<T>,
pFreeFunc = &JobDataPool<T>.Free,
remainingBatches = totalBatches, remainingBatches = totalBatches,
threadIndex = threadIndex, threadIndex = threadIndex,
jobRanges = new JobRanges() priority = priority,
jobRanges = new JobRanges
{ {
currentIndex = 0,
batchSize = optimalBatchSize, batchSize = optimalBatchSize,
totalIteration = totalIteration, totalIteration = totalIteration,
}, },
@@ -587,69 +582,81 @@ public sealed unsafe partial class JobScheduler : IDisposable
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallelFor where T : struct, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, priority, dependencies); => ScheduleParallelFor(in job, totalIteration, batchSize, -1, priority, dependencies);
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallelFor where T : struct, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies); => ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies);
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
where T : struct, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, threadIndex, JobPriority.Normal, dependencies);
/// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param> /// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallel where T : struct, IJobParallel
{ {
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>()); var id = JobDataPool<T>.Allocate(in job, out var generation);
if (pJobData == null)
{
return JobHandle.Invalid;
}
*(T*)pJobData = job;
var optimalBatchSize = Math.Max(1, batchSize); var optimalBatchSize = Math.Max(1, batchSize);
var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize; var totalBatches = (totalIteration + optimalBatchSize - 1) / optimalBatchSize;
var jobInfo = new JobInfo var jobInfo = new JobInfo
{ {
pJobData = pJobData, dataID = id,
dataGeneration = generation,
pExecutionFunc = &JobExecutor.ExecuteParallel<T>, pExecutionFunc = &JobExecutor.ExecuteParallel<T>,
pFreeFunc = &JobDataPool<T>.Free,
remainingBatches = totalBatches, remainingBatches = totalBatches,
threadIndex = threadIndex, threadIndex = threadIndex,
jobRanges = new JobRanges() priority = priority,
jobRanges = new JobRanges
{ {
currentIndex = 0,
batchSize = optimalBatchSize, batchSize = optimalBatchSize,
totalIteration = totalIteration, totalIteration = totalIteration,
}, },
@@ -661,44 +668,72 @@ public sealed unsafe partial class JobScheduler : IDisposable
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <param name="priority">The priority of the job.</param> /// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallel where T : struct, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, -1, priority, dependencies); => ScheduleParallel(in job, totalIteration, batchSize, -1, priority, dependencies);
/// <summary> /// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads. /// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary> /// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be unmanaged.</typeparam> /// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param> /// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param> /// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param> /// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. /// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan<JobHandle> dependencies) public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobParallel where T : struct, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies); => ScheduleParallel(in job, totalIteration, batchSize, -1, JobPriority.Normal, dependencies);
/// <summary>
/// Schedules a parallel job for execution, dividing the workload into batches and distributing it across threads.
/// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJobParallelFor"/> and be struct.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
where T : struct, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, threadIndex, JobPriority.Normal, dependencies);
/// <summary>
/// Schedules a custom job for execution with user-defined <see cref="JobInfo"/>.
/// </summary>
/// <param name="jobInfo">The information about the job to be scheduled.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job. Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle ScheduleCustom(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
{
return CreateJobHandle(ref jobInfo, threadIndex, dependencies);
}
/// <summary> /// <summary>
/// Combines multiple job dependencies into a single <see cref="JobHandle"/>. /// Combines multiple job dependencies into a single <see cref="JobHandle"/>.
/// </summary> /// </summary>
/// <param name="dependencies">A collection of <see cref="JobHandle"/> instances representing the dependencies to combine.</param> /// <param name="dependencies">A collection of <see cref="JobHandle"/> instances representing the dependencies to combine.</param>
/// <returns>A <see cref="JobHandle"/> that represents the combined dependencies. The returned handle can be used to ensure /// <returns>A <see cref="JobHandle"/> that represents the combined dependencies. The returned handle can be used to ensure that all specified dependencies are completed before proceeding.</returns>
/// that all specified dependencies are completed before proceeding.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies) public JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies)
{ {
var jobInfo = new JobInfo var jobInfo = new JobInfo
{ {
pJobData = null,
pExecutionFunc = null, pExecutionFunc = null,
pFreeFunc = null,
remainingBatches = 1, remainingBatches = 1,
threadIndex = -1, threadIndex = -1,
@@ -713,8 +748,8 @@ public sealed unsafe partial class JobScheduler : IDisposable
/// Retrieves the current status of a job identified by the specified handle. /// Retrieves the current status of a job identified by the specified handle.
/// </summary> /// </summary>
/// <param name="handle">The handle representing the job whose status is to be retrieved. The handle must be valid.</param> /// <param name="handle">The handle representing the job whose status is to be retrieved. The handle must be valid.</param>
/// <returns>The current status of the job as a <see cref="JobState"/> value. /// <returns>The current status of the job as a <see cref="JobState"/> value. Returns <see cref="JobState.Invalid"/> if the handle is invalid or the job does not exist.</returns>
/// Returns <see cref="JobState.Invalid"/> if the handle is invalid or the job does not exist.</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)]
public JobState GetJobStatus(JobHandle handle) public JobState GetJobStatus(JobHandle handle)
{ {
if (!handle.IsValid) if (!handle.IsValid)
@@ -905,15 +940,6 @@ public sealed unsafe partial class JobScheduler : IDisposable
worker.Dispose(); worker.Dispose();
} }
foreach (var info in _jobInfoPool)
{
if (info.pJobData != null)
{
_freeList.Free(info.pJobData);
}
}
_freeList.Dispose();
_workSignal.Dispose(); _workSignal.Dispose();
_cts.Dispose(); _cts.Dispose();

View File

@@ -6,7 +6,7 @@
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
<AllowUnsafeBlocks>True</AllowUnsafeBlocks> <AllowUnsafeBlocks>True</AllowUnsafeBlocks>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild> <GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<AssemblyVersion>2.0.0</AssemblyVersion> <AssemblyVersion>3.0.0</AssemblyVersion>
<Version>$(AssemblyVersion)</Version> <Version>$(AssemblyVersion)</Version>
<Authors>Misaki</Authors> <Authors>Misaki</Authors>
<PackageProjectUrl>https://git.personalnas.com/Misaki/Misaki.HighPerformance.git</PackageProjectUrl> <PackageProjectUrl>https://git.personalnas.com/Misaki/Misaki.HighPerformance.git</PackageProjectUrl>

View File

@@ -110,6 +110,7 @@ internal class WorkerThread : IDisposable
spin.SpinOnce(-1); spin.SpinOnce(-1);
} }
// If we didn't find a job after spinning, wait for a signal
if (!found) if (!found)
{ {
try try
@@ -130,12 +131,58 @@ internal class WorkerThread : IDisposable
ref var jobInfo = ref _scheduler.GetJobInfoReference(handle, out var exist); ref var jobInfo = ref _scheduler.GetJobInfoReference(handle, out var exist);
if (exist) if (exist)
{ {
var priorState = Interlocked.CompareExchange(ref jobInfo.state, JobUtility.JOBSTATE_RUNNING, JobUtility.JOBSTATE_SCHEDULED); // Try to acquire a reference count for the job. This ensures that the job won't be removed while we're processing it.
if (priorState != JobUtility.JOBSTATE_SCHEDULED && priorState != JobUtility.JOBSTATE_RUNNING) // This is critical that if thread A reads the job, but suddenly os scheduler delay the thread for just a moment, and thread B completes the job and removes it from the system, when thread A resumes, it might be accessing invalid memory.
// By acquiring a reference count, we ensure that even if the job is completed while we're processing it, it won't be removed until we're done.
var rcSpin = new SpinWait();
var rcAcquired = false;
while (true)
{
_scheduler.GetJobInfoReference(handle, out var currentExist);
if (!currentExist)
{
break;
}
var stateVal = Volatile.Read(ref jobInfo.state);
var state = JobUtility.GetState(stateVal);
if (state == JobState.Completed || state == JobState.Invalid)
{
break;
}
var newState = stateVal + JobUtility.RC_ONE;
if (state == JobState.Scheduled)
{
newState = (newState & ~JobUtility.STATE_MASK) | JobUtility.JOBSTATE_RUNNING;
}
// Attempt to acquire a reference count by incrementing the state value. If the state has changed since we read it, we need to retry.
if (Interlocked.CompareExchange(ref jobInfo.state, newState, stateVal) == stateVal)
{
_scheduler.GetJobInfoReference(handle, out currentExist);
if (!currentExist)
{
JobUtility.ReleaseRC(ref jobInfo.state);
break;
}
rcAcquired = true;
break;
}
rcSpin.SpinOnce(-1);
}
if (!rcAcquired)
{ {
continue; continue;
} }
var isLastBatch = true;
if (jobInfo.pExecutionFunc != null) if (jobInfo.pExecutionFunc != null)
{ {
var ctx = new JobExecutionContext var ctx = new JobExecutionContext
@@ -146,14 +193,15 @@ internal class WorkerThread : IDisposable
SelfHandle = handle, SelfHandle = handle,
}; };
if (!jobInfo.pExecutionFunc(jobInfo.pJobData, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx)) isLastBatch = jobInfo.pExecutionFunc(jobInfo.dataID, jobInfo.dataGeneration, ref jobInfo.jobRanges, ref jobInfo.remainingBatches, in ctx);
{
// If the job returns false, it means it we are not the last worker to process this job, so we should not mark it as complete yet.
continue;
}
} }
_scheduler.MarkJobComplete(handle, _index); JobUtility.ReleaseRC(ref jobInfo.state);
if (isLastBatch)
{
_scheduler.MarkJobComplete(handle, _index);
}
} }
} }
} }

View File

@@ -249,6 +249,11 @@ public unsafe struct FreeList : IMemoryAllocator<FreeList, FreeList.CreationOpti
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
private readonly void DrainRemoteFrees(ThreadCache* cache) private readonly void DrainRemoteFrees(ThreadCache* cache)
{ {
if (Volatile.Read(ref cache->remoteFreeHead) == 0)
{
return;
}
var head = (FreeNode*)Interlocked.Exchange(ref cache->remoteFreeHead, 0); var head = (FreeNode*)Interlocked.Exchange(ref cache->remoteFreeHead, 0);
while (head != null) while (head != null)
{ {

View File

@@ -85,7 +85,7 @@ public static class IJobParallelForSPMDExtensions
} }
} }
public static JobHandle ScheduleParallelSPDM<T, TNumber>(this JobScheduler jobScheduler, ref T job, int totalCount, int batchSize, int threadIndex, JobHandle dependency) public static JobHandle ScheduleParallelSPDM<T, TNumber>(this JobScheduler jobScheduler, ref T job, int totalCount, int batchSize, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
where T : unmanaged, IJobSPMD<TNumber> where T : unmanaged, IJobSPMD<TNumber>
where TNumber : unmanaged, INumber<TNumber>, IBinaryNumber<TNumber>, IMinMaxValue<TNumber>, IBitwiseOperators<TNumber, TNumber, TNumber> where TNumber : unmanaged, INumber<TNumber>, IBinaryNumber<TNumber>, IMinMaxValue<TNumber>, IBitwiseOperators<TNumber, TNumber, TNumber>
{ {
@@ -98,7 +98,7 @@ public static class IJobParallelForSPMDExtensions
}; };
var iterations = (totalCount + WideLane<TNumber>.LaneWidth - 1) / WideLane<TNumber>.LaneWidth; var iterations = (totalCount + WideLane<TNumber>.LaneWidth - 1) / WideLane<TNumber>.LaneWidth;
return jobScheduler.ScheduleParallelFor(ref warper, iterations, batchSize, threadIndex, dependency); return jobScheduler.ScheduleParallelFor(ref warper, iterations, batchSize, threadIndex, dependencies);
} }
else else
{ {
@@ -108,7 +108,7 @@ public static class IJobParallelForSPMDExtensions
totalCount = totalCount, totalCount = totalCount,
}; };
return jobScheduler.ScheduleParallelFor(ref warper, totalCount, batchSize, threadIndex, dependency); return jobScheduler.ScheduleParallelFor(ref warper, totalCount, batchSize, threadIndex, dependencies);
} }
} }
} }

View File

@@ -10,8 +10,8 @@ namespace Misaki.HighPerformance.Test.Benchmark;
[MemoryDiagnoser] [MemoryDiagnoser]
public class ParallelNoiseBenchmark public class ParallelNoiseBenchmark
{ {
private const int _WIDTH = 2048; private const int _WIDTH = 256;
private const int _HEIGHT = 2048; private const int _HEIGHT = 256;
private const int _LENGTH = _WIDTH * _HEIGHT; private const int _LENGTH = _WIDTH * _HEIGHT;
internal JobScheduler _jobScheduler = null!; internal JobScheduler _jobScheduler = null!;
@@ -20,6 +20,8 @@ public class ParallelNoiseBenchmark
[GlobalSetup] [GlobalSetup]
public void Setup() public void Setup()
{ {
AllocationManager.Initialize(AllocationManagerDesc.Default);
_jobScheduler = new JobScheduler(Environment.ProcessorCount); _jobScheduler = new JobScheduler(Environment.ProcessorCount);
_buffers = new UnsafeArray<float>(_LENGTH, AllocationHandle.Persistent); _buffers = new UnsafeArray<float>(_LENGTH, AllocationHandle.Persistent);
} }
@@ -29,9 +31,11 @@ public class ParallelNoiseBenchmark
{ {
_jobScheduler.Dispose(); _jobScheduler.Dispose();
_buffers.Dispose(); _buffers.Dispose();
AllocationManager.Dispose();
} }
[Benchmark] [Benchmark(Baseline = true)]
public unsafe void JobSystem() public unsafe void JobSystem()
{ {
var job = new NoiseJobVector() var job = new NoiseJobVector()
@@ -41,7 +45,7 @@ public class ParallelNoiseBenchmark
height = _HEIGHT height = _HEIGHT
}; };
var handle = _jobScheduler.ScheduleParallel(ref job, _LENGTH, 64, -1, JobHandle.Invalid); var handle = _jobScheduler.ScheduleParallel(ref job, _LENGTH, 64, -1);
_jobScheduler.Wait(handle); _jobScheduler.Wait(handle);
} }
@@ -57,7 +61,7 @@ public class ParallelNoiseBenchmark
}); });
} }
[Benchmark(Baseline = true)] [Benchmark]
public void For() public void For()
{ {
for (var i = 0; i < _LENGTH; i++) for (var i = 0; i < _LENGTH; i++)

View File

@@ -36,7 +36,7 @@ public unsafe class SPMDBenchmark
height = _SIZE, height = _SIZE,
}; };
var handle = _scheduler.ScheduleParallelSPDM<Jobs.NoiseJobMathSPMD, float>(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); var handle = _scheduler.ScheduleParallelSPDM<Jobs.NoiseJobMathSPMD, float>(ref job, _SIZE * _SIZE, 64, -1);
_scheduler.Wait(handle); _scheduler.Wait(handle);
} }
@@ -50,7 +50,7 @@ public unsafe class SPMDBenchmark
height = _SIZE, height = _SIZE,
}; };
var handle = _scheduler.ScheduleParallelFor(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); var handle = _scheduler.ScheduleParallelFor(ref job, _SIZE * _SIZE, 64, -1);
_scheduler.Wait(handle); _scheduler.Wait(handle);
} }
@@ -64,7 +64,7 @@ public unsafe class SPMDBenchmark
height = _SIZE, height = _SIZE,
}; };
var handle = _scheduler.ScheduleParallel(ref job, _SIZE * _SIZE, 64, -1, JobHandle.Invalid); var handle = _scheduler.ScheduleParallel(ref job, _SIZE * _SIZE, 64, -1);
_scheduler.Wait(handle); _scheduler.Wait(handle);
} }

View File

@@ -6,7 +6,35 @@ using Misaki.HighPerformance.Test.Benchmark;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
BenchmarkRunner.Run<ConcurrentSlotMapBenchmark>(); BenchmarkRunner.Run<ParallelNoiseBenchmark>();
//var bench = new ParallelNoiseBenchmark();
//bench.Setup();
//for (int i = 0; i < 4096 * 5; i++)
//{
// bench.JobSystem();
//}
//bench.Cleanup();
//bench.Setup();
//for (int i = 0; i < 4096 * 5; i++)
//{
// bench.JobSystem();
//}
//bench.Cleanup();
//bench.Setup();
//for (int i = 0; i < 4096 * 5; i++)
//{
// bench.JobSystem();
//}
//bench.Cleanup();
//AllocationManager.Initialize(AllocationManagerInitOpts.Default); //AllocationManager.Initialize(AllocationManagerInitOpts.Default);
//var set = new UnsafeBitSet(100, AllocationHandle.Persistent, AllocationOption.Clear); //var set = new UnsafeBitSet(100, AllocationHandle.Persistent, AllocationOption.Clear);

View File

@@ -126,8 +126,8 @@ public class TestJobSystem
result = result result = result
}; };
var combinedHandle = s_jobScheduler.CombineDependencies(handle1, handle2); //var combinedHandle = s_jobScheduler.CombineDependencies(handle1, handle2);
var handle3 = s_jobScheduler.Schedule(ref job3, combinedHandle); var handle3 = s_jobScheduler.Schedule(ref job3, handle1, handle2);
s_jobScheduler.Wait(handle3); s_jobScheduler.Wait(handle3);

View File

@@ -158,7 +158,7 @@ public class ConcurrentSlotMap<T> : IEnumerable<T>
} }
} }
public int Add(T item, out int generation) public int Add(scoped in T item, out int generation)
{ {
while (true) while (true)
{ {