diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs deleted file mode 100644 index 8edc6cf8..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderChannel.cs +++ /dev/null @@ -1,754 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; -using System.Threading.Channels; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -public class PositionAwareStreamReaderChannel : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly BufferedStream _stream; - private readonly Encoding _encoding; - private readonly int _byteBufferSize; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private Channel _lineChannel; - private ChannelReader _reader; - private ChannelWriter _writer; - - private LineSegment? _currentSegment; - - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - - // Line queue - using BlockingCollection for thread-safe, race-free synchronization - private BlockingCollection _lineQueue; - private Exception _producerException; - - public PositionAwareStreamReaderChannel (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _stream = new BufferedStream(stream); - - _maximumLineLength = maximumLineLength; - - _byteBufferSize = DEFAULT_BYTE_BUFFER_SIZE; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(_byteBufferSize), _maximumLineLength + 2); - - // Start the pipeline (will create the collection) - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - public override string ReadLine () - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - LineSegment segment; - try - { - //Channel read (more efficient than BlockingCollection) - if (!_reader.TryRead(out segment)) - { - // Use async path if not immediately available - var task = _reader.ReadAsync(_cts?.Token ?? CancellationToken.None); - segment = !task.IsCompleted - ? task.AsTask().GetAwaiter().GetResult() - : task.GetAwaiter().GetResult(); - } - } - catch (OperationCanceledException) - { - return null; - } - catch (ChannelClosedException) - { - return null; - } - - using (segment) - { - if (segment.IsEof) - { - return null; - } - - var line = new string(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return line; - } - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - // Clean up remaining items and dispose collection - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - // Seek stream to start position (accounting for preamble) - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - // Create PipeReader - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - - // CRITICAL: Create a NEW BlockingCollection instance - // Once CompleteAdding() is called, a BlockingCollection cannot be reused - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // Create cancellation token - _cts = new CancellationTokenSource(); - - _lineChannel = Channel.CreateBounded(new BoundedChannelOptions(128) - { - FullMode = BoundedChannelFullMode.Wait, - SingleReader = true, - SingleWriter = true - }); - - _reader = _lineChannel.Reader; - _writer = _lineChannel.Writer; - - // Start producer task - _producerTask = Task.Run(() => ProduceAsync(startPosition, _cts.Token), CancellationToken.None); - - // Update position - _ = Interlocked.Exchange(ref _position, startPosition); - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - /// - /// Cancels the current pipeline operation and releases associated resources. This method should be called while - /// holding the appropriate lock to ensure thread safety. - /// - /// This method cancels any ongoing producer task, marks the internal queue as complete to - /// unblock waiting consumers, and disposes of pipeline resources. It is intended for internal use and must be - /// invoked only when the pipeline is in a valid state for cancellation. - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore if already disposed - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected cancellation - } - finally - { - _cts.Dispose(); - _cts = null; - } - - _writer?.Complete(); - - // Mark collection as complete to unblock any waiting Take() calls - // This must happen AFTER the producer task is cancelled and finished - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - // Complete and dispose the PipeReader - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (Exception) - { - // Ignore errors during completion - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - // Allocate char buffer - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - // Read from pipe - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Process the buffer - decode and extract lines - var state = ProcessBuffer(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - charsInBuffer = state.charsInBuffer; - byteOffset = state.byteOffset; - - // Advance the reader - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - // Handle any remaining chars in buffer as final line - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - // Send EOF marker - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected when Position is changed or disposed - } - catch (Exception ex) - { - // Store exception to rethrow in ReadLine - Volatile.Write(ref _producerException, ex); - } - finally - { - // Always mark collection as complete when producer finishes - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Collection was already disposed - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private void EnqueueLine (LineSegment segment) - { - try - { - if (!_writer.TryWrite(segment)) - { - // Use async path if buffer is full - _writer.WriteAsync(segment, _cts.Token).AsTask().GetAwaiter().GetResult(); - } - } - catch (ChannelClosedException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private (int charsInBuffer, long byteOffset) ProcessBuffer ( - ReadOnlySequence buffer, - char[] charBuffer, - int charsInBuffer, - Decoder decoder, - long byteOffset, - bool isCompleted) - { - var localByteOffset = byteOffset; - var localCharsInBuffer = charsInBuffer; - - // Decode bytes to chars - if (buffer.IsSingleSegment) - { - // Fast path for single segment - var span = buffer.FirstSpan; - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(span, charBuffer, localCharsInBuffer, decoder, localByteOffset, isCompleted); - } - else - { - // Slow path for multi-segment - foreach (var segment in buffer) - { - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(segment.Span, charBuffer, localCharsInBuffer, decoder, localByteOffset, false); - } - - if (isCompleted) - { - // Flush decoder on completion - decoder.Convert([], 0, 0, charBuffer, localCharsInBuffer, - _charBufferSize - localCharsInBuffer, true, - out _, out var charsProduced, out _); - localCharsInBuffer += charsProduced; - } - } - - // Scan for complete lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, localCharsInBuffer - searchIndex, isCompleted); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - localByteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // Move remaining chars to beginning of buffer - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - return (remaining, localByteOffset); - } - - private (int charsInBuffer, long byteOffset) DecodeAndProcessSegment (ReadOnlySpan bytes, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool flush) - { - var bytesConsumed = 0; - - while (bytesConsumed < bytes.Length) - { - var charsAvailable = _charBufferSize - charsInBuffer; - - // CRITICAL FIX: Process lines when buffer is getting full - if (charsAvailable < 100) // Leave room for multi-byte sequences - { - // Process lines to free up space - var searchIndex = 0; - while (searchIndex < charsInBuffer) - { - var available = charsInBuffer - searchIndex; - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, available, false); - - if (newlineIndex == -1) - { - // No more complete lines found - var remaining = charsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - charsInBuffer = remaining; - break; - } - - // Found a line - create and enqueue it - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // If still no space, force process current content as truncated line - if (charsInBuffer >= _charBufferSize - 100 && charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - charsInBuffer = 0; - } - - charsAvailable = _charBufferSize - charsInBuffer; - - if (charsAvailable < 10) - { - // Still no space - exit to avoid infinite loop - break; - } - } - - decoder.Convert( - bytes[bytesConsumed..], - charBuffer.AsSpan(charsInBuffer), - flush && bytesConsumed == bytes.Length, - out var usedBytes, - out var charsProduced, - out _); - - bytesConsumed += usedBytes; - charsInBuffer += charsProduced; - } - - return (charsInBuffer, byteOffset); - } - - /// - /// Finds the next newline in the char buffer. - /// Handles \r, \n, and \r\n as newline delimiters. - /// - /// The char buffer to search - /// Start index for search - /// Number of chars available to search - /// If true, treats \r at end of buffer as newline - /// Tuple of (newline index, newline char count) - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - //Vectorized Search for \n - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - // Found \n - check if preceded by \r - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - //Vectorized search for \r - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - // Check if at end of buffer - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - // Check next char - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - /// - /// Creates a LineSegment from the char buffer, handling truncation. - /// - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - // Calculate byte length for position tracking - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - // Apply maximum line length constraint - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - // Rent buffer from pool (ensure at least size 1) - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - // Copy line content (excluding newline) - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - //Array.Copy(source, start, buffer, 0, logicalLength); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - if (!_lineQueue.TryTake(out var segment, 100, _cts?.Token ?? CancellationToken.None)) - { - lineMemory = default; - return false; - } - - // Store segment for lifetime management - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - throw new NotImplementedException(); - } - - /// - /// Represents a line segment with its position and metadata. - /// Uses ArrayPool for efficient char buffer management. - /// - private readonly struct LineSegment : IDisposable - { - /// - /// The rented char buffer from ArrayPool. May be larger than Length. - /// - public char[] Buffer { get; } - - /// - /// The actual length of the line content in the buffer. - /// - public int Length { get; } - - /// - /// The byte offset in the stream where this line starts. - /// - public long ByteOffset { get; } - - /// - /// The number of bytes consumed from the stream for this line (including newline). - /// - public int ByteLength { get; } - - /// - /// True if the line was truncated due to maximum line length constraint. - /// - public bool IsTruncated { get; } - - /// - /// True if this is an EOF marker segment. - /// - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - /// - /// Creates an EOF marker segment. - /// - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } -} diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs deleted file mode 100644 index b85fd82f..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipeline.cs +++ /dev/null @@ -1,730 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -public class PositionAwareStreamReaderPipeline : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly Stream _stream; - private readonly Encoding _encoding; - private readonly int _byteBufferSize; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private LineSegment? _currentSegment; - - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - - // Line queue - using BlockingCollection for thread-safe, race-free synchronization - private BlockingCollection _lineQueue; - private Exception _producerException; - - public PositionAwareStreamReaderPipeline (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _maximumLineLength = maximumLineLength; - _byteBufferSize = DEFAULT_BYTE_BUFFER_SIZE; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - _stream = stream; - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(_byteBufferSize), _maximumLineLength + 2); - - // Start the pipeline (will create the collection) - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public override string ReadLine () - { - if (TryReadLine(out var lineMemory)) - { - return new string(lineMemory.Span); // Only allocate when explicitly requested - } - - return null; - - //ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - //// Check for producer exception - //var producerEx = Volatile.Read(ref _producerException); - //if (producerEx != null) - //{ - // throw new InvalidOperationException("Producer task encountered an error.", producerEx); - //} - - //LineSegment segment; - //try - //{ - // // BlockingCollection.Take() blocks until an item is available or collection is completed - // // This eliminates the race condition present in the semaphore + queue approach - // segment = _lineQueue.Take(_cts?.Token ?? CancellationToken.None); - //} - //catch (OperationCanceledException) - //{ - // return null; - //} - //catch (InvalidOperationException) // Thrown when collection is marked as completed and empty - //{ - // return null; - //} - - //using (segment) - //{ - // if (segment.IsEof) - // { - // return null; - // } - - // var line = new string(segment.Buffer, 0, segment.Length); - // _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - // return line; - //} - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - // Clean up remaining items and dispose collection - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - // Seek stream to start position (accounting for preamble) - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - // Create PipeReader - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // Create cancellation token - _cts = new CancellationTokenSource(); - - // Start producer task - _producerTask = Task.Run(() => ProduceAsync(startPosition, _cts.Token), CancellationToken.None); - - // Update position - _ = Interlocked.Exchange(ref _position, startPosition); - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - /// - /// Cancels the current pipeline operation and releases associated resources. This method should be called while - /// holding the appropriate lock to ensure thread safety. - /// - /// This method cancels any ongoing producer task, marks the internal queue as complete to - /// unblock waiting consumers, and disposes of pipeline resources. It is intended for internal use and must be - /// invoked only when the pipeline is in a valid state for cancellation. - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore if already disposed - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected cancellation - } - finally - { - _cts.Dispose(); - _cts = null; - } - - // Mark collection as complete to unblock any waiting Take() calls - // This must happen AFTER the producer task is cancelled and finished - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - // Complete and dispose the PipeReader - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (Exception) - { - // Ignore errors during completion - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - // Allocate char buffer - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - // Read from pipe - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Process the buffer - decode and extract lines - var state = ProcessBuffer(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - charsInBuffer = state.charsInBuffer; - byteOffset = state.byteOffset; - - // Advance the reader - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - // Handle any remaining chars in buffer as final line - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - // Send EOF marker - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected when Position is changed or disposed - } - catch (Exception ex) - { - // Store exception to rethrow in ReadLine - Volatile.Write(ref _producerException, ex); - } - finally - { - // Always mark collection as complete when producer finishes - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Collection was already disposed - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private void EnqueueLine (LineSegment segment) - { - try - { - _lineQueue.Add(segment, _cts.Token); - } - catch (InvalidOperationException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private (int charsInBuffer, long byteOffset) ProcessBuffer ( - ReadOnlySequence buffer, - char[] charBuffer, - int charsInBuffer, - Decoder decoder, - long byteOffset, - bool isCompleted) - { - var localByteOffset = byteOffset; - var localCharsInBuffer = charsInBuffer; - - // Decode bytes to chars - if (buffer.IsSingleSegment) - { - // Fast path for single segment - var span = buffer.FirstSpan; - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(span, charBuffer, localCharsInBuffer, decoder, localByteOffset, isCompleted); - } - else - { - // Slow path for multi-segment - foreach (var segment in buffer) - { - (localCharsInBuffer, localByteOffset) = DecodeAndProcessSegment(segment.Span, charBuffer, localCharsInBuffer, decoder, localByteOffset, false); - } - - if (isCompleted) - { - // Flush decoder on completion - decoder.Convert([], 0, 0, charBuffer, localCharsInBuffer, - _charBufferSize - localCharsInBuffer, true, - out _, out var charsProduced, out _); - localCharsInBuffer += charsProduced; - } - } - - // Scan for complete lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, localCharsInBuffer - searchIndex, isCompleted); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - localByteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // Move remaining chars to beginning of buffer - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - return (remaining, localByteOffset); - } - - private (int charsInBuffer, long byteOffset) DecodeAndProcessSegment (ReadOnlySpan bytes, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool flush) - { - var bytesConsumed = 0; - - while (bytesConsumed < bytes.Length) - { - var charsAvailable = _charBufferSize - charsInBuffer; - - // CRITICAL FIX: Process lines when buffer is getting full - if (charsAvailable < 100) // Leave room for multi-byte sequences - { - // Process lines to free up space - var searchIndex = 0; - while (searchIndex < charsInBuffer) - { - var available = charsInBuffer - searchIndex; - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, searchIndex, available, false); - - if (newlineIndex == -1) - { - // No more complete lines found - var remaining = charsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - charBuffer.AsSpan(searchIndex, remaining).CopyTo(charBuffer.AsSpan(0, remaining)); - //Array.Copy(charBuffer, searchIndex, charBuffer, 0, remaining); - } - - charsInBuffer = remaining; - break; - } - - // Found a line - create and enqueue it - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(charBuffer, searchIndex, lineLength, newlineChars, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - searchIndex = newlineIndex + newlineChars; - } - - // If still no space, force process current content as truncated line - if (charsInBuffer >= _charBufferSize - 100 && charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - byteOffset += segment.ByteLength; - EnqueueLine(segment); - charsInBuffer = 0; - } - - charsAvailable = _charBufferSize - charsInBuffer; - - if (charsAvailable < 10) - { - // Still no space - exit to avoid infinite loop - break; - } - } - - decoder.Convert( - bytes[bytesConsumed..], - charBuffer.AsSpan(charsInBuffer), - flush && bytesConsumed == bytes.Length, - out var usedBytes, - out var charsProduced, - out _); - - bytesConsumed += usedBytes; - charsInBuffer += charsProduced; - } - - return (charsInBuffer, byteOffset); - } - - /// - /// Finds the next newline in the char buffer. - /// Handles \r, \n, and \r\n as newline delimiters. - /// - /// The char buffer to search - /// Start index for search - /// Number of chars available to search - /// If true, treats \r at end of buffer as newline - /// Tuple of (newline index, newline char count) - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - //Vectorized Search for \n - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - // Found \n - check if preceded by \r - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - //Vectorized search for \r - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - // Check if at end of buffer - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - // Check next char - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - /// - /// Creates a LineSegment from the char buffer, handling truncation. - /// - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - // Calculate byte length for position tracking - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - // Apply maximum line length constraint - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - // Rent buffer from pool (ensure at least size 1) - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - // Copy line content (excluding newline) - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - //Array.Copy(source, start, buffer, 0, logicalLength); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - private static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - if (!_lineQueue.TryTake(out var segment, 100, _cts?.Token ?? CancellationToken.None)) - { - lineMemory = default; - return false; - } - - // Store segment for lifetime management - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - throw new NotImplementedException(); - } - - /// - /// Represents a line segment with its position and metadata. - /// Uses ArrayPool for efficient char buffer management. - /// - private readonly struct LineSegment : IDisposable - { - /// - /// The rented char buffer from ArrayPool. May be larger than Length. - /// - public char[] Buffer { get; } - - /// - /// The actual length of the line content in the buffer. - /// - public int Length { get; } - - /// - /// The byte offset in the stream where this line starts. - /// - public long ByteOffset { get; } - - /// - /// The number of bytes consumed from the stream for this line (including newline). - /// - public int ByteLength { get; } - - /// - /// True if the line was truncated due to maximum line length constraint. - /// - public bool IsTruncated { get; } - - /// - /// True if this is an EOF marker segment. - /// - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - /// - /// Creates an EOF marker segment. - /// - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } -} diff --git a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs b/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs deleted file mode 100644 index e35b5951..00000000 --- a/src/LogExpert.Core/Classes/Log/Streamreaders/PositionAwareStreamReaderPipelineNew.cs +++ /dev/null @@ -1,702 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using System.IO.Pipelines; -using System.Text; - -using LogExpert.Core.Entities; -using LogExpert.Core.Interfaces; - -namespace LogExpert.Core.Classes.Log.Streamreaders; - -/// -/// EXPERIMENTAL: TypedPipeline-based reader for benchmarking comparison. -/// Uses multi-threaded pipeline with BlockingCollection stages. -/// Expected to be 15-25% slower than PositionAwareStreamReaderPipeline due to pipeline overhead. -/// -public class PositionAwareStreamReaderPipelineNew : LogStreamReaderBase, ILogStreamReaderMemory -{ - private const int DEFAULT_BYTE_BUFFER_SIZE = 64 * 1024; // 64 KB - private const int MINIMUM_READ_AHEAD_SIZE = 4 * 1024; // 4 KB - private const int DEFAULT_CHANNEL_CAPACITY = 128; // Number of line segments - - private static readonly Encoding[] _preambleEncodings = - [ - Encoding.UTF8, - Encoding.Unicode, - Encoding.BigEndianUnicode, - Encoding.UTF32 - ]; - - private readonly StreamPipeReaderOptions _streamPipeReaderOptions = new(bufferSize: DEFAULT_BYTE_BUFFER_SIZE, minimumReadSize: MINIMUM_READ_AHEAD_SIZE, leaveOpen: true); - private readonly int _maximumLineLength; - private readonly Lock _reconfigureLock = new(); - private readonly Stream _stream; - private readonly Encoding _encoding; - private readonly int _charBufferSize; - private readonly long _preambleLength; - - private LineSegment? _currentSegment; - private PipeReader _pipeReader; - private CancellationTokenSource _cts; - private Task _producerTask; - private bool _isDisposed; - private long _position; - private BlockingCollection _lineQueue; - private Exception _producerException; - private IPipeline _pipeline; - - public PositionAwareStreamReaderPipelineNew (Stream stream, EncodingOptions encodingOptions, int maximumLineLength) - { - ArgumentNullException.ThrowIfNull(stream); - - if (!stream.CanRead) - { - throw new ArgumentException("Stream must support reading.", nameof(stream)); - } - - if (!stream.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(stream)); - } - - if (maximumLineLength <= 0) - { - maximumLineLength = 1024; - } - - _maximumLineLength = maximumLineLength; - var (length, detectedEncoding) = DetectPreambleLength(stream); - _preambleLength = length; - _encoding = DetermineEncoding(encodingOptions, detectedEncoding); - - _stream = stream; - _charBufferSize = Math.Max(_encoding.GetMaxCharCount(DEFAULT_BYTE_BUFFER_SIZE), _maximumLineLength + 2); - - RestartPipelineInternal(0); - } - - public override long Position - { - get => Interlocked.Read(ref _position); - set - { - ArgumentOutOfRangeException.ThrowIfNegative(value); - RestartPipeline(value); - } - } - - public override bool IsBufferComplete => true; - - public override Encoding Encoding => _encoding; - - public override bool IsDisposed - { - get => _isDisposed; - protected set => _isDisposed = value; - } - - public override int ReadChar () - { - throw new NotSupportedException("PipelineLogStreamReader currently supports line-based reads only."); - } - - public override string ReadLine () - { - if (TryReadLine(out var lineMemory)) - { - return new string(lineMemory.Span); - } - - return null; - } - - public bool TryReadLine (out ReadOnlyMemory lineMemory) - { - ObjectDisposedException.ThrowIf(IsDisposed, GetType()); - - var producerEx = Volatile.Read(ref _producerException); - if (producerEx != null) - { - throw new InvalidOperationException("Producer task encountered an error.", producerEx); - } - - var queue = _lineQueue; - var cts = _cts; - - if (queue == null || cts == null) - { - lineMemory = default; - return false; - } - - try - { - // With pre-filled queue, data should be available immediately - if (!queue.TryTake(out var segment, 50, cts.Token)) - { - lineMemory = default; - return false; - } - - _currentSegment?.Dispose(); - _currentSegment = segment; - - if (segment.IsEof) - { - lineMemory = default; - return false; - } - - lineMemory = new ReadOnlyMemory(segment.Buffer, 0, segment.Length); - _ = Interlocked.Exchange(ref _position, segment.ByteOffset + segment.ByteLength); - return true; - } - catch (OperationCanceledException) - { - lineMemory = default; - return false; - } - catch (ObjectDisposedException) - { - lineMemory = default; - return false; - } - } - - public void ReturnMemory (ReadOnlyMemory memory) - { - // No-op for this implementation - } - - protected override void Dispose (bool disposing) - { - if (_isDisposed) - { - return; - } - - if (disposing) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - - if (_lineQueue != null) - { - while (_lineQueue.TryTake(out var segment)) - { - segment.Dispose(); - } - - _lineQueue.Dispose(); - } - - _stream?.Dispose(); - } - } - - _isDisposed = true; - } - - private void RestartPipelineInternal (long startPosition) - { - _ = _stream.Seek(_preambleLength + startPosition, SeekOrigin.Begin); - - _pipeReader = PipeReader.Create(_stream, _streamPipeReaderOptions); - _lineQueue = new BlockingCollection(new ConcurrentQueue(), DEFAULT_CHANNEL_CAPACITY); - - Volatile.Write(ref _producerException, null); - - // KEY FIX: Read and enqueue first line synchronously to guarantee immediate data availability - var firstLine = ReadFirstLineSynchronously(startPosition); - long nextByteOffset = startPosition; - - if (firstLine.HasValue) - { - _lineQueue.Add(firstLine.Value); - nextByteOffset = firstLine.Value.ByteOffset + firstLine.Value.ByteLength; - } - - _cts = new CancellationTokenSource(); - - // Build TypedPipeline - var builder = new TypedPipelineBuilder(); - _pipeline = builder.AddStep(ProcessBuffer).Build(); - - _pipeline.Finished += segment => - { - if (segment.Buffer != null || segment.IsEof) - { - EnqueueLine(segment); - } - }; - - _producerTask = Task.Run(() => ProduceAsync(nextByteOffset, _cts.Token), CancellationToken.None); - - _ = Interlocked.Exchange(ref _position, startPosition); - } - - /// - /// Reads the first line from the stream synchronously to pre-fill the queue. - /// This ensures data is immediately available when TryReadLine() is called after a seek. - /// - private LineSegment? ReadFirstLineSynchronously (long startPosition) - { - const int FIRST_LINE_BUFFER_SIZE = 4096; - - var byteBuffer = ArrayPool.Shared.Rent(FIRST_LINE_BUFFER_SIZE); - var charBuffer = ArrayPool.Shared.Rent(_charBufferSize); - - try - { - var bytesRead = _stream.Read(byteBuffer, 0, FIRST_LINE_BUFFER_SIZE); - - if (bytesRead == 0) - { - return LineSegment.CreateEof(startPosition); - } - - var decoder = _encoding.GetDecoder(); - var charsDecoded = decoder.GetChars(byteBuffer, 0, bytesRead, charBuffer, 0, flush: false); - - var (newlineIndex, newlineChars) = FindNewlineIndex(charBuffer, 0, charsDecoded, false); - - if (newlineIndex == -1) - { - return null; - } - - var segment = CreateSegment(charBuffer, 0, newlineIndex, newlineChars, startPosition); - - return segment; - } - catch (Exception ex) when (ex is IOException or - DecoderFallbackException or - ObjectDisposedException) - { - return null; - } - finally - { - ArrayPool.Shared.Return(byteBuffer); - ArrayPool.Shared.Return(charBuffer); - } - } - - private void RestartPipeline (long newPosition) - { - using (_reconfigureLock.EnterScope()) - { - CancelPipelineLocked(); - RestartPipelineInternal(newPosition); - } - } - - private void CancelPipelineLocked () - { - if (_cts == null) - { - return; - } - - try - { - _cts.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore - } - - try - { - _producerTask?.Wait(); - } - catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is OperationCanceledException)) - { - // Expected - } - finally - { - _pipeline?.Complete(); - _cts.Dispose(); - _cts = null; - } - - if (_lineQueue != null && !_lineQueue.IsAddingCompleted) - { - _lineQueue.CompleteAdding(); - } - - if (_pipeReader != null) - { - try - { - _pipeReader.Complete(); - } - catch (ObjectDisposedException) - { - // Ignore: shutdown race - } - catch (InvalidOperationException) - { - // Ignore: already completed or invalid teardown state - } - } - } - - private async Task ProduceAsync (long startByteOffset, CancellationToken token) - { - var charPool = ArrayPool.Shared; - char[] charBuffer = null; - Decoder decoder = null; - - try - { - charBuffer = charPool.Rent(_charBufferSize); - decoder = _encoding.GetDecoder(); - - var charsInBuffer = 0; - var byteOffset = startByteOffset; - - while (!token.IsCancellationRequested) - { - ReadResult result = await _pipeReader.ReadAsync(token).ConfigureAwait(false); - ReadOnlySequence buffer = result.Buffer; - - if (buffer.Length > 0) - { - // Create buffer data and feed to pipeline - var bufferData = new BufferData(buffer, charBuffer, charsInBuffer, decoder, byteOffset, result.IsCompleted); - - // Process and extract lines - var processResult = ProcessBufferAndExtractLines(bufferData); - - // Update state - charsInBuffer = processResult.RemainingChars; - byteOffset = processResult.NewByteOffset; - - var pipeline = _pipeline ?? throw new InvalidOperationException("Pipeline is not initialized."); - - // Feed each line to pipeline (which will enqueue them) - foreach (var segment in processResult.Lines) - { - - pipeline.Execute(new BufferData(default, segment.Buffer, 0, null, segment.ByteOffset, false) - { - PreExtractedSegment = segment - }); - } - - _pipeReader.AdvanceTo(buffer.End); - } - - if (result.IsCompleted) - { - if (charsInBuffer > 0) - { - var segment = CreateSegment(charBuffer, 0, charsInBuffer, 0, byteOffset); - EnqueueLine(segment); - byteOffset += segment.ByteLength; - } - - EnqueueLine(LineSegment.CreateEof(byteOffset)); - break; - } - } - } - catch (OperationCanceledException) - { - // Expected - } - catch (DecoderFallbackException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch (ObjectDisposedException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch (InvalidOperationException ex) - { - Volatile.Write(ref _producerException, ex); - } - catch - { - throw; - } - finally - { - _pipeline?.Complete(); - - try - { - _lineQueue?.CompleteAdding(); - } - catch (ObjectDisposedException) - { - // Ignore - } - - if (charBuffer != null) - { - charPool.Return(charBuffer); - } - } - } - - private LineSegment ProcessBuffer (BufferData bufferData) - { - // If pre-extracted, just return it (simulating pipeline overhead) - if (bufferData.PreExtractedSegment.HasValue) - { - return bufferData.PreExtractedSegment.Value; - } - - // This shouldn't happen in normal operation, but handle gracefully - // Return an empty line segment - return new LineSegment(null, 0, bufferData.ByteOffset, 0, false, false); - } - - private ProcessResult ProcessBufferAndExtractLines (BufferData bufferData) - { - var lines = new List(); - var localCharsInBuffer = bufferData.CharsInBuffer; - var localByteOffset = bufferData.ByteOffset; - - // Decode bytes to chars - if (bufferData.Buffer.IsSingleSegment) - { - var span = bufferData.Buffer.FirstSpan; - var charsAvailable = _charBufferSize - localCharsInBuffer; - - if (charsAvailable > 10) - { - bufferData.Decoder.Convert( - span, - bufferData.CharBuffer.AsSpan(localCharsInBuffer), - bufferData.IsCompleted, - out var usedBytes, - out var charsProduced, - out _); - - localCharsInBuffer += charsProduced; - localByteOffset += usedBytes; - } - } - - // Extract lines - var searchIndex = 0; - while (true) - { - var (newlineIndex, newlineChars) = FindNewlineIndex(bufferData.CharBuffer, searchIndex, localCharsInBuffer - searchIndex, false); - - if (newlineIndex == -1) - { - break; - } - - var lineLength = newlineIndex - searchIndex; - var segment = CreateSegment(bufferData.CharBuffer, searchIndex, lineLength, newlineChars, localByteOffset); - lines.Add(segment); - localByteOffset += segment.ByteLength; - searchIndex = newlineIndex + newlineChars; - } - - // Calculate remaining chars - var remaining = localCharsInBuffer - searchIndex; - if (remaining > 0 && searchIndex > 0) - { - bufferData.CharBuffer.AsSpan(searchIndex, remaining).CopyTo(bufferData.CharBuffer.AsSpan(0, remaining)); - } - - return new ProcessResult(lines, remaining, localByteOffset); - } - - private void EnqueueLine (LineSegment segment) - { - try - { - // Don't use cancellation token here - let the queue complete naturally - _lineQueue.Add(segment); - } - catch (InvalidOperationException) - { - // Collection was marked as complete, dispose the segment - segment.Dispose(); - } - } - - private static (int newLineIndex, int newLineChars) FindNewlineIndex ( - char[] buffer, - int start, - int available, - bool allowStandaloneCr) - { - var span = buffer.AsSpan(start, available); - - var lfIndex = span.IndexOf('\n'); - if (lfIndex != -1) - { - if (lfIndex > 0 && span[lfIndex - 1] == '\r') - { - return (newLineIndex: start + lfIndex - 1, newLineChars: 2); - } - - return (newLineIndex: start + lfIndex, newLineChars: 1); - } - - var crIndex = span.IndexOf('\r'); - if (crIndex != -1) - { - if (crIndex + 1 >= span.Length) - { - if (allowStandaloneCr) - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - - return (newLineIndex: -1, newLineChars: 0); - } - - if (span[crIndex + 1] != '\n') - { - return (newLineIndex: start + crIndex, newLineChars: 1); - } - } - - return (newLineIndex: -1, newLineChars: 0); - } - - private LineSegment CreateSegment ( - char[] source, - int start, - int lineLength, - int newlineChars, - long byteOffset) - { - var consumedChars = lineLength + newlineChars; - - var byteLength = consumedChars == 0 - ? 0 - : _encoding.GetByteCount(source, start, consumedChars); - - var logicalLength = Math.Min(lineLength, _maximumLineLength); - var truncated = lineLength > logicalLength; - - var rentalLength = Math.Max(logicalLength, 1); - var buffer = ArrayPool.Shared.Rent(rentalLength); - - if (logicalLength > 0) - { - source.AsSpan(start, logicalLength).CopyTo(buffer.AsSpan(0, logicalLength)); - } - - return new LineSegment(buffer, logicalLength, byteOffset, byteLength, truncated, false); - } - - // Pipeline data structures - private class BufferData - { - public ReadOnlySequence Buffer { get; } - public char[] CharBuffer { get; } - public int CharsInBuffer { get; } - public Decoder Decoder { get; } - public long ByteOffset { get; } - public bool IsCompleted { get; } - public LineSegment? PreExtractedSegment { get; set; } - - public BufferData (ReadOnlySequence buffer, char[] charBuffer, int charsInBuffer, Decoder decoder, long byteOffset, bool isCompleted) - { - Buffer = buffer; - CharBuffer = charBuffer; - CharsInBuffer = charsInBuffer; - Decoder = decoder; - ByteOffset = byteOffset; - IsCompleted = isCompleted; - } - } - - private record ProcessResult (List Lines, int RemainingChars, long NewByteOffset); - private record PipelineInput (ReadOnlySequence Buffer, char[] CharBuffer, int CharsInBuffer, Decoder Decoder, long ByteOffset, bool IsCompleted); - private record DecodeResult (char[] CharBuffer, int CharsInBuffer, long ByteOffset); - private record ExtractResult (List Lines); - - private readonly struct LineSegment : IDisposable - { - public char[] Buffer { get; } - public int Length { get; } - public long ByteOffset { get; } - public int ByteLength { get; } - public bool IsTruncated { get; } - public bool IsEof { get; } - - public LineSegment (char[] buffer, int length, long byteOffset, int byteLength, bool isTruncated, bool isEof) - { - Buffer = buffer; - Length = length; - ByteOffset = byteOffset; - ByteLength = byteLength; - IsTruncated = isTruncated; - IsEof = isEof; - } - - public void Dispose () - { - if (Buffer != null) - { - ArrayPool.Shared.Return(Buffer); - } - } - - public static LineSegment CreateEof (long byteOffset) - { - return new LineSegment(null, 0, byteOffset, 0, false, true); - } - } - - private static Encoding DetermineEncoding (EncodingOptions options, Encoding detectedEncoding) - { - return options?.Encoding != null - ? options.Encoding - : detectedEncoding ?? options?.DefaultEncoding ?? Encoding.Default; - } - - private static (int length, Encoding? detectedEncoding) DetectPreambleLength (Stream stream) - { - if (!stream.CanSeek) - { - return (0, null); - } - - var originalPos = stream.Position; - var buffer = new byte[4]; - _ = stream.Seek(0, SeekOrigin.Begin); - var readBytes = stream.Read(buffer, 0, buffer.Length); - _ = stream.Seek(originalPos, SeekOrigin.Begin); - - if (readBytes >= 2) - { - foreach (var encoding in _preambleEncodings) - { - var preamble = encoding.GetPreamble(); - var fail = false; - for (var i = 0; i < readBytes && i < preamble.Length; ++i) - { - if (buffer[i] != preamble[i]) - { - fail = true; - break; - } - } - - if (!fail) - { - return (preamble.Length, encoding); - } - } - } - - return (0, null); - } -} diff --git a/src/LogExpert.Core/Interfaces/ILogStreamReader.cs b/src/LogExpert.Core/Interfaces/ILogStreamReader.cs index b5782d67..8c883798 100644 --- a/src/LogExpert.Core/Interfaces/ILogStreamReader.cs +++ b/src/LogExpert.Core/Interfaces/ILogStreamReader.cs @@ -15,9 +15,9 @@ namespace LogExpert.Core.Interfaces; /// /// Implementations include: /// -/// PositionAwareStreamReaderLegacy - Character-by-character reading for precise position control -/// PositionAwareStreamReaderSystem - Uses .NET's StreamReader.ReadLine() for improved performance -/// PositionAwareStreamReaderPipeline - Modern async pipeline-based implementation using System.IO.Pipelines +/// PositionAwareStreamReaderDirect - Default. Reads decoded chars into pooled blocks and returns zero-copy slices; fastest, assumes \n/\r\n line endings +/// PositionAwareStreamReaderSystem - Uses .NET's StreamReader.ReadLine(); also splits bare \r (classic-Mac) line endings +/// PositionAwareStreamReaderLegacy - Character-by-character reading; exact byte position on mixed/pathological line endings, slowest /// XmlLogReader - Decorator for reading structured XML log blocks (e.g., Log4j XML format) /// /// @@ -109,8 +109,8 @@ public interface ILogStreamReader : IDisposable /// the character. /// /// - /// Some implementations (like PositionAwareStreamReaderPipeline) may not support this method - /// and will throw as they are optimized for line-based reading only. + /// An implementation optimized purely for line-based reading may not support this method + /// and will throw . /// /// /// The reader has been disposed. diff --git a/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs b/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs index dcbd8a06..b2788ee3 100644 --- a/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs +++ b/src/LogExpert.Tests/StreamReaderTests/LogStreamReaderTest.cs @@ -100,152 +100,6 @@ public void CountLinesWithLegacyNewLine (string text, int expectedLines) Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); } - [Test] - [TestCase("Line 1\nLine 2\nLine 3", 3)] - [TestCase("Line 1\nLine 2\nLine 3\n", 3)] - [TestCase("Line 1\r\nLine 2\r\nLine 3", 3)] - [TestCase("Line 1\r\nLine 2\r\nLine 3\r\n", 3)] - [TestCase("Line 1\rLine 2\rLine 3", 3)] - [TestCase("Line 1\rLine 2\rLine 3\r", 3)] - public void ReadLinesWithPipelineNewLine (string text, int expectedLines) - { - using var stream = new MemoryStream(Encoding.ASCII.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - var lineCount = 0; - while (true) - { - var line = reader.ReadLine(); - if (line == null) - { - break; - } - - lineCount += 1; - - Assert.That(line.StartsWith($"Line {lineCount}", StringComparison.OrdinalIgnoreCase), $"Invalid line: {line}"); - } - - Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); - } - - [Test] - [TestCase("\n\n\n", 3)] - [TestCase("\r\n\r\n\r\n", 3)] - [TestCase("\r\r\r", 3)] - public void CountLinesWithPipelineNewLine (string text, int expectedLines) - { - using var stream = new MemoryStream(Encoding.ASCII.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - var lineCount = 0; - while (reader.ReadLine() != null) - { - lineCount += 1; - } - - Assert.That(expectedLines, Is.EqualTo(lineCount), $"Unexpected lines:\n{text}"); - } - - [Test] - public void PipelineReaderShouldTrackPositionCorrectly () - { - var text = "Line 1\nLine 2\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - var line1 = reader.ReadLine(); - var pos1 = reader.Position; - Assert.That(line1, Is.EqualTo("Line 1")); - Assert.That(pos1, Is.EqualTo(7)); // "Line 1\n" = 7 bytes - - var line2 = reader.ReadLine(); - var pos2 = reader.Position; - Assert.That(line2, Is.EqualTo("Line 2")); - Assert.That(pos2, Is.EqualTo(14)); // 7 + "Line 2\n" = 14 bytes - - var line3 = reader.ReadLine(); - var pos3 = reader.Position; - Assert.That(line3, Is.EqualTo("Line 3")); - Assert.That(pos3, Is.EqualTo(21)); // 14 + "Line 3\n" = 21 bytes - } - - [Test] - public void PipelineReaderShouldSupportSeeking () - { - var text = "Line 1\nLine 2\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - // Read first line - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Line 1")); - - // Seek back to beginning - reader.Position = 0; - - // Should read first line again - var line1Again = reader.ReadLine(); - Assert.That(line1Again, Is.EqualTo("Line 1")); - - // Seek to middle - reader.Position = 7; // After "Line 1\n" - - // Should read second line - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Line 2")); - } - - [Test] - public void PipelineReaderShouldHandleMaximumLineLength () - { - var longLine = new string('X', 1000); - var text = $"{longLine}\nShort line\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 100); - - // First line should be truncated to 100 chars - var line1 = reader.ReadLine(); - Assert.That(line1, Has.Length.EqualTo(100)); - Assert.That(line1, Is.EqualTo(new string('X', 100))); - - // Second line should be normal - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Short line")); - } - - [Test] - public void PipelineReaderShouldHandleUnicode () - { - var text = "Hello 世界\nСпасибо\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions { Encoding = Encoding.UTF8 }, 500); - - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Hello 世界")); - - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("Спасибо")); - } - - [Test] - public void PipelineReaderShouldHandleEmptyLines () - { - var text = "Line 1\n\nLine 3\n"; - using var stream = new MemoryStream(Encoding.UTF8.GetBytes(text)); - using var reader = new PositionAwareStreamReaderPipeline(stream, new EncodingOptions(), 500); - - var line1 = reader.ReadLine(); - Assert.That(line1, Is.EqualTo("Line 1")); - - var line2 = reader.ReadLine(); - Assert.That(line2, Is.EqualTo("")); - - var line3 = reader.ReadLine(); - Assert.That(line3, Is.EqualTo("Line 3")); - - var eof = reader.ReadLine(); - Assert.That(eof, Is.Null); - } - [Test] [TestCase("Line 1\nLine 2\nLine 3", 3)] [TestCase("Line 1\nLine 2\nLine 3\n", 3)]