nomad-temporal-jobs

nodecleanup-activities

import "munchbox/temporal-workers/nodecleanup/activities"

func HumanBytes

func HumanBytes(n int64) string

HumanBytes renders a byte count in a compact human-friendly form (KiB, MiB, GiB) matching the shape of `du -h`. Exported so the workflow can format the before/after/reclaimed sizes consistently.
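The exact output format of HumanBytes isn't specified beyond "KiB, MiB, GiB". The following is a minimal sketch that assumes binary (1024) units and one decimal place. humanBytesSketch is a hypothetical name, not the package's function, and it shows how before, after, and reclaimed sizes could be rendered.

```go
package main

import "fmt"

// humanBytesSketch is a hypothetical stand-in for HumanBytes: binary (1024)
// units with one decimal place. The real function's precision and suffix
// spelling may differ.
func humanBytesSketch(n int64) string {
	const unit = 1024
	if n < unit {
		// Anything below 1 KiB (including negative input) prints as raw bytes.
		return fmt.Sprintf("%dB", n)
	}
	units := []string{"KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
	v := float64(n) / unit
	i := 0
	// Keep dividing until the value drops below 1024 or we run out of suffixes.
	for v >= unit && i < len(units)-1 {
		v /= unit
		i++
	}
	return fmt.Sprintf("%.1f%s", v, units[i])
}

func main() {
	before := int64(5 << 30)        // 5 GiB
	after := int64(3<<30 + 512<<20) // 3.5 GiB
	fmt.Println(humanBytesSketch(before), humanBytesSketch(after), humanBytesSketch(before-after))
	// prints: 5.0GiB 3.5GiB 1.5GiB
}
```

Keeping the arithmetic (before minus after) in int64 and formatting only at the end avoids rounding drift between the three reported strings.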

type Activities

Activities holds shared dependencies for node cleanup activities. Register an instance with the Temporal worker to expose all exported methods as activity implementations.

type Activities struct {
    // contains filtered or unexported fields
}

func New

func New(cfg Config) *Activities

New creates an Activities instance with validated configuration.

func (*Activities) CleanupNodeViaSSH

func (a *Activities) CleanupNodeViaSSH(ctx context.Context, node NodeInfo, config CleanupConfig) (CleanupResult, error)

CleanupNodeViaSSH connects to a node over SSH and executes a cleanup script that identifies orphaned job data directories and, unless DryRun is set, removes them. It returns a CleanupResult with the scanned, orphaned, deleted, and skipped counts plus any errors encountered.

func (*Activities) FindRegistryNode

func (a *Activities) FindRegistryNode(ctx context.Context, jobName string) (NodeInfo, error)

FindRegistryNode queries the Nomad API for the running allocation of the registry job and returns the NodeInfo of the node hosting it, for SSH dialing. A “no running alloc” condition is wrapped as a non-retryable error so the workflow fails fast instead of retry-storming on a terminally misconfigured cluster.

func (*Activities) GetAllNomadClientNodes

func (a *Activities) GetAllNomadClientNodes(ctx context.Context) ([]NodeInfo, error)

GetAllNomadClientNodes retrieves all ready Nomad client nodes with their SSH addresses and node metadata. Creates a client span to Nomad for service graph visibility.

func (*Activities) MeasureRegistryDataDir

func (a *Activities) MeasureRegistryDataDir(ctx context.Context, node NodeInfo, dataDir string) (int64, error)

MeasureRegistryDataDir returns the size in bytes of the registry’s bind-mounted storage directory on the given node, for before/after reporting. It runs over SSH because /mnt/gdrive exists only on the host and the Nomad API doesn’t expose disk usage.
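The remote command isn't documented. The following sketch assumes something like GNU `du -sb DIR`, which prints the byte count, a tab, and the path. parseDuBytes is a hypothetical helper that turns that output into the int64 the activity returns and rejects anything that doesn't start with a number.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseDuBytes extracts the leading byte count from `du -sb DIR` output
// ("<bytes>\t<path>\n"). The exact remote command is an assumption.
func parseDuBytes(out string) (int64, error) {
	fields := strings.Fields(out)
	if len(fields) == 0 {
		return 0, errors.New("empty du output")
	}
	n, err := strconv.ParseInt(fields[0], 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parse du output %q: %w", out, err)
	}
	return n, nil
}

func main() {
	n, err := parseDuBytes("5368709120\t/mnt/gdrive/munchbox-data/registry\n")
	fmt.Println(n, err) // 5368709120 <nil>
}
```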

func (*Activities) RunRegistryGarbageCollect

func (a *Activities) RunRegistryGarbageCollect(ctx context.Context, node NodeInfo, config RegistryGCConfig) (RegistryGCRunResult, error)

RunRegistryGarbageCollect SSHes to the registry host and runs the registry’s garbage-collect command in a one-shot Docker container against the bind-mounted storage. This is the long-running step (multi-GB registries take minutes), so it heartbeats periodically. It reports the number of “blob eligible for deletion” lines emitted by the registry tool.

The workflow configures this activity with MaxAttempts=1 because a partially completed GC run shouldn’t be repeated. Instead, the deferred scale-back brings the registry back online and the failure is surfaced to the caller.

func (*Activities) ScaleRegistry

func (a *Activities) ScaleRegistry(ctx context.Context, jobName, groupName string, count int) error

ScaleRegistry scales the named Nomad job’s task group to the target count. Idempotent — Nomad accepts the call when the job is already at the requested count and returns success. Used both to scale down to 0 before GC and to scale back to 1 in the deferred compensation. A “job not found” error is wrapped as non-retryable; transient API errors surface plain so Temporal retries per the activity’s RetryPolicy.

func (*Activities) WaitRegistryAllocRunning

func (a *Activities) WaitRegistryAllocRunning(ctx context.Context, jobName string) error

WaitRegistryAllocRunning polls the Nomad API until the named job has at least one running allocation (i.e. the scale-up succeeded and a new alloc passed its start sequence). Bounded by the activity’s StartToCloseTimeout.

func (*Activities) WaitRegistryAllocsDrained

func (a *Activities) WaitRegistryAllocsDrained(ctx context.Context, jobName string) error

WaitRegistryAllocsDrained polls the Nomad API until the named job has zero running allocations. Heartbeats every poll. Bounded by the activity’s StartToCloseTimeout (set on the workflow side); returns ctx.Err() when exceeded.

type CleanupConfig

CleanupConfig holds the node cleanup workflow’s configuration, passed in as workflow input.

type CleanupConfig struct {
    DataDir     string `json:"data_dir"`     // Base directory to scan (default: /opt/nomad/data)
    GraceDays   int    `json:"grace_days"`   // Only delete directories older than this (default: 7)
    DryRun      bool   `json:"dry_run"`      // If true, only report what would be deleted
    DockerPrune bool   `json:"docker_prune"` // If true, also prune unused Docker images
}

type CleanupResult

CleanupResult holds the outcome of a cleanup operation on a single node.

type CleanupResult struct {
    NodeName         string   `json:"node_name"`
    NodeAddr         string   `json:"node_addr"`
    Scanned          int      `json:"scanned"`
    Orphaned         int      `json:"orphaned"`
    Deleted          int      `json:"deleted"`
    Skipped          int      `json:"skipped"`
    DockerSpaceFreed string   `json:"docker_space_freed"`
    Errors           []string `json:"errors,omitempty"`
    Output           string   `json:"output"`
}

type Config

Config holds SSH-related settings for node cleanup activities.

type Config struct {
    SSHKeyPath    string // Path to SSH private key (default: /root/.ssh/id_ed25519)
    SSHCertPath   string // Path to SSH client certificate (default: /root/.ssh/id_ed25519-cert.pub)
    SSHHostCAPath string // Path to SSH host CA public key (default: /root/.ssh/ssh-host-ca.pub)
}

func (*Config) Validate

func (c *Config) Validate()

Validate applies defaults for optional fields.
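Despite its name, Validate only fills in defaults. The following is a minimal sketch of that behavior using the default paths listed on Config. configSketch is a hypothetical mirror type, and /etc/munchbox/ssh/key is a made-up override used only to show that explicit values are kept.

```go
package main

import "fmt"

// configSketch mirrors the three documented Config fields.
type configSketch struct {
	SSHKeyPath    string
	SSHCertPath   string
	SSHHostCAPath string
}

// Validate fills each empty field with the default listed on Config and
// leaves explicitly set values alone.
func (c *configSketch) Validate() {
	if c.SSHKeyPath == "" {
		c.SSHKeyPath = "/root/.ssh/id_ed25519"
	}
	if c.SSHCertPath == "" {
		c.SSHCertPath = "/root/.ssh/id_ed25519-cert.pub"
	}
	if c.SSHHostCAPath == "" {
		c.SSHHostCAPath = "/root/.ssh/ssh-host-ca.pub"
	}
}

func main() {
	c := configSketch{SSHKeyPath: "/etc/munchbox/ssh/key"} // hypothetical override
	c.Validate()
	fmt.Printf("%+v\n", c)
}
```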

type NodeInfo

NodeInfo contains information about a Nomad client node needed for SSH connection and cleanup script execution.

type NodeInfo struct {
    ID       string `json:"id"`
    Name     string `json:"name"`
    Address  string `json:"address"`
    HTTPAddr string `json:"http_addr"` // Nomad agent HTTP address (e.g., "10.200.0.11:4646")
    IsOracle bool   `json:"is_oracle"` // Oracle nodes log in as the ubuntu user instead of root
}
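The IsOracle comment implies a per-node login user. The following sketch derives the user and dial address from a NodeInfo-like value. It assumes Address is a bare host and that SSH listens on port 22, neither of which is documented. sshTarget is a hypothetical helper. Its result would feed, for example, the User field of golang.org/x/crypto/ssh's ClientConfig.

```go
package main

import (
	"fmt"
	"net"
)

// nodeInfo mirrors the NodeInfo fields this sketch needs.
type nodeInfo struct {
	Name     string
	Address  string // assumed to be a bare host without a port
	IsOracle bool
}

// sshTarget returns the login user and host:port for a node. Port 22 is an
// assumption. JoinHostPort also brackets IPv6 literals correctly.
func sshTarget(n nodeInfo) (user, addr string) {
	user = "root"
	if n.IsOracle {
		user = "ubuntu"
	}
	return user, net.JoinHostPort(n.Address, "22")
}

func main() {
	user, addr := sshTarget(nodeInfo{Name: "oracle-1", Address: "10.200.0.11", IsOracle: true})
	fmt.Println(user, addr) // ubuntu 10.200.0.11:22
}
```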

type RegistryGCConfig

RegistryGCConfig holds the registry GC workflow’s configuration, passed in as workflow input.

type RegistryGCConfig struct {
    // JobName identifies the registry's Nomad job. Defaults to "registry".
    JobName string `json:"job_name"`
    // GroupName is the task group inside the job to scale. Defaults to the
    // JobName (matches the convention used by the munchbox-service pack).
    GroupName string `json:"group_name"`
    // RegistryDataDir is the host path bind-mounted into the registry
    // container as /var/lib/registry. Defaults to
    // "/mnt/gdrive/munchbox-data/registry".
    RegistryDataDir string `json:"registry_data_dir"`
    // RegistryImage is the docker image used for the one-shot GC run.
    // Should match the running registry's image. Defaults to "registry:3".
    RegistryImage string `json:"registry_image"`
    // DryRun runs garbage-collect with --dry-run, logging blobs that
    // would be deleted without actually freeing space.
    DryRun bool `json:"dry_run"`
    // DeleteUntagged tells GC to also remove manifests not referenced by
    // any tag (and the blobs they reference). Default true — without it
    // every tag overwrite (e.g. CI re-pushing :latest) leaves an
    // orphaned manifest forever.
    DeleteUntagged bool `json:"delete_untagged"`
}

func (*RegistryGCConfig) ApplyDefaults

func (c *RegistryGCConfig) ApplyDefaults()

ApplyDefaults fills in unset fields with their defaults. Called by the workflow before any activities run so every activity sees a fully populated config and the values are deterministic across replay.

type RegistryGCResult

RegistryGCResult holds the workflow-level outcome reported back to the trigger / caller.

type RegistryGCResult struct {
    NodeName       string `json:"node_name"`
    NodeAddr       string `json:"node_addr"`
    BlobsDeleted   int    `json:"blobs_deleted"`
    BytesReclaimed string `json:"bytes_reclaimed"`
    BeforeBytes    string `json:"before_bytes"`
    AfterBytes     string `json:"after_bytes"`
    DryRun         bool   `json:"dry_run"`
}

type RegistryGCRunResult

RegistryGCRunResult is the small struct returned by RunRegistryGarbageCollect. The workflow folds it into RegistryGCResult.

type RegistryGCRunResult struct {
    BlobsDeleted int    `json:"blobs_deleted"`
    Output       string `json:"output"`
}

Generated by gomarkdoc