Packet Reader Service#

The Packet Reader Service provides high-performance data consumption from Kafka topics with support for real-time streaming and historical data replay.

Overview#

The service offers flexible data consumption modes:

  • All packets: Continuous consumption of all packet types via ReadPackets
  • Essential packets: Consumption of configuration and metadata packets only, via ReadEssentials
  • Filtered data packets: Consume specific parameters using regex patterns via ReadDataPackets

Key Features#

Reading Modes#

  • ReadPackets: Continuous streaming of all packet types
  • ReadEssentials: Essential packets only (config, metadata)
  • ReadDataPackets: Filtered data packets using regex patterns

Performance#

  • Low-latency data delivery
  • Efficient memory usage
  • Connection pooling
  • Backpressure handling

Usage Examples#

Basic Stream Reading#

// `connection` is assumed to have been created earlier; ProcessPacket is application code
var packetReader = StreamingApiClient.GetPacketReaderClient();

// Start reading all packets from a connection
var readRequest = new ReadPacketsRequest
{
    Connection = connection.Connection
};

var stream = packetReader.ReadPackets(readRequest);

while (await stream.ResponseStream.MoveNext())
{
    var response = stream.ResponseStream.Current;
    foreach (var packet in response.Response)
    {
        Console.WriteLine($"Received: {packet.Packet.Type} from {packet.Stream}");
        ProcessPacket(packet.Packet);
    }
}
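
The read loop above ends at the first exception thrown by the stream. A minimal sketch of a reconnect wrapper with exponential backoff follows. `ReadLoopRetry` and `RunWithRetryAsync` are hypothetical names, not part of the service's API, and the delegate is assumed to open a fresh stream and run the loop shown above. If the client is gRPC-based, as the `ResponseStream` pattern suggests, the broad `catch (Exception)` would likely be narrowed to `RpcException`. Whether packets are redelivered or skipped after a reconnect depends on the service and is not covered here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReadLoopRetry
{
    // Runs readLoop until it completes normally. After a failure it waits,
    // doubles the delay, and tries again, up to maxAttempts attempts in total.
    // Returns the number of the attempt that succeeded.
    public static async Task<int> RunWithRetryAsync(
        Func<CancellationToken, Task> readLoop,   // hypothetical: opens the stream and reads it
        int maxAttempts,
        TimeSpan initialDelay,
        CancellationToken cancellationToken = default)
    {
        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await readLoop(cancellationToken);
                return attempt; // the stream ended without an error
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                throw; // the caller asked to stop, so do not retry
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Back off before reconnecting; the last failed attempt
                // is not caught here and propagates to the caller.
                await Task.Delay(delay, cancellationToken);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

A caller would pass a delegate that creates the `ReadPacketsRequest`, calls `ReadPackets`, and iterates the response stream, so that every retry starts from a new stream rather than reusing a failed one.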

Reading Essential Packets#

// Read only essential packets (configuration, metadata, etc.)
var essentialsRequest = new ReadEssentialsRequest
{
    Connection = connection.Connection
};

var stream = packetReader.ReadEssentials(essentialsRequest);

while (await stream.ResponseStream.MoveNext())
{
    var response = stream.ResponseStream.Current;
    foreach (var packet in response.Response)
    {
        Console.WriteLine($"Essential packet: {packet.Packet.Type}");
        ProcessEssentialPacket(packet.Packet);
    }
}

Filtered Data Packet Reading#

// Read data packets with specific parameters (supports regex patterns)
var dataPacketRequest = new ReadDataPacketsRequest
{
    Request = new DataPacketRequest
    {
        Connection = connection.Connection,
        IncludeParameters = { "vCar.*", ".*Temp.*", "NGear.*" },
        ExcludeParameters = { ".*Debug.*" }
    }
};

var stream = packetReader.ReadDataPackets(dataPacketRequest);

while (await stream.ResponseStream.MoveNext())
{
    var response = stream.ResponseStream.Current;
    foreach (var packet in response.Response)
    {
        // Only data packets matching the filter
        ProcessDataPacket(packet.Packet);
    }
}

Advanced Features#

Parameter Filtering with Regex#

// Include all engine-related parameters, exclude calibration data
var dataRequest = new ReadDataPacketsRequest
{
    Request = new DataPacketRequest
    {
        Connection = connection.Connection,
        IncludeParameters = 
        { 
            "NEngine.*",      // All engine parameters
            "TEngine.*",      // Engine temperature
            "PEngine.*"       // Engine pressure
        },
        ExcludeParameters = 
        { 
            ".*Calibration.*",  // Exclude calibration
            ".*Raw.*"           // Exclude raw data
        }
    }
};

var stream = packetReader.ReadDataPackets(dataRequest);
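
Include and exclude patterns can be checked locally against known parameter names before a request is sent. The sketch below is one way to do that, under two assumptions the source does not confirm: patterns must match the whole parameter name, and an exclude match overrides an include match. `ParameterFilter` is a hypothetical helper, and the parameter names in the usage example are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public sealed class ParameterFilter
{
    private readonly List<Regex> _include;
    private readonly List<Regex> _exclude;

    public ParameterFilter(IEnumerable<string> include, IEnumerable<string> exclude)
    {
        _include = include.Select(Anchor).ToList();
        _exclude = exclude.Select(Anchor).ToList();
    }

    // Assumption: a pattern has to match the entire parameter name,
    // so each one is wrapped in ^(?:...)$ before compiling.
    private static Regex Anchor(string pattern) =>
        new Regex("^(?:" + pattern + ")$", RegexOptions.CultureInvariant);

    // Assumption: a name passes when at least one include pattern matches
    // and no exclude pattern matches (exclusion takes precedence).
    public bool IsMatch(string parameter) =>
        _include.Any(r => r.IsMatch(parameter)) &&
        !_exclude.Any(r => r.IsMatch(parameter));
}
```

With the patterns from the request above, the hypothetical name `TEngineOil` would pass, `NEngineRaw` would be rejected by `.*Raw.*`, and `vCar` would be rejected because no include pattern matches it:

```csharp
var filter = new ParameterFilter(
    new[] { "NEngine.*", "TEngine.*", "PEngine.*" },
    new[] { ".*Calibration.*", ".*Raw.*" });

Console.WriteLine(filter.IsMatch("TEngineOil"));
Console.WriteLine(filter.IsMatch("NEngineRaw"));
Console.WriteLine(filter.IsMatch("vCar"));
```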

Combined Reading Strategies#

// Read essentials and filtered data packets concurrently
var essentialsTask = Task.Run(async () =>
{
    var stream = packetReader.ReadEssentials(new ReadEssentialsRequest
    {
        Connection = connection.Connection
    });

    while (await stream.ResponseStream.MoveNext())
    {
        var response = stream.ResponseStream.Current;
        foreach (var packet in response.Response)
        {
            ProcessEssentialPacket(packet.Packet);
        }
    }
});

var dataTask = Task.Run(async () =>
{
    var stream = packetReader.ReadDataPackets(new ReadDataPacketsRequest
    {
        Request = new DataPacketRequest
        {
            Connection = connection.Connection,
            IncludeParameters = { "vCar.*", "NGear.*" }
        }
    });

    while (await stream.ResponseStream.MoveNext())
    {
        var response = stream.ResponseStream.Current;
        foreach (var packet in response.Response)
        {
            ProcessDataPacket(packet.Packet);
        }
    }
});

await Task.WhenAll(essentialsTask, dataTask);
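
`Task.WhenAll` completes only when every task has finished, so if one reader fails while the other keeps streaming, the failure is not observed until the second stream also ends. A minimal sketch of one way around this is to share a linked `CancellationTokenSource` so that a failure in any reader cancels the others. `ConcurrentReaders` and `RunAllAsync` are hypothetical names. The sketch also assumes each read loop honours the token it is given, for example by passing it to `MoveNext`; whether `ReadEssentials` and `ReadDataPackets` themselves accept a token is not shown in the source.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentReaders
{
    // Runs every reader concurrently. When one reader throws, the shared
    // token is cancelled so the remaining readers can stop, and the
    // original failure is rethrown once all of them have finished.
    public static async Task RunAllAsync(
        CancellationToken cancellationToken,
        params Func<CancellationToken, Task>[] readers)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var tasks = new Task[readers.Length];

        for (var i = 0; i < readers.Length; i++)
        {
            var reader = readers[i];
            tasks[i] = Task.Run(async () =>
            {
                try
                {
                    await reader(cts.Token);
                }
                catch
                {
                    cts.Cancel(); // signal the sibling readers to stop
                    throw;
                }
            });
        }

        // Readers stopped by the cancellation end in the Canceled state,
        // so a genuine failure is what the await below rethrows.
        await Task.WhenAll(tasks);
    }
}
```

Each delegate would wrap one of the read loops above, with the token forwarded to the stream reads so that a cancelled reader exits promptly.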

Best Practices#

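  • Handle backpressure: if packet processing is slower than the incoming stream, bound any queue between the read loop and the processing code so memory use stays limited
  • Implement error recovery: catch stream failures in the read loop and decide explicitly whether to reconnect or stop
  • Monitor consumer lag so that a reader falling behind the Kafka topic is detected early
  • Size buffers according to the expected packet rate and the per-packet processing time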
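
As an illustration of the backpressure and buffer-size points, the sketch below places a bounded `System.Threading.Channels` channel between the read loop and the processing code. `BoundedPipeline` is a hypothetical helper, not part of the service. It takes any `IAsyncEnumerable<T>` as its source; with a gRPC-based client, the `ReadAllAsync()` extension on the response stream would be one way to obtain such a sequence, though that is an assumption about the client type. When the channel is full, the producer waits, so the read loop stops pulling new items until the consumer catches up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Reads items from source into a channel holding at most `capacity`
    // items and hands them to `process` one at a time.
    public static async Task RunAsync<T>(
        IAsyncEnumerable<T> source,
        Func<T, Task> process,
        int capacity,
        CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // wait rather than drop items
            SingleReader = true,
            SingleWriter = true
        });

        var producer = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(cts.Token))
                {
                    // Suspends here while the channel is full.
                    await channel.Writer.WriteAsync(item, cts.Token);
                }
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                channel.Writer.Complete(ex); // surface the failure to the consumer
            }
        });

        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
            {
                await process(item);
            }
        }
        finally
        {
            cts.Cancel();   // release the producer if processing failed
            await producer; // the producer never throws; it completes the channel instead
        }
    }
}
```

The capacity is the buffer size to tune: a larger value absorbs short bursts at the cost of memory, while a smaller value applies backpressure sooner.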

See Also#