// File: changelog_monitor.rs

// SPDX-License-Identifier: MIT

// Copyright (c) 2025 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use chrono::{DateTime, Utc};
use clap::Parser;
use rustreapi::{
    LustrePath, Record,
    changelog::{
        ChangelogBuilder, ChangelogExtraFlag, ChangelogFlag, ChangelogReader, RecordConverter,
        RecordConverterBuilder, TimeFormat,
    },
};
use serde::{Deserialize, Serialize};
use std::{thread, time::Duration};

/// File-local result alias: the error side is any boxed `std::error::Error`,
/// so string messages and library errors can both be returned with `?`/`.into()`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Monitor Lustre changelog records from multiple MDTs and output as JSON
// NOTE: the `///` comments in this struct are turned into `--help` text by
// clap's derive macro, so they must describe what each flag actually does.
// Maintainer-only remarks use plain `//` comments to keep them out of the help.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// Lustre filesystem name
    #[arg(short, long)]
    fsname: String,

    /// List of MDT indices to monitor (e.g., 0,1,2). If not specified, defaults to MDT0 only.
    #[arg(short, long, value_delimiter = ',')]
    mdts: Option<Vec<u32>>,

    /// Starting record index (default: 0 for latest)
    #[arg(short, long, default_value = "0")]
    start_record: i64,

    /// Enable follow mode to continue monitoring for new records
    #[arg(short = 'F', long, default_value = "false")]
    follow: bool,

    /// Enable blocking mode when no records are available
    #[arg(short, long, default_value = "false")]
    block: bool,

    /// Disable job ID information in records (enabled by default)
    #[arg(long, default_value = "false")]
    no_jobid: bool,

    /// Disable UID/GID information in records (enabled by default)
    #[arg(long, default_value = "false")]
    no_uidgid: bool,

    // The NID extra flag is requested unless this switch is given, so the help
    // text must say "disable" (it previously claimed the switch *included* NIDs).
    /// Disable client NID information in records (enabled by default)
    #[arg(short, long, default_value = "false")]
    no_nid: bool,

    /// Include open mode information in records
    #[arg(short, long, default_value = "false")]
    openmode: bool,

    /// Include extended attribute information in records
    #[arg(short, long, default_value = "false")]
    xattr: bool,

    /// Maximum number of records to process before exiting (0 = unlimited)
    #[arg(short = 'c', long, default_value = "0")]
    count: u64,

    /// Verbose output for debugging
    #[arg(short, long, default_value = "false")]
    verbose: bool,

    /// Time format for timestamps: `"unix"` (default), `"local"`, `"utc"`
    #[arg(long, default_value = "unix", value_parser = ["unix", "local", "utc"])]
    time_format: String,

    // NOTE(review): this flag is parsed but not read by any code in this file;
    // the Lustre mount lookup in `MdtReader::new` runs regardless of its value.
    /// Resolve parent paths for FIDs (requires Lustre mount access)
    #[arg(short, long, default_value = "false")]
    resolve_parent: bool,
}

/// A changelog record with its source MDT information
///
/// One value is built in `main` for every record received and is printed as a
/// single JSON line via `serde_json::to_string`.
#[derive(Debug, Serialize, Deserialize)]
struct ChangelogOutput {
    /// The MDT device name this record came from (copied from `MdtReader::device`)
    mdt: String,
    /// The actual changelog record, as produced by the reader's converter
    record: Record,
    /// Timestamp when this record was processed (`Utc::now()` at output time,
    /// not a time taken from the record itself)
    timestamp: DateTime<Utc>,
}

/// Reader state for a single MDT
///
/// Created by `MdtReader::new`; `main` keeps one instance per monitored MDT.
struct MdtReader {
    /// Device name, formatted as `<fsname>-MDT<index as 4 hex digits>`
    device: String,
    /// Connected changelog reader for this device
    reader: ChangelogReader,
    /// Converter handed to `recv_with_converter` when reading records
    converter: RecordConverter,
}

