From 50d40ec27608d97176e39025e5f1b5fec641372a Mon Sep 17 00:00:00 2001 From: BRUNER Patrick Date: Mon, 22 Jun 2026 17:35:59 +0200 Subject: [PATCH] Remove dead stream-reader spikes (Channel, Pipeline, PipelineNew) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These three readers were performance spikes attempting to beat .NET's buffered StreamReader. They were never wired into the ReaderType enum or the factory, so the factory could only ever produce Legacy, System, or SystemDirect. Channel and PipelineNew were referenced nowhere; Pipeline lived only in a doc comment and one test. The spikes' payoff was the finding that stream I/O is not the read-path bottleneck (buffer creation and line-location are) — already captured in PositionAwareStreamReaderDirect. The code artifacts are spent. - Delete Channel/Pipeline/PipelineNew (~2,186 lines) - Remove the 7 Pipeline* tests (seeking/unicode/max-length/empty-line behaviour is already covered by the System and Direct tests) - Update ILogStreamReader doc comment to list the three real readers and drop the NotSupportedException reference to the removed Pipeline reader The three-reader seam is kept deliberately: it varies across a speed-vs-format-robustness axis (see ADR 0006). --- .../PositionAwareStreamReaderChannel.cs | 754 ------------------ .../PositionAwareStreamReaderPipeline.cs | 730 ----------------- .../PositionAwareStreamReaderPipelineNew.cs | 702 ---------------- .../Interfaces/ILogStreamReader.cs | 10 +- .../StreamReaderTests/LogStreamReaderTest.cs | 146 ---- 5 files changed, 5 insertions(+), 2337 deletions(-) delete mode 100644 src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs delete mode 100644 src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs delete mode 100644 src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs deleted file mode 100644 index 8edc6cf8..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs +++ /dev/null @@ -1,754 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; -using System.Threading.Channels; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -public class PositionAwareStreamReaderChannel : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly BufferedStream _stream; - private readonly Encoding _encoding; - private readonly int _byteBufferSize; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private Channel _lineChannel; - private ChannelReader _reader; - private ChannelWriter _writer; - - private LineSegment? _currentSegment; - - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - - // Line queue - using BlockingCollection for thread-safe, race-free synchronization - private BlockingCollection _lineQueue; - private Exception _producerException; - - public PositionAwareStreamReaderChannel (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _stream = new BufferedStream(stream); - - _maximumLineLength = maximumLineLength; - - _byteBufferSize = DEFAULT_BYTE_BUFFER_SIZE; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(_byteBufferSize), _maximumLineLength + 2); - - // Start the pipeline (will create the collection) - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - public override string ReadLine () - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - LineSegment segment; - try - { - //Channel read (more efficient than BlockingCollection) - if (!_reader.TryRead(out segment)) - { - // Use async path if not immediately available - var task = _reader.ReadAsync(_cts?.Token ?? CancellationToken.None); - segment = !task.IsCompleted - ? task.AsTask().GetAwaiter().GetResult() - : task.GetAwaiter().GetResult(); - } - } - catch (OperationCanceledException) - { - return null; - } - catch (ChannelClosedException) - { - return null; - } - - using (segment) - { - if (segment.IsEof) - { - return null; - } - - var line = new string(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return line; - } - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - // Clean up remaining items and dispose collection - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - // Seek stream to start position (accounting for preamble) - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - // Create PipeReader - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - - // CRITICAL: Create a NEW BlockingCollection instance - // Once CompleteAdding() is called, a BlockingCollection cannot be reused - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // Create cancellation token - _cts = new CancellationTokenSource(); - - _lineChannel = Channel.CreateBounded(new BoundedChannelOptions(128) - { - FullMode = BoundedChannelFullMode.Wait, - SingleReader = true, - SingleWriter = true - }); - - _reader = _lineChannel.Reader; - _writer = _lineChannel.Writer; - - // Start producer task - _producerTask = Task.Run(() => ProduceAsync(startPosition, _cts.Token), CancellationToken.None); - - // Update position - _ = Interlocked.Exchange(ref _position, startPosition); - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - /// - /// Cancels the current pipeline operation and releases associated resources. This method should be called while - /// holding the appropriate lock to ensure thread safety. - /// - /// This method cancels any ongoing producer task, marks the internal queue as complete to - /// unblock waiting consumers, and disposes of pipeline resources. It is intended for internal use and must be - /// invoked only when the pipeline is in a valid state for cancellation. - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore if already disposed - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected cancellation - } - finally - { - _cts.Dispose(); - _cts = null; - } - - _writer?.Complete(); - - // Mark collection as complete to unblock any waiting Take() calls - // This must happen AFTER the producer task is cancelled and finished - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - // Complete and dispose the PipeReader - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (Exception) - { - // Ignore errors during completion - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - // Allocate char buffer - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - // Read from pipe - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Process the buffer - decode and extract lines - var state = ProcessBuffer(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - charsInBuffer = state.charsInBuffer; - byteOffset = state.byteOffset; - - // Advance the reader - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - // Handle any remaining chars in buffer as final line - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - // Send EOF marker - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected when Position is changed or disposed - } - catch (Exception ex) - { - // Store exception to rethrow in ReadLine - Volatile.Write(ref _producerException, ex); - } - finally - { - // Always mark collection as complete when producer finishes - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Collection was already disposed - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private void EnqueueLine (LineSegment segment) - { - try - { - if (!_writer.TryWrite(segment)) - { - // Use async path if buffer is full - _writer.WriteAsync(segment, _cts.Token).AsTask().GetAwaiter().GetResult(); - } - } - catch (ChannelClosedException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private (int charsInBuffer, long byteOffset) ProcessBuffer ( - ReadOnlySequence buffer, - char[] charBuffer, - int charsInBuffer, - Decoder decoder, - long byteOffset, - bool isCompleted) - { - var localByteOffset = byteOffset; - var localCharsInBuffer = charsInBuffer; - - // Decode bytes to chars - if (buffer.IsSingleSegment) - { - // Fast path for single segment - var span = buffer.FirstSpan; - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(span, charBuffer, localCharsInBuffer, decoder, localByteOffset, isCompleted); - } - else - { - // Slow path for multi-segment - foreach (var segment in buffer) - { - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(segment.Span, charBuffer, localCharsInBuffer, decoder, localByteOffset, false); - } - - if (isCompleted) - { - // Flush decoder on completion - decoder.Convert([], 0, 0, charBuffer, localCharsInBuffer, - _charBufferSize - localCharsInBuffer, true, - out _, out var charsProduced, out _); - localCharsInBuffer += charsProduced; - } - } - - // Scan for complete lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, localCharsInBuffer - searchIndex, isCompleted); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - localByteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // Move remaining chars to beginning of buffer - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - return (remaining, localByteOffset); - } - - private (int charsInBuffer, long byteOffset) DecodeAndProcessSegment (ReadOnlySpan bytes, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool flush) - { - var bytesConsumed = 0; - - while (bytesConsumed < bytes.Length) - { - var charsAvailable = _charBufferSize - charsInBuffer; - - // CRITICAL FIX: Process lines when buffer is getting full - if (charsAvailable < 100) // Leave room for multi-byte sequences - { - // Process lines to free up space - var searchIndex = 0; - while (searchIndex < charsInBuffer) - { - var available = charsInBuffer - searchIndex; - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, available, false); - - if (newlineIndex == -1) - { - // No more complete lines found - var remaining = charsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - charsInBuffer = remaining; - break; - } - - // Found a line - create and enqueue it - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // If still no space, force process current content as truncated line - if (charsInBuffer >= _charBufferSize - 100 && charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - charsInBuffer = 0; - } - - charsAvailable = _charBufferSize - charsInBuffer; - - if (charsAvailable < 10) - { - // Still no space - exit to avoid infinite loop - break; - } - } - - decoder.Convert( - bytes[bytesConsumed..], - charBuffer.AsSpan(charsInBuffer), - flush && bytesConsumed == bytes.Length, - out var usedBytes, - out var charsProduced, - out _); - - bytesConsumed += usedBytes; - charsInBuffer += charsProduced; - } - - return (charsInBuffer, byteOffset); - } - - /// - /// Finds the next newline in the char buffer. - /// Handles \r, \n, and \r\n as newline delimiters. - /// - /// The char buffer to search - /// Start index for search - /// Number of chars available to search - /// If true, treats \r at end of buffer as newline - /// Tuple of (newline index, newline char count) - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - //Vectorized Search for \n - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - // Found \n - check if preceded by \r - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - //Vectorized search for \r - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - // Check if at end of buffer - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - // Check next char - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - /// - /// Creates a LineSegment from the char buffer, handling truncation. - /// - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - // Calculate byte length for position tracking - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - // Apply maximum line length constraint - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - // Rent buffer from pool (ensure at least size 1) - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - // Copy line content (excluding newline) - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - //Array.Copy(source, start, buffer, 0, logicalLength); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - if (!_lineQueue.TryTake(out var segment, 100, _cts?.Token ?? CancellationToken.None)) - { - lineMemory = default; - return false; - } - - // Store segment for lifetime management - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - throw new NotImplementedException(); - } - - /// - /// Represents a line segment with its position and metadata. - /// Uses ArrayPool for efficient char buffer management. - /// - private readonly struct LineSegment : IDisposable - { - /// - /// The rented char buffer from ArrayPool. May be larger than Length. - /// - public char[] Buffer { get; } - - /// - /// The actual length of the line content in the buffer. - /// - public int Length { get; } - - /// - /// The byte offset in the stream where this line starts. - /// - public long ByteOffset { get; } - - /// - /// The number of bytes consumed from the stream for this line (including newline). - /// - public int ByteLength { get; } - - /// - /// True if the line was truncated due to maximum line length constraint. - /// - public bool IsTruncated { get; } - - /// - /// True if this is an EOF marker segment. - /// - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - /// - /// Creates an EOF marker segment. - /// - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } -} diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs deleted file mode 100644 index b85fd82f..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs +++ /dev/null @@ -1,730 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -public class PositionAwareStreamReaderPipeline : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly Stream _stream; - private readonly Encoding _encoding; - private readonly int _byteBufferSize; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private LineSegment? _currentSegment; - - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - - // Line queue - using BlockingCollection for thread-safe, race-free synchronization - private BlockingCollection _lineQueue; - private Exception _producerException; - - public PositionAwareStreamReaderPipeline (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _maximumLineLength = maximumLineLength; - _byteBufferSize = DEFAULT_BYTE_BUFFER_SIZE; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - _stream = stream; - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(_byteBufferSize), _maximumLineLength + 2); - - // Start the pipeline (will create the collection) - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public override string ReadLine () - { - if (TryReadLine(out var lineMemory)) - { - return new string(lineMemory.Span); // Only allocate when explicitly requested - } - - return null; - - //ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - //// Check for producer exception - //var producerEx = Volatile.Read(ref _producerException); - //if (producerEx != null) - //{ - // throw new InvalidOperationException("Producer task encountered an error.", producerEx); - //} - - //LineSegment segment; - //try - //{ - // // BlockingCollection.Take() blocks until an item is available or collection is completed - // // This eliminates the race condition present in the semaphore + queue approach - // segment = _lineQueue.Take(_cts?.Token ?? CancellationToken.None); - //} - //catch (OperationCanceledException) - //{ - // return null; - //} - //catch (InvalidOperationException) // Thrown when collection is marked as completed and empty - //{ - // return null; - //} - - //using (segment) - //{ - // if (segment.IsEof) - // { - // return null; - // } - - // var line = new string(segment.Buffer, 0, segment.Length); - // _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - // return line; - //} - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - // Clean up remaining items and dispose collection - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - // Seek stream to start position (accounting for preamble) - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - // Create PipeReader - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // Create cancellation token - _cts = new CancellationTokenSource(); - - // Start producer task - _producerTask = Task.Run(() => ProduceAsync(startPosition, _cts.Token), CancellationToken.None); - - // Update position - _ = Interlocked.Exchange(ref _position, startPosition); - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - /// - /// Cancels the current pipeline operation and releases associated resources. This method should be called while - /// holding the appropriate lock to ensure thread safety. - /// - /// This method cancels any ongoing producer task, marks the internal queue as complete to - /// unblock waiting consumers, and disposes of pipeline resources. It is intended for internal use and must be - /// invoked only when the pipeline is in a valid state for cancellation. - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore if already disposed - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected cancellation - } - finally - { - _cts.Dispose(); - _cts = null; - } - - // Mark collection as complete to unblock any waiting Take() calls - // This must happen AFTER the producer task is cancelled and finished - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - // Complete and dispose the PipeReader - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (Exception) - { - // Ignore errors during completion - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - // Allocate char buffer - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - // Read from pipe - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Process the buffer - decode and extract lines - var state = ProcessBuffer(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - charsInBuffer = state.charsInBuffer; - byteOffset = state.byteOffset; - - // Advance the reader - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - // Handle any remaining chars in buffer as final line - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - // Send EOF marker - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected when Position is changed or disposed - } - catch (Exception ex) - { - // Store exception to rethrow in ReadLine - Volatile.Write(ref _producerException, ex); - } - finally - { - // Always mark collection as complete when producer finishes - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Collection was already disposed - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private void EnqueueLine (LineSegment segment) - { - try - { - _lineQueue.Add(segment, _cts.Token); - } - catch (InvalidOperationException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private (int charsInBuffer, long byteOffset) ProcessBuffer ( - ReadOnlySequence buffer, - char[] charBuffer, - int charsInBuffer, - Decoder decoder, - long byteOffset, - bool isCompleted) - { - var localByteOffset = byteOffset; - var localCharsInBuffer = charsInBuffer; - - // Decode bytes to chars - if (buffer.IsSingleSegment) - { - // Fast path for single segment - var span = buffer.FirstSpan; - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(span, charBuffer, localCharsInBuffer, decoder, localByteOffset, isCompleted); - } - else - { - // Slow path for multi-segment - foreach (var segment in buffer) - { - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(segment.Span, charBuffer, localCharsInBuffer, decoder, localByteOffset, false); - } - - if (isCompleted) - { - // Flush decoder on completion - decoder.Convert([], 0, 0, charBuffer, localCharsInBuffer, - _charBufferSize - localCharsInBuffer, true, - out _, out var charsProduced, out _); - localCharsInBuffer += charsProduced; - } - } - - // Scan for complete lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, localCharsInBuffer - searchIndex, isCompleted); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - localByteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // Move remaining chars to beginning of buffer - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - return (remaining, localByteOffset); - } - - private (int charsInBuffer, long byteOffset) DecodeAndProcessSegment (ReadOnlySpan bytes, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool flush) - { - var bytesConsumed = 0; - - while (bytesConsumed < bytes.Length) - { - var charsAvailable = _charBufferSize - charsInBuffer; - - // CRITICAL FIX: Process lines when buffer is getting full - if (charsAvailable < 100) // Leave room for multi-byte sequences - { - // Process lines to free up space - var searchIndex = 0; - while (searchIndex < charsInBuffer) - { - var available = charsInBuffer - searchIndex; - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, available, false); - - if (newlineIndex == -1) - { - // No more complete lines found - var remaining = charsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - charsInBuffer = remaining; - break; - } - - // Found a line - create and enqueue it - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // If still no space, force process current content as truncated line - if (charsInBuffer >= _charBufferSize - 100 && charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - charsInBuffer = 0; - } - - charsAvailable = _charBufferSize - charsInBuffer; - - if (charsAvailable < 10) - { - // Still no space - exit to avoid infinite loop - break; - } - } - - decoder.Convert( - bytes[bytesConsumed..], - charBuffer.AsSpan(charsInBuffer), - flush && bytesConsumed == bytes.Length, - out var usedBytes, - out var charsProduced, - out _); - - bytesConsumed += usedBytes; - charsInBuffer += charsProduced; - } - - return (charsInBuffer, byteOffset); - } - - /// - /// Finds the next newline in the char buffer. - /// Handles \r, \n, and \r\n as newline delimiters. - /// - /// The char buffer to search - /// Start index for search - /// Number of chars available to search - /// If true, treats \r at end of buffer as newline - /// Tuple of (newline index, newline char count) - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - //Vectorized Search for \n - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - // Found \n - check if preceded by \r - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - //Vectorized search for \r - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - // Check if at end of buffer - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - // Check next char - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - /// - /// Creates a LineSegment from the char buffer, handling truncation. - /// - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - // Calculate byte length for position tracking - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - // Apply maximum line length constraint - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - // Rent buffer from pool (ensure at least size 1) - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - // Copy line content (excluding newline) - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - //Array.Copy(source, start, buffer, 0, logicalLength); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - private static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - if (!_lineQueue.TryTake(out var segment, 100, _cts?.Token ?? CancellationToken.None)) - { - lineMemory = default; - return false; - } - - // Store segment for lifetime management - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - throw new NotImplementedException(); - } - - /// - /// Represents a line segment with its position and metadata. - /// Uses ArrayPool for efficient char buffer management. - /// - private readonly struct LineSegment : IDisposable - { - /// - /// The rented char buffer from ArrayPool. May be larger than Length. - /// - public char[] Buffer { get; } - - /// - /// The actual length of the line content in the buffer. - /// - public int Length { get; } - - /// - /// The byte offset in the stream where this line starts. - /// - public long ByteOffset { get; } - - /// - /// The number of bytes consumed from the stream for this line (including newline). - /// - public int ByteLength { get; } - - /// - /// True if the line was truncated due to maximum line length constraint. - /// - public bool IsTruncated { get; } - - /// - /// True if this is an EOF marker segment. - /// - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - /// - /// Creates an EOF marker segment. - /// - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } -} diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs deleted file mode 100644 index e35b5951..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs +++ /dev/null @@ -1,702 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -/// -/// EXPERIMENTAL: TypedPipeline-based reader for benchmarking comparison. -/// Uses multi-threaded pipeline with BlockingCollection stages. -/// Expected to be 15-25% slower than PositionAwareStreamReaderPipeline due to pipeline overhead. -/// -public class PositionAwareStreamReaderPipelineNew : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly Stream _stream; - private readonly Encoding _encoding; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private LineSegment? _currentSegment; - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - private BlockingCollection _lineQueue; - private Exception _producerException; - private IPipeline _pipeline; - - public PositionAwareStreamReaderPipelineNew (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _maximumLineLength = maximumLineLength; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - _stream = stream; - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(DEFAULT_BYTE_BUFFER_SIZE), _maximumLineLength + 2); - - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public override string ReadLine () - { - if (TryReadLine(out var lineMemory)) - { - return new string(lineMemory.Span); - } - - return null; - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - var queue = _lineQueue; - var cts = _cts; - - if (queue == null || cts == null) - { - lineMemory = default; - return false; - } - - try - { - // With pre-filled queue, data should be available immediately - if (!queue.TryTake(out var segment, 50, cts.Token)) - { - lineMemory = default; - return false; - } - - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - catch (OperationCanceledException) - { - lineMemory = default; - return false; - } - catch (ObjectDisposedException) - { - lineMemory = default; - return false; - } - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - // No-op for this implementation - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // KEY FIX: Read and enqueue first line synchronously to guarantee immediate data availability - var firstLine = ReadFirstLineSynchronously(startPosition); - long nextByteOffset = startPosition; - - if (firstLine.HasValue) - { - _lineQueue.Add(firstLine.Value); - nextByteOffset = firstLine.Value.ByteOffset + firstLine.Value.ByteLength; - } - - _cts = new CancellationTokenSource(); - - // Build TypedPipeline - var builder = new TypedPipelineBuilder(); - _pipeline = builder.AddStep(ProcessBuffer).Build(); - - _pipeline.Finished += segment => - { - if (segment.Buffer != null || segment.IsEof) - { - EnqueueLine(segment); - } - }; - - _producerTask = Task.Run(() => ProduceAsync(nextByteOffset, _cts.Token), CancellationToken.None); - - _ = Interlocked.Exchange(ref _position, startPosition); - } - - /// - /// Reads the first line from the stream synchronously to pre-fill the queue. - /// This ensures data is immediately available when TryReadLine() is called after a seek. - /// - private LineSegment? ReadFirstLineSynchronously (long startPosition) - { - const int FIRST_LINE_BUFFER_SIZE = 4096; - - var byteBuffer = ArrayPool.Shared.Rent(FIRST_LINE_BUFFER_SIZE); - var charBuffer = ArrayPool.Shared.Rent(_charBufferSize); - - try - { - var bytesRead = _stream.Read(byteBuffer, 0, FIRST_LINE_BUFFER_SIZE); - - if (bytesRead == 0) - { - return LineSegment.CreateEof(startPosition); - } - - var decoder = _encoding.GetDecoder(); - var charsDecoded = decoder.GetChars(byteBuffer, 0, bytesRead, charBuffer, 0, flush: false); - - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, 0, charsDecoded, false); - - if (newlineIndex == -1) - { - return null; - } - - var segment = CreateSegment(charBuffer, 0, newlineIndex, newlineChars, startPosition); - - return segment; - } - catch (Exception ex) when (ex is IOException or - DecoderFallbackException or - ObjectDisposedException) - { - return null; - } - finally - { - ArrayPool.Shared.Return(byteBuffer); - ArrayPool.Shared.Return(charBuffer); - } - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected - } - finally - { - _pipeline?.Complete(); - _cts.Dispose(); - _cts = null; - } - - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (ObjectDisposedException) - { - // Ignore: shutdown race - } - catch (InvalidOperationException) - { - // Ignore: already completed or invalid teardown state - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Create buffer data and feed to pipeline - var bufferData = new BufferData(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - - // Process and extract lines - var processResult = ProcessBufferAndExtractLines(bufferData); - - // Update state - charsInBuffer = processResult.RemainingChars; - byteOffset = processResult.NewByteOffset; - - var pipeline = _pipeline ?? throw new InvalidOperationException("Pipeline is not initialized."); - - // Feed each line to pipeline (which will enqueue them) - foreach (var segment in processResult.Lines) - { - - pipeline.Execute(new BufferData(default, segment.Buffer, 0, null, segment.ByteOffset, false) - { - PreExtractedSegment = segment - }); - } - - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected - } - catch (DecoderFallbackException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch (ObjectDisposedException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch (InvalidOperationException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch - { - throw; - } - finally - { - _pipeline?.Complete(); - - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Ignore - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private LineSegment ProcessBuffer (BufferData bufferData) - { - // If pre-extracted, just return it (simulating pipeline overhead) - if (bufferData.PreExtractedSegment.HasValue) - { - return bufferData.PreExtractedSegment.Value; - } - - // This shouldn't happen in normal operation, but handle gracefully - // Return an empty line segment - return new LineSegment(null, 0, bufferData.ByteOffset, 0, false, false); - } - - private ProcessResult ProcessBufferAndExtractLines (BufferData bufferData) - { - var lines = new List(); - var localCharsInBuffer = bufferData.CharsInBuffer; - var localByteOffset = bufferData.ByteOffset; - - // Decode bytes to chars - if (bufferData.Buffer.IsSingleSegment) - { - var span = bufferData.Buffer.FirstSpan; - var charsAvailable = _charBufferSize - localCharsInBuffer; - - if (charsAvailable > 10) - { - bufferData.Decoder.Convert( - span, - bufferData.CharBuffer.AsSpan(localCharsInBuffer), - bufferData.IsCompleted, - out var usedBytes, - out var charsProduced, - out _); - - localCharsInBuffer += charsProduced; - localByteOffset += usedBytes; - } - } - - // Extract lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(bufferData.CharBuffer, searchIndex, localCharsInBuffer - searchIndex, false); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(bufferData.CharBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - lines.Add(segment); - localByteOffset += segment.ByteLength; - searchIndex = newlineIndex + newlineChars; - } - - // Calculate remaining chars - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - bufferData.CharBuffer.AsSpan(searchIndex, remaining).CopyTo(bufferData.CharBuffer.AsSpan(0, remaining)); - } - - return new ProcessResult(lines, remaining, localByteOffset); - } - - private void EnqueueLine (LineSegment segment) - { - try - { - // Don't use cancellation token here - let the queue complete naturally - _lineQueue.Add(segment); - } - catch (InvalidOperationException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - // Pipeline data structures - private class BufferData - { - public ReadOnlySequence Buffer { get; } - public char[] CharBuffer { get; } - public int CharsInBuffer { get; } - public Decoder Decoder { get; } - public long ByteOffset { get; } - public bool IsCompleted { get; } - public LineSegment? PreExtractedSegment { get; set; } - - public BufferData (ReadOnlySequence buffer, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool isCompleted) - { - Buffer = buffer; - CharBuffer = charBuffer; - CharsInBuffer = charsInBuffer; - Decoder = decoder; - ByteOffset = byteOffset; - IsCompleted = isCompleted; - } - } - - private record ProcessResult (List Lines, int RemainingChars, long NewByteOffset); - private record PipelineInput (ReadOnlySequence Buffer, char[] CharBuffer, int CharsInBuffer, Decoder Decoder, long ByteOffset, bool IsCompleted); - private record DecodeResult (char[] CharBuffer, int CharsInBuffer, long ByteOffset); - private record ExtractResult (List Lines); - - private readonly struct LineSegment : IDisposable - { - public char[] Buffer { get; } - public int Length { get; } - public long ByteOffset { get; } - public int ByteLength { get; } - public bool IsTruncated { get; } - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } - - private static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } -} diff --git a/src/LogExpert.Core/Interfaces/ILogStreamReader.cs b/src/LogExpert.Core/Interfaces/ILogStreamReader.cs index b5782d67..8c883798 100644 --- a/src/LogExpert.Core/Interfaces/ILogStreamReader.cs +++ b/src/LogExpert.Core/Interfaces/ILogStreamReader.cs @@ -15,9 +15,9 @@ namespace LogExpert.Core.Interfaces; /// /// Implementations include: /// -/// PositionAwareStreamReaderLegacy - Character-by-character reading for precise position control -/// PositionAwareStreamReaderSystem - Uses .NET's StreamReader.ReadLine() for improved performance -/// PositionAwareStreamReaderPipeline - Modern async pipeline-based implementation using System.IO.Pipelines +/// PositionAwareStreamReaderDirect - Default. Reads decoded chars into pooled blocks and returns zero-copy slices; fastest, assumes \n/\r\n line endings +/// PositionAwareStreamReaderSystem - Uses .NET's StreamReader.ReadLine(); also splits bare \r (classic-Mac) line endings +/// PositionAwareStreamReaderLegacy - Character-by-character reading; exact byte position on mixed/pathological line endings, slowest /// XmlLogReader - Decorator for reading structured XML log blocks (e.g., Log4j XML format) /// /// @@ -109,8 +109,8 @@ public interface ILogStreamReader : IDisposable /// the character. /// /// - /// Some implementations (like PositionAwareStreamReaderPipeline) may not support this method - /// and will throw as they are optimized for line-based reading only. + /// An implementation optimized purely for line-based reading may not support this method + /// and will throw . /// /// /// The reader has been disposed. diff --git a/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs b/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs index dcbd8a06..b2788ee3 100644 --- a/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs +++ b/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs @@ -100,152 +100,6 @@ public void CountLinesWithLegacyNewLine (string text, int expectedLines) Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); } - [Test] - [TestCase("Line 1\nLine 2\nLine 3", 3)] - [TestCase("Line 1\nLine 2\nLine 3\n", 3)] - [TestCase("Line 1\r\nLine 2\r\nLine 3", 3)] - [TestCase("Line 1\r\nLine 2\r\nLine 3\r\n", 3)] - [TestCase("Line 1\rLine 2\rLine 3", 3)] - [TestCase("Line 1\rLine 2\rLine 3\r", 3)] - public void ReadLinesWithPipelineNewLine (string text, int expectedLines) - { - using var stream = new MemoryStream(Encoding.ASCII.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - var lineCount = 0; - while (true) - { - var line = reader.ReadLine(); - if (line == null) - { - break; - } - - lineCount += 1; - - Assert.That(line.StartsWith($"Line {lineCount}", StringComparison.OrdinalIgnoreCase), $"Invalid line: {line}"); - } - - Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); - } - - [Test] - [TestCase("\n\n\n", 3)] - [TestCase("\r\n\r\n\r\n", 3)] - [TestCase("\r\r\r", 3)] - public void CountLinesWithPipelineNewLine (string text, int expectedLines) - { - using var stream = new MemoryStream(Encoding.ASCII.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - var lineCount = 0; - while (reader.ReadLine() != null) - { - lineCount += 1; - } - - Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); - } - - [Test] - public void PipelineReaderShouldTrackPositionCorrectly () - { - var text = "Line 1\nLine 2\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - var line1 = reader.ReadLine(); - var pos1 = reader.Position; - Assert.That(line1, Is.EqualTo("Line 1")); - Assert.That(pos1, Is.EqualTo(7)); // "Line 1\n" = 7 bytes - - var line2 = reader.ReadLine(); - var pos2 = reader.Position; - Assert.That(line2, Is.EqualTo("Line 2")); - Assert.That(pos2, Is.EqualTo(14)); // 7 + "Line 2\n" = 14 bytes - - var line3 = reader.ReadLine(); - var pos3 = reader.Position; - Assert.That(line3, Is.EqualTo("Line 3")); - Assert.That(pos3, Is.EqualTo(21)); // 14 + "Line 3\n" = 21 bytes - } - - [Test] - public void PipelineReaderShouldSupportSeeking () - { - var text = "Line 1\nLine 2\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - // Read first line - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Line 1")); - - // Seek back to beginning - reader.Position = 0; - - // Should read first line again - var line1Again = reader.ReadLine(); - Assert.That(line1Again, Is.EqualTo("Line 1")); - - // Seek to middle - reader.Position = 7; // After "Line 1\n" - - // Should read second line - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Line 2")); - } - - [Test] - public void PipelineReaderShouldHandleMaximumLineLength () - { - var longLine = new string('X', 1000); - var text = $"{longLine}\nShort line\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 100); - - // First line should be truncated to 100 chars - var line1 = reader.ReadLine(); - Assert.That(line1, Has.Length.EqualTo(100)); - Assert.That(line1, Is.EqualTo(new string('X', 100))); - - // Second line should be normal - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Short line")); - } - - [Test] - public void PipelineReaderShouldHandleUnicode () - { - var text = "Hello 世界\nСпасибо\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions { Encoding = Encoding.UTF8 }, 500); - - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Hello 世界")); - - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Спасибо")); - } - - [Test] - public void PipelineReaderShouldHandleEmptyLines () - { - var text = "Line 1\n\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Line 1")); - - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("")); - - var line3 = reader.ReadLine(); - Assert.That(line3, Is.EqualTo("Line 3")); - - var eof = reader.ReadLine(); - Assert.That(eof, Is.Null); - } - [Test] [TestCase("Line 1\nLine 2\nLine 3", 3)] [TestCase("Line 1\nLine 2\nLine 3\n", 3)]