Skip to content

Packet Writer Service#

The Packet Writer Service enables high-performance data publishing to Kafka topics with support for both individual packets and batch operations.

Overview#

The service provides multiple writing modes optimized for different use cases:

  • Single packet writing: For low-frequency, critical data
  • Batch operations: For high-throughput scenarios
  • Essential packet handling: For critical data that requires special routing
  • Stream-based routing: Automatic topic/partition selection

Key Features#

Writing Modes#

  • Individual packet writing with immediate publishing
  • Batch writing for optimal throughput
  • Streaming writes for continuous data flows
  • Essential packet routing for critical data

Performance Optimization#

  • Asynchronous operations for non-blocking writes
  • Internal buffering for batch efficiency
  • Connection pooling and reuse
  • Optional Prometheus metrics for monitoring (requires admin privileges)

Usage Examples#

Single Packet Writing#

var packetWriter = StreamingApiClient.GetPacketWriterClient();

// Write individual telemetry packet
await packetWriter.WriteDataPacketAsync(new WriteDataPacketRequest
{
    Detail = new DataPacketDetails
    {
        DataSource = "vehicle-telemetry",
        Stream = "engine",
        SessionKey = "race-session-2024-001",
        Message = new Packet
        {
            Content = ByteString.CopyFrom(engineData),
            Type = "PeriodicData",
            IsEssential = false
        }
    }
});

Batch Writing#

// High-throughput batch writing
var batchStream = packetWriter.WriteDataPackets();

try
{
    for (int i = 0; i < 1000; i++)
    {
        await batchStream.RequestStream.WriteAsync(new WriteDataPacketsRequest
        {
            Details = 
            {
                new DataPacketDetails
                {
                    DataSource = "high-frequency-sensors",
                    Stream = "accelerometers",
                    SessionKey = sessionKey,
                    Message = new Packet
                    {
                        Content = ByteString.CopyFrom(GenerateSensorData(i)),
                        Type = "PeriodicData"
                    }
                }
            }
        });
    }
}
finally
{
    await batchStream.RequestStream.CompleteAsync();
}

Essential Packet Writing#

// Write critical system events
await packetWriter.WriteDataPacketAsync(new WriteDataPacketRequest
{
    Detail = new DataPacketDetails
    {
        DataSource = "system-events",
        Stream = "", // Writes to the main stream
        SessionKey = sessionKey,
        Message = new Packet
        {
            Content = ByteString.CopyFrom(criticalEventData),
            Type = "Event",
            IsEssential = true // Ensures special routing and persistence
        }
    }
});

Advanced Usage#

Multi-Stream Writing#

// Write to multiple streams simultaneously
var tasks = new List<Task>();

// Engine data
tasks.Add(packetWriter.WriteDataPacketAsync(new WriteDataPacketRequest
{
    Detail = new DataPacketDetails
    {
        DataSource = "telemetry",
        Stream = "engine",
        SessionKey = sessionKey,
        Message = CreateEnginePacket(engineData)
    }
}));

// Brake data
tasks.Add(packetWriter.WriteDataPacketAsync(new WriteDataPacketRequest
{
    Detail = new DataPacketDetails
    {
        DataSource = "telemetry", 
        Stream = "brakes",
        SessionKey = sessionKey,
        Message = CreateBrakePacket(brakeData)
    }
}));

// Wait for all writes to complete
await Task.WhenAll(tasks);

Structured Data Writing#

// Write structured telemetry data
public async Task WriteVehicleState(VehicleState state)
{
    var packet = new Packet
    {
        Content = ByteString.CopyFrom(JsonSerializer.SerializeToUtf8Bytes(new
        {
            timestamp = state.Timestamp.Ticks,
            vehicle_id = state.VehicleId,
            position = new 
            {
                latitude = state.Position.Latitude,
                longitude = state.Position.Longitude,
                altitude = state.Position.Altitude
            },
            dynamics = new
            {
                speed = state.Speed,
                heading = state.Heading,
                acceleration = state.Acceleration
            }
        })),
        Type = "VehicleState",
        SessionKey = state.SessionKey
    };

    await packetWriter.WriteDataPacketAsync(new WriteDataPacketRequest
    {
        Detail = new DataPacketDetails
        {
            DataSource = "vehicle-tracking",
            Stream = "position",
            SessionKey = state.SessionKey,
            Message = packet
        }
    });
}

High-Performance Streaming#

// Continuous data streaming pattern
public class ContinuousDataStreamer
{
    private readonly PacketWriterService.PacketWriterServiceClient packetWriter;
    private AsyncClientStreamingCall<WriteDataPacketsRequest, WriteDataPacketsResponse>? streamWriter;

    public async Task StartStreaming()
    {
        streamWriter = packetWriter.WriteDataPackets();

        // Continuous streaming loop
        while (isStreaming)
        {
            var data = await GetNextDataPoint();

            await streamWriter.RequestStream.WriteAsync(new WriteDataPacketsRequest
            {
                Details = 
                {
                    new DataPacketDetails
                    {
                        DataSource = "continuous-stream",
                        Stream = "sensor-data",
                        SessionKey = sessionKey,
                        Message = new Packet
                        {
                            Content = ByteString.CopyFrom(data),
                            Type = "PeriodicData"
                        }
                    }
                }
            });
        }

        await streamWriter.RequestStream.CompleteAsync();
    }
}

Performance Optimization#

Batching Strategies#