impl MdtReader {
    /// Opens the changelog of a single MDT and prepares the converter used for its records.
    ///
    /// The device name is `<fsname>-MDT<index>`, with `index` formatted as four
    /// zero-padded lowercase hex digits. Changelog flags, extra flags, the
    /// starting record and the timestamp format are all taken from `args`.
    ///
    /// A Lustre mount for `fsname` is looked up and, when it can be opened, the
    /// resulting handle is given to the converter builder. Failing to find or
    /// open the mount is not fatal; it is only reported in verbose mode.
    ///
    /// # Errors
    ///
    /// Returns an error if connecting to the changelog fails, or if
    /// `args.time_format` is not one of `unix`, `local` or `utc`.
    fn new(fsname: &str, mdt_index: u32, args: &Args) -> Result<Self> {
        let verbose = args.verbose;
        let device = format!("{}-MDT{:04x}", fsname, mdt_index);

        if verbose {
            eprintln!("Connecting to MDT device: {}", device);
        }

        // Base flag is always present; the rest are switched on from the CLI.
        // Job IDs are opt-out, follow/block are opt-in.
        let mut changelog_flags = ChangelogFlag::ExtraFlags;
        for (enabled, flag) in [
            (!args.no_jobid, ChangelogFlag::JobId),
            (args.follow, ChangelogFlag::Follow),
            (args.block, ChangelogFlag::Block),
        ] {
            if enabled {
                changelog_flags |= flag;
            }
        }

        // Extra flags: UID/GID and NID are opt-out, open mode and xattr are opt-in.
        let mut extras = ChangelogExtraFlag::none();
        for (enabled, flag) in [
            (!args.no_uidgid, ChangelogExtraFlag::UidGid),
            (!args.no_nid, ChangelogExtraFlag::Nid),
            (args.openmode, ChangelogExtraFlag::OpenMode),
            (args.xattr, ChangelogExtraFlag::Xattr),
        ] {
            if enabled {
                extras |= flag;
            }
        }

        let changelog_builder = ChangelogBuilder::new()
            .device(&device)
            .flags(changelog_flags)
            .start_record(args.start_record);

        // Only hand extra flags to the builder when at least one bit is set.
        let changelog_builder = if extras.bits() != 0 {
            changelog_builder.extra_flags(extras)
        } else {
            changelog_builder
        };

        let reader = match changelog_builder.connect() {
            Ok(connected) => connected,
            Err(e) => return Err(format!("Failed to connect to {}: {}", device, e).into()),
        };

        // Map the CLI string onto the converter's time format. clap already
        // restricts the accepted values, so the fallback arm is a safety net.
        let time_format = match args.time_format.as_str() {
            "unix" => TimeFormat::Unix,
            "utc" => TimeFormat::Iso8601Utc,
            "local" => TimeFormat::Iso8601Local,
            _ => {
                return Err(format!(
                    "Invalid time format '{}'. Valid options are: unix, local, utc",
                    args.time_format
                )
                .into());
            }
        };
        let mut conv_builder = RecordConverterBuilder::new().time_format(time_format);

        // Best effort: try to locate and open the Lustre mount for this
        // filesystem so the converter can use it. Any failure here is tolerated.
        match LustrePath::find_mount_by_fsname(fsname) {
            Err(e) => {
                if verbose {
                    eprintln!("Could not find Lustre mount for {}: {}", fsname, e);
                }
            }
            Ok(mount) => {
                if verbose {
                    eprintln!("Found Lustre mount at: {}", mount);
                }
                match mount.open() {
                    Err(e) => {
                        if verbose {
                            eprintln!("Could not open Lustre mount: {}", e);
                        }
                    }
                    Ok(fd) => {
                        conv_builder = conv_builder.lustre_fd(fd);
                    }
                }
            }
        }

        let converter = conv_builder.build();

        if verbose {
            eprintln!("Successfully connected to {}", device);
        }

        Ok(Self {
            device,
            reader,
            converter,
        })
    }
}

