Job system priorities, async waits, parallel map/queue

Major refactor:
- Add job priority tiers and async wait APIs to IJobScheduler
- Implement priority-based job queues and scheduling logic
- Introduce UnsafeParallelHashMap and refactor UnsafeParallelQueue
- Refactor UnsafeSlotMap to chunked storage for scalability
- Update SlotMap/ConcurrentSlotMap for consistency and perf
- Add new benchmarks and unit tests for parallel collections
- Misc: add MemoryUtility.AlignUp, version bumps, test improvements, bug fixes
This commit is contained in:
2026-04-18 11:26:08 +09:00
parent d5616daa05
commit 13802ca6c8
22 changed files with 1459 additions and 267 deletions

View File

@@ -1,5 +1,80 @@
namespace Misaki.HighPerformance.Jobs;
internal sealed class WaitItem : IThreadPoolWorkItem
{
private readonly IJobScheduler _scheduler;
private readonly JobHandle _jobHandle;
private readonly TaskCompletionSource _completionSource;
public Task Task => _completionSource.Task;
public WaitItem(IJobScheduler scheduler, JobHandle jobHandle, CancellationToken cancellationToken)
{
_scheduler = scheduler;
_jobHandle = jobHandle;
_completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
cancellationToken.Register((cs, tk) => ((TaskCompletionSource)cs!).TrySetCanceled(tk), _completionSource);
}
public void Execute()
{
_scheduler.Wait(_jobHandle);
_completionSource.SetResult();
}
}
internal sealed class WaitAllItem : IThreadPoolWorkItem
{
private readonly IJobScheduler _scheduler;
private readonly Memory<JobHandle> _jobHandles;
private readonly TaskCompletionSource _completionSource;
public Task Task => _completionSource.Task;
public WaitAllItem(IJobScheduler scheduler, Memory<JobHandle> jobHandles, CancellationToken cancellationToken)
{
_scheduler = scheduler;
_jobHandles = jobHandles;
_completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
cancellationToken.Register((cs, tk) => ((TaskCompletionSource)cs!).TrySetCanceled(tk), _completionSource);
}
public void Execute()
{
_scheduler.WaitAll(_jobHandles.Span);
_completionSource.SetResult();
}
}
internal sealed class WaitAnyItem : IThreadPoolWorkItem
{
private readonly IJobScheduler _scheduler;
private readonly ReadOnlyMemory<JobHandle> _jobHandles;
private readonly TaskCompletionSource<JobHandle> _completionSource;
public Task<JobHandle> Task => _completionSource.Task;
public WaitAnyItem(IJobScheduler scheduler, ReadOnlyMemory<JobHandle> jobHandles, CancellationToken cancellationToken)
{
_scheduler = scheduler;
_jobHandles = jobHandles;
_completionSource = new TaskCompletionSource<JobHandle>(TaskCreationOptions.RunContinuationsAsynchronously);
cancellationToken.Register((cs, tk) => ((TaskCompletionSource)cs!).TrySetCanceled(tk), _completionSource);
}
public void Execute()
{
var completedHandle = _scheduler.WaitAny(_jobHandles.Span);
_completionSource.SetResult(completedHandle);
}
}
public interface IJobScheduler
{
/// <summary>
@@ -18,9 +93,10 @@ public interface IJobScheduler
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob;
/// <summary>
@@ -31,7 +107,7 @@ public interface IJobScheduler
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle Schedule<T>(ref readonly T job, int threadIndex)
JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob;
/// <summary>
@@ -39,9 +115,10 @@ public interface IJobScheduler
/// </summary>
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle Schedule<T>(ref readonly T job, JobHandle dependency)
JobHandle Schedule<T>(ref readonly T job, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob;
/// <summary>
@@ -50,9 +127,10 @@ public interface IJobScheduler
/// <typeparam name="T">The type of the job to execute. Must implement <see cref="IJob"/> and be unmanaged.</typeparam>
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle Schedule<T>(ref readonly T job)
JobHandle Schedule<T>(ref readonly T job, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob;
/// <summary>
@@ -65,9 +143,10 @@ public interface IJobScheduler
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor;
/// <summary>
@@ -78,9 +157,10 @@ public interface IJobScheduler
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor;
/// <summary>
@@ -90,10 +170,11 @@ public interface IJobScheduler
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependency">The job that this job depends on.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor;
/// <summary>
@@ -103,10 +184,10 @@ public interface IJobScheduler
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize)
JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor;
/// <summary>
@@ -119,9 +200,10 @@ public interface IJobScheduler
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependency">A <see cref="JobHandle"/> representing the dependencies that must be completed before this job can begin.
/// Use <see cref="JobHandle.Invalid"/> if there are no dependencies.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel;
/// <summary>
@@ -132,9 +214,10 @@ public interface IJobScheduler
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel;
/// <summary>
@@ -144,10 +227,11 @@ public interface IJobScheduler
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="dependency">The job that this job depends on.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel;
/// <summary>
@@ -157,10 +241,10 @@ public interface IJobScheduler
/// <param name="job">The job instance to be executed. The job data will be copied internally.</param>
/// <param name="totalIteration">The total number of iterations to be processed by the job.</param>
/// <param name="batchSize">The number of iterations to include in each batch.</param>
/// <param name="threadIndex">The index of the thread that is preferred to execute the job. This is used to assign thread-specific data. Use -1 to allow any thread to execute the job.</param>
/// <param name="priority">The priority of the job.</param>
/// <returns>A <see cref="JobHandle"/> that can be used to track the completion of the scheduled job.
/// Returns <see cref="JobHandle.Invalid"/> if the job data allocation fails.</returns>
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize)
JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel;
/// <summary>
@@ -200,4 +284,31 @@ public interface IJobScheduler
/// <param name="handles">A read-only span containing the job handles to monitor for completion.</param>
/// <returns>The first job handle from the provided collection that has completed.</returns>
JobHandle WaitAny(params ReadOnlySpan<JobHandle> handles);
/// <summary>
/// Waits asynchronously until the specified job is completed, allowing the calling thread to perform other work while waiting.
/// </summary>
/// <param name="handle">The handle of the job to wait for.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the wait operation.</param>
/// <returns>A task that represents the asynchronous wait operation.</returns>
Task WaitAsync(JobHandle handle, CancellationToken cancellationToken = default);
/// <summary>
/// Waits asynchronously until all specified job handles have completed, allowing the calling thread to perform other work while waiting.
/// </summary>
/// <remarks>
/// The collection handles will be reordered in-place to move completed handles to the front.
/// </remarks>
/// <param name="handles">A read-only memory containing the job handles to monitor for completion.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the wait operation.</param>
/// <returns>A task that represents the asynchronous wait operation.</returns>
Task WaitAllAsync(Memory<JobHandle> handles, CancellationToken cancellationToken = default);
/// <summary>
/// Waits asynchronously until any of the specified job handles has completed, allowing the calling thread to perform other work while waiting, and returns the first completed handle.
/// </summary>
/// <param name="handles">A read-only memory containing the job handles to monitor for completion.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the wait operation.</param>
/// <returns>A task that represents the asynchronous wait operation.</returns>
Task<JobHandle> WaitAnyAsync(ReadOnlyMemory<JobHandle> handles, CancellationToken cancellationToken = default);
}

