Viewing: reader.rs
// SPDX-License-Identifier: MIT
// Copyright (c) 2025 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
use std::{io::Error as IoError, ptr, time::Duration};
use super::{
convert::{ChangelogRecord, ConvertRecord},
error::{ChangelogError, Result as ChangelogResult},
flags::ChangelogFlag,
record::Record,
};
use crate::{
error::{Result, cvt_rc_m},
hsm::RawDescriptor,
};
use changelog_sys::*;
use nix::{
errno::Errno,
fcntl::{FcntlArg, OFlag, fcntl},
};
/// Safe wrapper around `llapi_changelog_recv` that handles error conversion.
///
/// # Returns
/// - `Ok(Some(ptr))` if a record was received successfully
/// - `Ok(None)` if no more records are available (EOF)
/// - `Err(error)` if an error occurred during receive
fn safe_changelog_recv(
priv_ptr: *mut std::os::raw::c_void,
device: &str,
) -> Result<Option<*mut changelog_rec>> {
let mut rec_ptr: *mut changelog_rec = ptr::null_mut();
let rc = unsafe { llapi_changelog_recv(priv_ptr, &mut rec_ptr) };
// Save errno immediately after the call, before any other operations
let errno = Errno::last();
match rc {
0 => {
if rec_ptr.is_null() {
Ok(None)
} else {
Ok(Some(rec_ptr))
}
}
1 => {
// EOF - no more records available
Ok(None)
}
rc if rc == -(Errno::EAGAIN as i32) || rc == -(Errno::EWOULDBLOCK as i32) => {
// EAGAIN or EWOULDBLOCK - No data available right now in non-blocking mode
Ok(None)
}
_ => {
// Check errno as well (sometimes errno is set instead of rc being -errno)
if errno == Errno::EAGAIN || errno == Errno::EWOULDBLOCK {
// No data available right now, but not an error
Ok(None)
} else {
// Real error occurred - use cvt_rc_m to convert the error code
cvt_rc_m(
rc,
format!(
"Failed to receive changelog record from device {} (rc: {}, errno: {})",
device, rc, errno
),
)?;
unreachable!()
}
}
}
}
/// A reader for consuming Lustre changelog records.
///
/// This reader manages an active connection to a Lustre changelog and provides
/// methods for receiving records both in blocking and non-blocking modes.
/// It automatically cleans up the connection when dropped.
///
/// # Examples
/// ```rust,no_run
/// use rustreapi::changelog::{ChangelogBuilder, ChangelogFlag};
///
/// let reader = ChangelogBuilder::new()
/// .device("lustre-MDT0000")
/// .flags(ChangelogFlag::Follow)
/// .connect()?;
///
/// // Blocking receive
/// if let Some(record) = reader.recv()? {
/// println!("Received record: {:?}", record);
/// }
///
/// // Non-blocking poll
/// if let Some(record) = reader.poll()? {
/// println!("Polled record: {:?}", record);
/// }
///
/// // Iterator interface
/// for record in reader {
/// let record = record?;
/// if record.record_type == rustreapi::RecordType::Create {
/// println!("File created: {:?}", record.filename);
/// }
/// }
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug)]
pub struct ChangelogReader {
priv_ptr: *mut std::os::raw::c_void,
device: String,
flags: ChangelogFlag,
}
impl ChangelogReader {
/// Create a new changelog reader.
///
/// # Safety
/// The caller must ensure that `priv_ptr` is a valid pointer returned
/// by `llapi_changelog_start` and has not been freed.
pub(super) fn new(
priv_ptr: *mut std::os::raw::c_void,
device: String,
flags: ChangelogFlag,
) -> Self {
Self {
priv_ptr,
device,
flags,
}
}
/// Receive a changelog record, blocking until one is available.
///
/// This method will block indefinitely if the `Block` flag was set during
/// connection. If `Block` was not set, it will return `None` immediately
/// if no records are available.
///
/// # Returns
/// - `Ok(Some(record))` if a record was received
/// - `Ok(None)` if no records are available and blocking is disabled
/// - `Err(error)` if an error occurred
pub fn recv(&self) -> ChangelogResult<Option<Record>> {
match safe_changelog_recv(self.priv_ptr, &self.device) {
Ok(Some(rec_ptr)) => {
// We have a record, convert it
let changelog_record = unsafe { ChangelogRecord::from_ptr(rec_ptr) }
.ok_or(ChangelogError::InvalidRecordPointer)?;
let record = changelog_record.to_record()?;
Ok(Some(record))
}
Ok(None) => Ok(None),
Err(e) => Err(ChangelogError::ReceiveFailed {
device: self.device.clone(),
source: Box::new(e),
}),
}
}
/// Receive a changelog record with a converter for context-aware processing.
///
/// This method works like `recv()` but allows passing a `RecordConverter`
/// that can contain context such as Lustre file descriptors for resolving
/// parent FIDs to directory paths.
///
/// # Arguments
/// * `converter` - `RecordConverter` with optional context for FID resolution
///
/// # Returns
/// - `Ok(Some(record))` if a record was received
/// - `Ok(None)` if no records are available and blocking is disabled
/// - `Err(error)` if an error occurred
pub fn recv_with_converter<C: ConvertRecord>(
&self,
converter: &C,
) -> ChangelogResult<Option<Record>> {
match safe_changelog_recv(self.priv_ptr, &self.device) {
Ok(Some(rec_ptr)) => {
// We have a record, convert it
let changelog_record = unsafe { ChangelogRecord::from_ptr(rec_ptr) }
.ok_or(ChangelogError::InvalidRecordPointer)?;
let record = converter.convert_record(&changelog_record)?;
Ok(Some(record))
}
Ok(None) => Ok(None),
Err(e) => Err(ChangelogError::ReceiveFailed {
device: self.device.clone(),
source: Box::new(e),
}),
}
}
/// Poll for a changelog record without blocking.
///
/// This method will return immediately regardless of the `Block` flag setting.
/// It's useful for implementing non-blocking event loops.
///
/// # Returns
/// - `Ok(Some(record))` if a record was received
/// - `Ok(None)` if no records are currently available
/// - `Err(error)` if an error occurred
pub fn poll(&self) -> ChangelogResult<Option<Record>> {
// For now, this is the same as recv() since the Lustre API doesn't
// provide a separate non-blocking interface. The blocking behavior
// is controlled by the flags set during connection.
// TODO: Consider using a timeout mechanism or background thread
// for true non-blocking behavior
self.recv()
}
/// Poll for a changelog record with a timeout.
///
/// This method will wait up to the specified duration for a record to become
/// available before returning `None`.
///
/// # Arguments
/// * `timeout` - Maximum time to wait for a record
///
/// # Returns
/// - `Ok(Some(record))` if a record was received within the timeout
/// - `Ok(None)` if no records were available within the timeout
/// - `Err(error)` if an error occurred
pub fn poll_timeout(&self, _timeout: Duration) -> ChangelogResult<Option<Record>> {
// TODO: Implement timeout-based polling using threads or async
// For now, fall back to regular polling
self.poll()
}
/// Get the device name this reader is connected to.
pub fn device(&self) -> &str {
&self.device
}
/// Get the flags used to configure this reader.
pub fn flags(&self) -> ChangelogFlag {
self.flags
}
/// Check if this reader is configured to follow new records.
pub fn is_following(&self) -> bool {
self.flags.contains(ChangelogFlag::Follow)
}
/// Check if this reader is configured to block when no records are available.
pub fn is_blocking(&self) -> bool {
self.flags.contains(ChangelogFlag::Block)
}
/// Get the file descriptor for this changelog reader as a `RawDescriptor`.
///
/// This returns a `RawDescriptor` that is compatible with `tokio::AsyncFd` for
/// implementing non-blocking, interruptible reads.
///
/// # Returns
/// - `Ok(RawDescriptor)` if the file descriptor was retrieved successfully
/// - `Err(error)` if an error occurred
pub fn get_fd(&self) -> ChangelogResult<RawDescriptor> {
let fd = unsafe { llapi_changelog_get_fd(self.priv_ptr) };
if fd < 0 {
return Err(ChangelogError::ConnectionFailed {
device: self.device.clone(),
source: IoError::other(format!("Failed to get changelog file descriptor: {}", fd)),
});
}
// Set the file descriptor to non-blocking mode using nix
// Get current flags
let flags = fcntl(fd, FcntlArg::F_GETFL).map_err(|e| ChangelogError::ConnectionFailed {
device: self.device.clone(),
source: IoError::from(e),
})?;
// Set non-blocking flag
let mut new_flags = OFlag::from_bits_truncate(flags);
new_flags |= OFlag::O_NONBLOCK;
fcntl(fd, FcntlArg::F_SETFL(new_flags)).map_err(|e| ChangelogError::ConnectionFailed {
device: self.device.clone(),
source: IoError::from(e),
})?;
Ok(RawDescriptor::new(fd))
}
}
impl Drop for ChangelogReader {
fn drop(&mut self) {
if !self.priv_ptr.is_null() {
unsafe {
let mut ptr = self.priv_ptr;
let _ = llapi_changelog_fini(&mut ptr);
self.priv_ptr = ptr::null_mut();
}
}
}
}
impl Iterator for ChangelogReader {
type Item = ChangelogResult<Record>;
/// Iterate over changelog records.
///
/// This iterator will continue indefinitely if the `Follow` flag was set,
/// yielding new records as they become available. Without `Follow`, it will
/// stop when no more records are available.
///
/// # Note
/// The iterator will block if the `Block` flag was set during connection.
/// For non-blocking iteration, use the `poll()` method directly.
fn next(&mut self) -> Option<Self::Item> {
match self.recv() {
Ok(Some(record)) => Some(Ok(record)),
Ok(None) => {
if self.is_following() {
// In follow mode, we should continue trying
// TODO: Add a small delay to avoid busy-waiting
self.next()
} else {
// No more records and not following
None
}
}
Err(e) => Some(Err(e)),
}
}
}