Skip to content

Commit

Permalink
Sqlite IPC Rework (#502)
Browse files Browse the repository at this point in the history
* Use TEXT instead of VARCHAR

* Force ascending aliased rowid values

* Dispose rented connection and command early

* Serialize to bytes early

* Notify subscribers AFTER reading

* Update test

* Rework SqliteIPC

* Update

* Use Queue instead of Stack

* Cleanup ReaderLoop

* Seal SqliteIPC

* Add WriterLoop

* Publish jobs early when in-process

* ReaderLoop is for other processes

* Cleanup

* Cleanup

* Update messages

* Add EnsureWrite
  • Loading branch information
erri120 committed Aug 3, 2023
1 parent 893efda commit 5251b88
Show file tree
Hide file tree
Showing 6 changed files with 586 additions and 285 deletions.
3 changes: 2 additions & 1 deletion src/NexusMods.CLI/Types/IpcHandlers/NxmIpcProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class NxmIpcProtocolHandler : IIpcProtocolHandler
/// <inheritdoc/>
public string Protocol => "nxm";

private IMessageProducer<NXMUrlMessage> _messages;
private readonly IMessageProducer<NXMUrlMessage> _messages;

/// <summary>
/// constructor
Expand All @@ -27,6 +27,7 @@ public NxmIpcProtocolHandler(IMessageProducer<NXMUrlMessage> messages)
public async Task Handle(string url, CancellationToken cancel)
{
await _messages.Write(new NXMUrlMessage { Value = NXMUrl.Parse(url) }, cancel);
_messages.EnsureWrite(cancel);
}
}

9 changes: 8 additions & 1 deletion src/NexusMods.DataModel/Interprocess/IMessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ namespace NexusMods.DataModel.Interprocess;
/// A message producer for sending messages to other processes.
/// </summary>
/// <typeparam name="T">Message type to send, each message type gets its own queue</typeparam>
public interface IMessageProducer<T> where T : IMessage
public interface IMessageProducer<in T> where T : IMessage
{
/// <summary>
/// Sends a message to the queue.
/// </summary>
/// <param name="message">The message to write.</param>
/// <param name="token">Can be used to cancel this operation.</param>
public ValueTask Write(T message, CancellationToken token);

/// <summary>
/// Ensures all messages in-flight have been saved to the database.
/// This is done using an <see cref="EventWaitHandle"/> and this call
/// will block the entire thread.
/// </summary>
public void EnsureWrite(CancellationToken cancellationToken);
}
20 changes: 20 additions & 0 deletions src/NexusMods.DataModel/Interprocess/InterprocessProducer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Diagnostics;
using System.Reactive.Linq;

namespace NexusMods.DataModel.Interprocess;

/// <summary>
Expand Down Expand Up @@ -48,4 +51,21 @@ private void WriteInner(T message)
var used = message.Write(buffer);
_sqliteIpc.Send(_queueName, buffer[..used]);
}

/// <inheritdoc/>
public void EnsureWrite(CancellationToken cancellationToken)
{
var timeout = SqliteIPC.WriterLoopInterval + SqliteIPC.SqliteDefaultTimeout;
Thread.Sleep(timeout);

const int maxIterations = 10;
var i = 0;

while (i < maxIterations && !cancellationToken.IsCancellationRequested)
{
var signaled = _sqliteIpc.WriterLoopFinished.WaitOne(timeout);
if (signaled) break;
i += 1;
}
}
}
32 changes: 32 additions & 0 deletions src/NexusMods.DataModel/Interprocess/SemaphoreSlimWaiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace NexusMods.DataModel.Interprocess;

internal readonly struct SemaphoreSlimWaiter : IDisposable
{
private readonly SemaphoreSlim _semaphoreSlim;

public bool HasEntered { get; }

internal SemaphoreSlimWaiter(SemaphoreSlim semaphoreSlim, bool entered)
{
_semaphoreSlim = semaphoreSlim;
HasEntered = entered;
}

public void Dispose()
{
if (!HasEntered) return;
_semaphoreSlim.Release();
}
}

internal static class SemaphoreExtensions
{
public static SemaphoreSlimWaiter CustomWait(
this SemaphoreSlim semaphoreSlim,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
var entered = semaphoreSlim.Wait(timeout, cancellationToken);
return new SemaphoreSlimWaiter(semaphoreSlim, entered);
}
}
Loading

0 comments on commit 5251b88

Please sign in to comment.