View File

@@ -1,8 +1,6 @@
using Misaki.HighPerformance.LowLevel.Buffer;
using Misaki.HighPerformance.LowLevel.Collections;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
namespace Misaki.HighPerformance.Jobs;
@@ -33,10 +31,11 @@ public enum JobState
Completed = 3
}
internal enum HeapType
public enum JobPriority
{
Native,
Managed,
High = 0,
Normal = 1,
Low = 2
}
internal unsafe struct JobInfo
@@ -58,7 +57,7 @@ internal unsafe struct JobInfo
_index++;
return _index < _jobInfo.dependentCount;
}
public JobHandle Current
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -95,6 +94,7 @@ internal unsafe struct JobInfo
public int dependentCount;
public JobRanges jobRanges;
public JobPriority priority;
public int state;
public int remainingBatches;

View File

@@ -8,6 +8,24 @@ using System.Runtime.CompilerServices;
namespace Misaki.HighPerformance.Jobs;
public struct JobSchedulerDesc
{
public int ThreadCount
{
get; set;
}
public ThreadPriority ThreadPriority
{
get; set;
}
public object? State
{
get; set;
}
}
/// <summary>
/// Provides a mechanism for scheduling and executing jobs across multiple worker threads.
/// </summary>
@@ -19,7 +37,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
private FreeList _freeList;
private readonly ConcurrentSlotMap<JobInfo> _jobInfoPool;
private readonly ConcurrentQueue<JobHandle> _jobQueue;
private readonly ConcurrentQueue<JobHandle>[] _jobQueues;
private readonly WorkerThread[] _workerThreads;
private readonly SemaphoreSlim _workSignal;
@@ -29,25 +47,68 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
private bool _disposed = false;
internal bool IsCancellationRequested => _cts.IsCancellationRequested;
internal object? State => _state;
internal bool IsCancellationRequested => _cts.IsCancellationRequested;
public int WorkerCount => _workerThreads.Length;
/// <summary>
/// Initializes a new instance of the <see cref="JobScheduler"/> class with the specified description.
/// </summary>
/// <param name="desc">The description for the job scheduler.</param>
public JobScheduler(ref readonly JobSchedulerDesc desc)
{
var workerCount = Math.Max(1, desc.ThreadCount);
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: workerCount);
_jobInfoPool = new ConcurrentSlotMap<JobInfo>(128);
_jobQueues = new ConcurrentQueue<JobHandle>[3];
for (var i = 0; i < 3; i++)
{
_jobQueues[i] = new ConcurrentQueue<JobHandle>();
}
_workSignal = new SemaphoreSlim(0);
_cts = new CancellationTokenSource();
_state = desc.State;
_workerThreads = new WorkerThread[workerCount];
for (var i = 0; i < workerCount; i++)
{
_workerThreads[i] = new WorkerThread(i, this, desc.ThreadPriority);
}
foreach (var worker in _workerThreads)
{
worker.Start();
}
}
/// <summary>
/// Initializes a new instance of the <see cref="JobScheduler"/> class with the specified number of worker threads.
/// </summary>
/// <param name="threadCount">The number of worker threads to create. If less than 1, at least one thread will be created.</param>
/// <param name="priority">The priority of the worker threads.</param>
/// <param name="state">The state object for the job scheduler.</param>
public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null)
/// <param name="allowManagedJobs">A value indicating whether managed jobs are allowed.</param>
[Obsolete("Use JobScheduler(JobSchedulerDesc) instead.")]
public JobScheduler(int threadCount, ThreadPriority priority = ThreadPriority.Normal, object? state = null, bool allowManagedJobs = false)
{
var workerCount = Math.Max(1, threadCount);
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: threadCount);
_freeList = new FreeList(MemoryUtility.AlignOf<IntPtr>(), maxConcurrencyLevel: workerCount);
_jobInfoPool = new ConcurrentSlotMap<JobInfo>(128);
_jobQueue = new ConcurrentQueue<JobHandle>();
_jobQueues = new ConcurrentQueue<JobHandle>[3];
for (var i = 0; i < 3; i++)
{
_jobQueues[i] = new ConcurrentQueue<JobHandle>();
}
_workSignal = new SemaphoreSlim(0);
_cts = new CancellationTokenSource();
@@ -72,7 +133,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
Dispose();
}
private void EnqueueJobIfReady(JobHandle handle)
private void EnqueueJobIfReady(JobHandle handle, int threadIndex)
{
ref var jobInfo = ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out var exist);
@@ -84,14 +145,20 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
return;
}
var tier = (int)jobInfo.priority;
ConcurrentQueue<JobHandle> jobQueue;
if (jobInfo.threadIndex >= 0 && jobInfo.threadIndex < _workerThreads.Length)
{
jobQueue = _workerThreads[jobInfo.threadIndex].LocalQueue;
jobQueue = _workerThreads[jobInfo.threadIndex].LocalQueues[tier];
}
else if (threadIndex >= 0 && threadIndex < _workerThreads.Length)
{
// Put into the local thread queue if the scheduling thread is a worker thread. This can improve cache locality and reduce contention on the main queue.
jobQueue = _workerThreads[threadIndex].LocalQueues[tier];
}
else
{
jobQueue = _jobQueue;
jobQueue = _jobQueues[tier];
}
// Ensure the count of this job handle won't exceed the number of worker threads.
@@ -107,7 +174,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
}
}
private JobHandle CreateJobHandle(ref JobInfo jobInfo, params ReadOnlySpan<JobHandle> dependencies)
private JobHandle CreateJobHandle(ref JobInfo jobInfo, int threadIndex, params ReadOnlySpan<JobHandle> dependencies)
{
var validDepCount = 0;
for (var i = 0; i < dependencies.Length; i++)
@@ -199,7 +266,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
// Lower the initial 1 guard lock; Enqueue if met
if (Interlocked.Decrement(ref infoInPool.dependencyCount) == 0)
{
EnqueueJobIfReady(handle);
EnqueueJobIfReady(handle, threadIndex);
}
return handle;
@@ -208,16 +275,22 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool HasWork()
{
if (!_jobQueue.IsEmpty)
for (var i = 0; i < _jobQueues.Length; i++)
{
return true;
if (!_jobQueues[i].IsEmpty)
{
return true;
}
}
for (var i = 0; i < _workerThreads.Length; i++)
{
if (!_workerThreads[i].LocalQueue.IsEmpty)
for (var j = 0; j < _workerThreads[i].LocalQueues.Length; j++)
{
return true;
if (!_workerThreads[i].LocalQueues[j].IsEmpty)
{
return true;
}
}
}
@@ -231,15 +304,15 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryStealFromMain(int threadIndex, out JobHandle outHandle)
internal bool TryStealFromMain(int tier, out JobHandle outHandle)
{
return _jobQueue.TryDequeue(out outHandle);
return _jobQueues[tier].TryDequeue(out outHandle);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryStealFromWorker(int threadIndex, out JobHandle outHandle)
internal bool TryStealFromWorker(int threadIndex, int tier, out JobHandle outHandle)
{
return _workerThreads[threadIndex].LocalQueue.TryDequeue(out outHandle);
return _workerThreads[threadIndex].LocalQueues[tier].TryDequeue(out outHandle);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -254,7 +327,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
return ref _jobInfoPool.GetElementReferenceAt(handle.ID, handle.Generation, out exist);
}
internal void MarkJobComplete(JobHandle handle)
internal void MarkJobComplete(JobHandle handle, int threadIndex)
{
Debug.Assert(handle.IsValid);
@@ -309,7 +382,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
ref var depJobInfo = ref _jobInfoPool.GetElementReferenceAt(depHandle.ID, depHandle.Generation, out var depExist);
if (depExist && Interlocked.Decrement(ref depJobInfo.dependencyCount) == 0)
{
EnqueueJobIfReady(depHandle);
EnqueueJobIfReady(depHandle, threadIndex);
}
}
@@ -319,7 +392,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
_jobInfoPool.Remove(handle.ID, handle.Generation);
}
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency)
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob
{
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
@@ -341,22 +414,22 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
jobRanges = JobRanges.Single,
};
return CreateJobHandle(ref jobInfo, dependency);
return CreateJobHandle(ref jobInfo, threadIndex, dependency);
}
public JobHandle Schedule<T>(ref readonly T job, int threadIndex)
public JobHandle Schedule<T>(ref readonly T job, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob
=> Schedule(in job, threadIndex, JobHandle.Invalid);
=> Schedule(in job, threadIndex, JobHandle.Invalid, priority);
public JobHandle Schedule<T>(ref readonly T job, JobHandle dependency)
public JobHandle Schedule<T>(ref readonly T job, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob
=> Schedule(in job, -1, dependency);
=> Schedule(in job, -1, dependency, priority);
public JobHandle Schedule<T>(ref readonly T job)
public JobHandle Schedule<T>(ref readonly T job, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJob
=> Schedule(in job, -1, JobHandle.Invalid);
=> Schedule(in job, -1, JobHandle.Invalid, priority);
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor
{
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
@@ -386,22 +459,22 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
},
};
return CreateJobHandle(ref jobInfo, dependency);
return CreateJobHandle(ref jobInfo, threadIndex, dependency);
}
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid);
=> ScheduleParallelFor(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid, priority);
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, dependency);
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, dependency, priority);
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize)
public JobHandle ScheduleParallelFor<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallelFor
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobHandle.Invalid);
=> ScheduleParallelFor(in job, totalIteration, batchSize, -1, JobHandle.Invalid, priority);
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency)
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel
{
var pJobData = _freeList.Allocate(MemoryUtility.SizeOf<T>(), MemoryUtility.AlignOf<T>());
@@ -431,20 +504,20 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
},
};
return CreateJobHandle(ref jobInfo, dependency);
return CreateJobHandle(ref jobInfo, threadIndex, dependency);
}
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex)
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, int threadIndex, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid);
=> ScheduleParallel(in job, totalIteration, batchSize, threadIndex, JobHandle.Invalid, priority);
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency)
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobHandle dependency, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, -1, dependency);
=> ScheduleParallel(in job, totalIteration, batchSize, -1, dependency, priority);
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize)
public JobHandle ScheduleParallel<T>(ref readonly T job, int totalIteration, int batchSize, JobPriority priority = JobPriority.Normal)
where T : unmanaged, IJobParallel
=> ScheduleParallel(in job, totalIteration, batchSize, -1, JobHandle.Invalid);
=> ScheduleParallel(in job, totalIteration, batchSize, -1, JobHandle.Invalid, priority);
public JobHandle CombineDependencies(params ReadOnlySpan<JobHandle> dependencies)
{
@@ -459,7 +532,7 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
jobRanges = JobRanges.Single,
};
return CreateJobHandle(ref jobInfo, dependencies);
return CreateJobHandle(ref jobInfo, -1, dependencies);
}
public JobState GetJobStatus(JobHandle handle)
@@ -562,6 +635,45 @@ public sealed unsafe partial class JobScheduler : IJobScheduler, IDisposable
}
}
public Task WaitAsync(JobHandle handle, CancellationToken cancellationToken = default)
{
if (!handle.IsValid)
{
return Task.CompletedTask;
}
var workItem = new WaitItem(this, handle, cancellationToken);
ThreadPool.UnsafeQueueUserWorkItem(workItem, preferLocal: true);
return workItem.Task;
}
public Task WaitAllAsync(Memory<JobHandle> handles, CancellationToken cancellationToken = default)
{
if (handles.Length == 0)
{
return Task.CompletedTask;
}
var workItem = new WaitAllItem(this, handles, cancellationToken);
ThreadPool.UnsafeQueueUserWorkItem(workItem, preferLocal: true);
return workItem.Task;
}
public Task<JobHandle> WaitAnyAsync(ReadOnlyMemory<JobHandle> handles, CancellationToken cancellationToken = default)
{
if (handles.Length == 0)
{
return Task.FromResult(JobHandle.Invalid);
}
var workItem = new WaitAnyItem(this, handles, cancellationToken);
ThreadPool.UnsafeQueueUserWorkItem(workItem, preferLocal: true);
return workItem.Task;
}
public void Dispose()
{
if (_disposed)

View File

@@ -6,7 +6,7 @@
<Nullable>enable</Nullable>
<AllowUnsafeBlocks>True</AllowUnsafeBlocks>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<AssemblyVersion>1.6.1</AssemblyVersion>
<AssemblyVersion>2.0.0</AssemblyVersion>
<Version>$(AssemblyVersion)</Version>
<Authors>Misaki</Authors>
<PackageProjectUrl>https://git.personalnas.com/Misaki/Misaki.HighPerformance.git</PackageProjectUrl>

View File

@@ -4,23 +4,33 @@ namespace Misaki.HighPerformance.Jobs;
internal class WorkerThread : IDisposable
{
private const int _MAX_STEAL_ATTEMPTS = 8;
private readonly int _index;
private readonly Thread _thread;
private readonly ConcurrentQueue<JobHandle> _localQueue;
private readonly ConcurrentQueue<JobHandle>[] _localQueues;
private readonly JobScheduler _scheduler;
private readonly Random _random;
private readonly Random _stealRandom;
internal ConcurrentQueue<JobHandle> LocalQueue => _localQueue;
private readonly int _maxStealAttems;
private uint _priorityTick;
internal ReadOnlySpan<ConcurrentQueue<JobHandle>> LocalQueues => _localQueues;
public WorkerThread(int index, JobScheduler scheduler, ThreadPriority priority)
{
_index = index;
_localQueue = new();
_localQueues = new ConcurrentQueue<JobHandle>[3];
for (var i = 0; i < _localQueues.Length; i++)
{
_localQueues[i] = new ConcurrentQueue<JobHandle>();
}
_scheduler = scheduler;
_random = new Random(index * 9973 + Environment.TickCount);
_stealRandom = new Random(index * 9973 + Environment.TickCount);
_maxStealAttems = Math.Max((int)(_scheduler.WorkerCount * 0.5f), 3);
_thread = new Thread(WorkLoop)
{
@@ -30,27 +40,53 @@ internal class WorkerThread : IDisposable
};
}
public void Start() => _thread.Start();
private bool TryFindJob(out JobHandle handle)
public void Start()
{
if (_localQueue.TryDequeue(out handle))
{
return true;
}
_thread.Start();
}
if (_scheduler.TryStealFromMain(-1, out handle))
{
return true;
}
private unsafe bool TryFindJob(out JobHandle handle)
{
_priorityTick++;
for (var i = 0; i < _MAX_STEAL_ATTEMPTS; i++)
var tick = (int)(_priorityTick & 7);
// Ratio: 4 High (50%), 3 Normal (37.5%), 1 Low (12.5%)
var cascade = stackalloc int[24] {
0, 1, 2, // Tick 0 (High)
0, 1, 2, // Tick 1 (High)
0, 1, 2, // Tick 2 (High)
0, 1, 2, // Tick 3 (High)
1, 2, 0, // Tick 4 (Normal)
1, 2, 0, // Tick 5 (Normal)
1, 2, 0, // Tick 6 (Normal)
2, 0, 1 // Tick 7 (Low)
};
var index = tick * 3;
for (var offset = 0; offset < 3; offset++)
{
var randomIndex = _random.Next(0, _scheduler.WorkerCount);
if (randomIndex != _index && _scheduler.TryStealFromWorker(randomIndex, out handle))
var p = cascade[index + offset];
if (_localQueues[p].TryDequeue(out handle))
{
return true;
}
if (_scheduler.TryStealFromMain(p, out handle))
{
return true;
}
for (var i = 1; i < _scheduler.WorkerCount; i++)
{
// Calculate the target deterministically using modulo arithmetic
var targetIndex = (_index + i) % _scheduler.WorkerCount;
if (_scheduler.TryStealFromWorker(targetIndex, p, out handle))
{
return true;
}
}
}
handle = JobHandle.Invalid;
@@ -121,7 +157,7 @@ internal class WorkerThread : IDisposable
}
}
_scheduler.MarkJobComplete(handle);
_scheduler.MarkJobComplete(handle, _index);
}
}
}