// Time-based batching
public class TimeBatchedWriter
{
    private readonly List<WriteDataPacketsRequest> batch = new();
    private readonly Timer batchTimer;

    public TimeBatchedWriter()
    {
        batchTimer = new Timer(FlushBatch, null, TimeSpan.FromMilliseconds(100), 
            TimeSpan.FromMilliseconds(100));
    }

    public void QueueWrite(WriteDataPacketsRequest request)
    {
        lock (batch)
        {
            batch.Add(request);

            // Size-based flushing
            if (batch.Count >= 100)
            {
                FlushBatch(null);
            }
        }
    }

    private async void FlushBatch(object? state)
    {
        List<WriteDataPacketsRequest> toFlush;
        lock (batch)
        {
            if (batch.Count == 0) return;

            toFlush = new List<WriteDataPacketsRequest>(batch);
            batch.Clear();
        }

        var batchStream = packetWriter.WriteDataPackets();
        try
        {
            foreach (var request in toFlush)
            {
                await batchStream.RequestStream.WriteAsync(request);
            }
        }
        finally
        {
            await batchStream.RequestStream.CompleteAsync();
        }
    }
}

Connection Reuse#

// Reuse streaming connections for better performance
public class OptimizedPacketWriter
{
    private AsyncClientStreamingCall<WriteDataPacketsRequest, WriteDataPacketsResponse>? currentStream;
    private readonly SemaphoreSlim streamLock = new(1, 1);

    public async Task WritePacketOptimized(WriteDataPacketsRequest request)
    {
        await streamLock.WaitAsync();
        try
        {
            if (currentStream == null)
            {
                currentStream = packetWriter.WriteDataPackets();
            }

            await currentStream.RequestStream.WriteAsync(request);
        }
        finally
        {
            streamLock.Release();
        }
    }

    public async Task FlushAndClose()
    {
        await streamLock.WaitAsync();
        try
        {
            if (currentStream != null)
            {
                await currentStream.RequestStream.CompleteAsync();
                currentStream = null;
            }
        }
        finally
        {
            streamLock.Release();
        }
    }
}

Error Handling#

Retry Mechanisms#

// Robust writing with retry logic
public async Task WriteWithRetry(WriteDataPacketRequest request, int maxRetries = 3)
{
    for (int attempt = 0; attempt <= maxRetries; attempt++)
    {
        try
        {
            await packetWriter.WriteDataPacketAsync(request);
            return; // Success
        }
        catch (RpcException ex) when (attempt < maxRetries)
        {
            switch (ex.StatusCode)
            {
                case StatusCode.Unavailable:
                case StatusCode.DeadlineExceeded:
                    // Transient errors - retry
                    await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt)));
                    break;

                default:
                    // Non-retryable error
                    throw;
            }
        }
    }
}

Error Recovery#

// Handle streaming errors gracefully
public async Task WriteStreamWithRecovery(IEnumerable<WriteDataPacketsRequest> requests)
{
    var requestQueue = new Queue<WriteDataPacketsRequest>(requests);

    while (requestQueue.Count > 0)
    {
        var stream = packetWriter.WriteDataPackets();
        try
        {
            // Write batch
            while (requestQueue.Count > 0)
            {
                var request = requestQueue.Dequeue();
                await stream.RequestStream.WriteAsync(request);
            }

            await stream.RequestStream.CompleteAsync();
        }
        catch (RpcException ex)
        {
            // Log error and retry remaining requests
            Console.WriteLine($"Stream error: {ex.Message}, retrying {requestQueue.Count} requests");
            await Task.Delay(1000); // Wait before retry
        }
    }
}

Monitoring (Optional)#

Performance Metrics#

If Prometheus is enabled with admin privileges, the following metrics are available:

  • stream_api_data_packets_published_total: Number of data packets published (by data source, stream)
  • stream_api_info_packets_published_total: Number of info packets published (by data source)
  • stream_api_data_packets_routed_total: Number of data packets routed by router (by data source, stream)
  • stream_api_data_packets_routed_bytes_total: Total bytes of data packets routed (by data source, stream)
  • stream_api_info_packets_routed_total: Number of info packets routed by router (by data source)
  • stream_api_info_packets_routed_bytes_total: Total bytes of info packets routed (by data source)

Example usage:

// Metrics are automatically tracked by the service when Prometheus is enabled
public class MetricsCollector
{
    private readonly Counter packetsWritten = Metrics
        .CreateCounter("packets_written_total", "Total packets written");

    private readonly Histogram writeLatency = Metrics
        .CreateHistogram("packet_write_duration_seconds", "Packet write latency");

    public async Task WriteWithMetrics(WriteDataPacketRequest request)
    {
        using var timer = writeLatency.NewTimer();

        try
        {
            await packetWriter.WriteDataPacketAsync(request);
            packetsWritten.Inc();
        }
        catch
        {
            // Handle errors
            throw;
        }
    }
}

Best Practices#

Data Organization#

  • Use consistent packet types for similar data
  • Include timestamps in packet content
  • Validate data before writing
  • Use appropriate stream names for logical grouping

Performance#

  • Use batch writing for high-frequency data
  • Implement appropriate retry mechanisms
  • Monitor write latency and throughput
  • Consider data compression for large packets

Reliability#

  • Mark critical data as essential
  • Implement proper error handling
  • Use structured logging for debugging
  • Monitor system resources
  • Ensure to write all data before closing the session.

See Also#