/// Entry point: connects to every requested MDT and streams changelog records
/// to stdout as one JSON object per line.
///
/// Flow:
/// 1. Parse the command line; with no `--mdts`, MDT index 0 is used.
/// 2. Connect to each MDT in order. The first connection failure is reported
///    on stderr and aborts the program with that error.
/// 3. Poll the readers round-robin, taking at most one record per reader per
///    pass, until the record limit is reached, stdout is closed by the
///    consumer, or (outside follow mode) a full pass yields no record.
///
/// Read errors from an MDT are only logged in verbose mode and do not stop the
/// loop. In verbose mode a summary line is printed on stderr before returning.
///
/// # Errors
///
/// Returns the connection error of the first MDT that cannot be opened, or an
/// I/O error if writing a record to stdout fails for a reason other than a
/// closed pipe.
fn main() -> Result<()> {
    // `writeln!` needs the `Write` trait in scope; it is only used here.
    use std::io::Write as _;

    let args = Args::parse();

    // Default to MDT0 if no MDTs specified. Borrow the parsed list instead of
    // cloning it; `args` stays alive for the whole function.
    let mdts: &[u32] = args.mdts.as_deref().unwrap_or(&[0]);

    if args.verbose {
        eprintln!("Starting changelog monitor for filesystem: {}", args.fsname);
        eprintln!("Monitoring MDTs: {:?}", mdts);
    }

    // For this example, we'll monitor MDTs sequentially
    // In a production implementation, you'd want parallel monitoring
    let mut readers = Vec::with_capacity(mdts.len());

    for &mdt_index in mdts {
        match MdtReader::new(&args.fsname, mdt_index, &args) {
            Ok(reader) => readers.push(reader),
            Err(e) => {
                // Report the failure, then abort: a failed MDT is fatal even
                // when several MDTs were requested.
                if args.verbose {
                    eprintln!("Warning: Failed to connect to MDT{:04x}: {}", mdt_index, e);
                } else if mdts.len() == 1 {
                    eprintln!(
                        "Failed to connect to {}-MDT{:04x}: {}",
                        args.fsname, mdt_index, e
                    );
                } else {
                    eprintln!(
                        "Warning: {}-MDT{:04x} has no changelog consumer",
                        args.fsname, mdt_index
                    );
                }
                // Propagate the error - don't continue with other MDTs
                return Err(e);
            }
        }
    }

    if readers.is_empty() {
        return Err("No MDTs could be connected".into());
    }

    if args.verbose {
        eprintln!("Successfully connected to {} MDTs", readers.len());
    }

    // Lock stdout once and write through the handle. Unlike `println!`, a
    // failed write is returned as an error instead of panicking, which lets a
    // closed pipe (e.g. `... | head`) end the program cleanly.
    let stdout = std::io::stdout();
    let mut out = stdout.lock();

    // Monitor records from all readers
    let mut total_records = 0u64;
    // Monotonic clock: the elapsed time is unaffected by wall-clock changes.
    let started = std::time::Instant::now();

    'monitor: loop {
        let mut any_activity = false;

        for mdt_reader in &readers {
            // Try to get a record from this MDT
            match mdt_reader.reader.recv_with_converter(&mdt_reader.converter) {
                Ok(Some(record)) => {
                    any_activity = true;
                    total_records += 1;

                    let output = ChangelogOutput {
                        mdt: mdt_reader.device.clone(),
                        record,
                        timestamp: Utc::now(),
                    };

                    // Output as JSON
                    match serde_json::to_string(&output) {
                        Ok(json) => {
                            if let Err(e) = writeln!(out, "{}", json) {
                                if e.kind() == std::io::ErrorKind::BrokenPipe {
                                    // The consumer closed its end; nothing more
                                    // can be delivered, so stop quietly.
                                    if args.verbose {
                                        eprintln!("Output closed by consumer, exiting");
                                    }
                                    break 'monitor;
                                }
                                return Err(e.into());
                            }
                        }
                        Err(e) => {
                            eprintln!("Failed to serialize record to JSON: {}", e);
                        }
                    }

                    // Check if we've hit the record limit. Break (rather than
                    // return) so the verbose summary below is still printed.
                    if args.count > 0 && total_records >= args.count {
                        if args.verbose {
                            eprintln!("Reached record limit of {}", args.count);
                        }
                        break 'monitor;
                    }
                }
                Ok(None) => {
                    // No records available from this MDT
                }
                Err(e) => {
                    if args.verbose {
                        eprintln!("Error reading from {}: {}", mdt_reader.device, e);
                    }
                    // For non-verbose mode, we'll skip logging repeated errors
                    // since they're likely due to the MDT not having a changelog consumer
                }
            }
        }

        // If we're not following and no activity, exit
        if !args.follow && !any_activity {
            if args.verbose {
                eprintln!("No more records available, exiting");
            }
            break;
        }

        // If no activity and we're following, sleep briefly to avoid busy waiting
        if !any_activity {
            thread::sleep(Duration::from_millis(100));
        }
    }

    if args.verbose {
        eprintln!(
            "Processed {} records in {} seconds",
            total_records,
            started.elapsed().as_secs()
        );
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Program name placed in front of every simulated command line.
    const BIN: &str = "changelog_monitor";

    /// Parses `rest` as the arguments that follow the program name.
    fn parse(rest: &[&str]) -> Args {
        Args::parse_from(std::iter::once(BIN).chain(rest.iter().copied()))
    }

    /// A filesystem name plus a comma-separated MDT list; everything else defaulted.
    #[test]
    fn test_args_parsing() {
        let parsed = parse(&["--fsname", "lustre", "--mdts", "0,1,2"]);

        assert_eq!(parsed.fsname, "lustre");
        assert_eq!(parsed.mdts, Some(vec![0, 1, 2]));
        assert_eq!(parsed.start_record, 0);
        assert!(!parsed.follow);
    }

    /// Boolean switches, the record limit and the time format are all picked up.
    #[test]
    fn test_args_with_flags() {
        let parsed = parse(&[
            "--fsname",
            "test",
            "--mdts",
            "0",
            "--follow",
            "--block",
            "--no-jobid",
            "--no-uidgid",
            "--count",
            "100",
            "--time-format",
            "local",
        ]);

        assert_eq!(parsed.fsname, "test");
        assert_eq!(parsed.mdts, Some(vec![0]));
        assert!(parsed.follow);
        assert!(parsed.block);
        assert!(parsed.no_jobid);
        assert!(parsed.no_uidgid);
        assert_eq!(parsed.count, 100);
        assert_eq!(parsed.time_format, "local");
    }

    /// Without `--mdts` the field stays `None`; `main` later maps that to MDT 0.
    #[test]
    fn test_args_default_mdt() {
        let parsed = parse(&["--fsname", "lustre"]);

        assert_eq!(parsed.fsname, "lustre");
        assert_eq!(parsed.mdts, None);
        assert_eq!(parsed.start_record, 0);
        assert!(!parsed.follow);
        // The time format falls back to "unix" when not given.
        assert_eq!(parsed.time_format, "unix");
    }

    /// Each accepted `--time-format` value is stored verbatim.
    #[test]
    fn test_time_format_options() {
        for format in ["unix", "local", "utc"] {
            let parsed = parse(&["--fsname", "lustre", "--time-format", format]);
            assert_eq!(parsed.time_format, format);
        }
    }
}