Skip to content

Fleet API Reference

The Fleet module provides classes and functions for managing multiple OpenWRT devices.

Fleet Configuration

FleetDefaults

Bases: BaseModel

Default settings applied to all devices in the fleet.

Source code in src/wrtkit/fleet.py
class FleetDefaults(BaseModel):
    """
    Default settings applied to all devices in the fleet.

    Every field has a default, so an empty ``defaults`` section is valid.
    """

    # Connection timeout in seconds.
    timeout: int = Field(default=30, description="Connection timeout in seconds")
    # SSH username for the fleet.
    username: str = Field(default="root", description="Default SSH username")
    # Seconds to wait before the coordinated commit.
    commit_delay: int = Field(default=10, description="Seconds to wait before coordinated commit")

FleetDevice

Bases: BaseModel

Definition of a single device in the fleet.

Source code in src/wrtkit/fleet.py
class FleetDevice(BaseModel):
    """
    Definition of a single device in the fleet.

    Only ``target`` is required; every other field is optional.
    """

    # Required: where to reach the device.
    target: str = Field(..., description="Device address (IP, hostname, or serial port)")
    # Optional per-device SSH username override.
    username: Optional[str] = Field(default=None, description="SSH username override")
    # Optional credentials.
    password: Optional[str] = Field(default=None, description="SSH/login password")
    key_file: Optional[str] = Field(default=None, description="SSH private key file")
    # Optional per-device connection timeout override.
    timeout: Optional[int] = Field(default=None, description="Connection timeout override")
    # Config files to merge for this device.
    configs: List[str] = Field(default_factory=list, description="Config files to merge")
    # Free-form labels used for filtering.
    tags: List[str] = Field(default_factory=list, description="Tags for filtering")

FleetConfig

Bases: BaseModel

Fleet inventory configuration.

Source code in src/wrtkit/fleet.py
class FleetConfig(BaseModel):
    """
    Fleet inventory configuration.

    Top-level model of a fleet file: fleet-wide defaults, named config
    layers, and the device definitions. Every section is optional and
    falls back to an empty/default value.
    """

    # Fleet-wide fallbacks; built from FleetDefaults() when the section is absent.
    defaults: FleetDefaults = Field(default_factory=FleetDefaults)
    # Name -> config file path, intended to be referenced from elsewhere in
    # the fleet file via interpolation.
    config_layers: Dict[str, str] = Field(
        default_factory=dict, description="Named config file paths for interpolation"
    )
    # Device name -> definition.
    devices: Dict[str, FleetDevice] = Field(
        default_factory=dict, description="Device definitions keyed by name"
    )

Fleet Functions

load_fleet(fleet_file)

Load a fleet inventory file.

Uses OmegaConf for variable interpolation, including:

- Environment variables: ${oc.env:VAR_NAME}
- Config layer references: ${config_layers.base}

Parameters:

Name Type Description Default
fleet_file str

Path to the fleet YAML file

required

Returns:

Type Description
FleetConfig

FleetConfig instance

Source code in src/wrtkit/fleet.py
def load_fleet(fleet_file: str) -> FleetConfig:
    """
    Load a fleet inventory file.

    Uses OmegaConf for variable interpolation including:
    - Environment variables: ${oc.env:VAR_NAME}
    - Config layer references: ${config_layers.base}

    Args:
        fleet_file: Path to the fleet YAML file

    Returns:
        FleetConfig instance

    Raises:
        FileNotFoundError: If fleet_file does not exist
        ValueError: If the resolved content is not a dictionary
    """
    fleet_path = Path(fleet_file)
    if not fleet_path.exists():
        raise FileNotFoundError(f"Fleet file not found: {fleet_file}")

    # Decode explicitly as UTF-8: without an encoding argument open() uses the
    # platform's locale encoding, which can mis-read non-ASCII YAML content.
    with open(fleet_path, "r", encoding="utf-8") as f:
        yaml_content = f.read()

    # Load through OmegaConf for interpolation
    omega_conf = OmegaConf.create(yaml_content)

    # Resolve interpolations into plain Python containers
    data = OmegaConf.to_container(omega_conf, resolve=True)
    if not isinstance(data, dict):
        raise ValueError("Fleet file must be a YAML dictionary")

    return FleetConfig.model_validate(data)

merge_device_configs(device, fleet_path)

Merge multiple config files for a device using OmegaConf.

Config files are merged in order, with later files overriding earlier ones.

Parameters:

Name Type Description Default
device FleetDevice

The device definition with config file list

required
fleet_path Path

Path to the fleet file (for resolving relative paths)

required

Returns:

Type Description
UCIConfig

Merged UCIConfig instance

Source code in src/wrtkit/fleet.py
def merge_device_configs(
    device: FleetDevice,
    fleet_path: Path,
) -> UCIConfig:
    """
    Merge multiple config files for a device using OmegaConf.

    Config files are merged in order, with later files overriding earlier ones.
    Interpolations are resolved once, after all files have been merged.

    Args:
        device: The device definition with config file list
        fleet_path: Path to the fleet file (for resolving relative paths)

    Returns:
        Merged UCIConfig instance (an empty UCIConfig if the device lists no configs)

    Raises:
        FileNotFoundError: If one of the listed config files does not exist
        ValueError: If the merged content is not a dictionary
    """
    if not device.configs:
        return UCIConfig()

    base_dir = fleet_path.parent

    # device.configs is non-empty here, so the loop assigns merged_omega at
    # least once and no post-loop None check is needed.
    merged_omega: Optional[Any] = None

    for config_path in device.configs:
        # Relative paths are resolved from the fleet file's directory
        config_file = Path(config_path)
        full_path = config_file if config_file.is_absolute() else base_dir / config_file

        if not full_path.exists():
            raise FileNotFoundError(f"Config file not found: {full_path}")

        # Decode explicitly as UTF-8 instead of the platform's locale encoding
        with open(full_path, "r", encoding="utf-8") as f:
            yaml_content = f.read()

        config_omega = OmegaConf.create(yaml_content)

        if merged_omega is None:
            merged_omega = config_omega
        else:
            merged_omega = OmegaConf.merge(merged_omega, config_omega)

    # Resolve all interpolations and convert to dict
    data = OmegaConf.to_container(merged_omega, resolve=True)
    if not isinstance(data, dict):
        raise ValueError("Merged config must be a dictionary")

    return UCIConfig.from_dict(cast(Dict[str, Any], data))

filter_devices(fleet, target=None, tags=None)

Filter fleet devices by name/glob pattern and/or tags.

Parameters:

Name Type Description Default
fleet FleetConfig

The fleet configuration

required
target Optional[str]

Device name or glob pattern (e.g., "ap-*")

None
tags Optional[List[str]]

List of tags (AND logic - device must have all tags)

None

Returns:

Type Description
Dict[str, FleetDevice]

Dictionary of matching devices keyed by name

Source code in src/wrtkit/fleet.py
def filter_devices(
    fleet: FleetConfig,
    target: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> Dict[str, FleetDevice]:
    """
    Select the fleet devices that satisfy the given name and tag filters.

    A filter that is None is not applied. When both are given, a device
    must pass both.

    Args:
        fleet: The fleet configuration
        target: Device name or glob pattern (e.g., "ap-*")
        tags: List of tags (AND logic - device must have all tags)

    Returns:
        Dictionary of matching devices keyed by name
    """
    wanted_tags: Optional[Set[str]] = None if tags is None else set(tags)

    def is_selected(name: str, device: FleetDevice) -> bool:
        # Name filter: exact name or shell-style glob
        if target is not None and not fnmatch.fnmatch(name, target):
            return False
        # Tag filter: every requested tag must be present on the device
        if wanted_tags is not None and not wanted_tags.issubset(device.tags):
            return False
        return True

    return {
        name: device
        for name, device in fleet.devices.items()
        if is_selected(name, device)
    }

get_device_connection_params(device, defaults)

Get connection parameters for a device, applying defaults.

Parameters:

Name Type Description Default
device FleetDevice

The device definition

required
defaults FleetDefaults

Fleet defaults to apply

required

Returns:

Type Description
Dict[str, Any]

Dictionary with connection parameters

Source code in src/wrtkit/fleet.py
def get_device_connection_params(
    device: FleetDevice,
    defaults: FleetDefaults,
) -> Dict[str, Any]:
    """
    Get connection parameters for a device, applying defaults.

    Args:
        device: The device definition
        defaults: Fleet defaults to apply

    Returns:
        Dictionary with the keys "target", "username", "password",
        "key_file" and "timeout"
    """
    # Only an unset (None) timeout falls back to the fleet default. Testing
    # truthiness here would silently replace an explicit 0 with the default.
    timeout = device.timeout if device.timeout is not None else defaults.timeout

    return {
        "target": device.target,
        # Truthiness is kept on purpose: None and "" both fall back to the default
        "username": device.username or defaults.username,
        "password": device.password,
        "key_file": device.key_file,
        "timeout": timeout,
    }

Fleet Executor

DeviceResult dataclass

Result of operations on a single device.

Source code in src/wrtkit/fleet_executor.py
@dataclass
class DeviceResult:
    """
    Result of operations on a single device.

    Only name, target and success are required; the remaining fields
    default to "nothing recorded".
    """

    # Device name.
    name: str
    # Address the device was reached at.
    target: str
    # Whether the operation on this device succeeded.
    success: bool
    # Error message for a failed operation; None when nothing was recorded.
    error: Optional[str] = None
    # Diff associated with the operation, if any.
    diff: Optional[ConfigDiff] = None
    # Number of changes; defaults to 0.
    changes_count: int = 0

FleetResult dataclass

Result of fleet-wide operations.

Source code in src/wrtkit/fleet_executor.py
@dataclass
class FleetResult:
    """
    Aggregated outcome of one fleet-wide phase.

    Holds the per-device results keyed by device name, the phase label,
    and whether (and why) the phase was aborted.
    """

    devices: Dict[str, DeviceResult] = field(default_factory=dict)
    phase: str = "unknown"
    aborted: bool = False
    abort_reason: Optional[str] = None

    @property
    def success_count(self) -> int:
        """Number of recorded devices whose result is successful."""
        return len([entry for entry in self.devices.values() if entry.success])

    @property
    def failure_count(self) -> int:
        """Number of recorded devices whose result is not successful."""
        return len([entry for entry in self.devices.values() if not entry.success])

    @property
    def total_count(self) -> int:
        """Number of devices with a recorded result."""
        return len(self.devices)

    @property
    def all_successful(self) -> bool:
        """True when no recorded device failed (also True when nothing is recorded)."""
        for entry in self.devices.values():
            if not entry.success:
                return False
        return True

FleetExecutor

Executes fleet operations with two-phase coordinated updates.

- Phase 1 (Stage): Push UCI commands to all devices in parallel without committing. Fail fast if any device fails.
- Phase 2 (Commit): Send coordinated commit commands to all devices simultaneously.

Source code in src/wrtkit/fleet_executor.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class FleetExecutor:
    """
    Executes fleet operations with two-phase coordinated updates.

    Phase 1 (Stage): Push UCI commands to all devices in parallel without committing.
                     Fail fast if any device fails.
    Phase 2 (Commit): Send coordinated commit commands to all devices simultaneously.

    Connections opened by ``stage()`` are kept in ``_connections`` until
    ``commit()`` consumes them, a failed stage rolls them back, or
    ``cleanup()`` closes them.
    """

    def __init__(
        self,
        fleet: FleetConfig,
        fleet_path: Path,
        on_device_start: Optional[Callable[[str, str], None]] = None,
        on_device_complete: Optional[Callable[[str, DeviceResult], None]] = None,
        on_phase_start: Optional[Callable[[str], None]] = None,
    ):
        """
        Initialize fleet executor.

        Args:
            fleet: Fleet configuration
            fleet_path: Path to fleet file (for resolving relative paths)
            on_device_start: Callback when starting work on a device (name, target).
                Invoked from the worker thread handling that device.
            on_device_complete: Callback when device work completes (name, result).
                Invoked from the thread that called preview/stage/commit.
            on_phase_start: Callback when a phase starts (phase_name)
        """
        self.fleet = fleet
        self.fleet_path = fleet_path
        self.on_device_start = on_device_start
        self.on_device_complete = on_device_complete
        self.on_phase_start = on_phase_start
        # Open connections of successfully staged devices, keyed by device name
        self._connections: Dict[str, Connection] = {}
        # Diff applied to each successfully staged device, keyed by device name
        self._staged_devices: Dict[str, ConfigDiff] = {}

    def _notify_device_start(self, name: str, target: str) -> None:
        """Invoke the device-start callback, if one was given."""
        if self.on_device_start:
            self.on_device_start(name, target)

    def _notify_device_complete(self, name: str, result: DeviceResult) -> None:
        """Invoke the device-complete callback, if one was given."""
        if self.on_device_complete:
            self.on_device_complete(name, result)

    def _notify_phase_start(self, phase: str) -> None:
        """Invoke the phase-start callback, if one was given."""
        if self.on_phase_start:
            self.on_phase_start(phase)

    def _open_connection(self, device: FleetDevice) -> Connection:
        """Create the connection object for a device, applying fleet defaults.

        The caller is responsible for opening it (``with conn`` or ``conn.connect()``).
        """
        params = get_device_connection_params(device, self.fleet.defaults)
        return create_connection(
            target=params["target"],
            password=params["password"],
            key_file=params["key_file"],
            timeout=params["timeout"],
            username=params["username"],
        )

    @staticmethod
    def _count_changes(diff: ConfigDiff) -> int:
        """Return the total number of add, modify and remove entries in a diff."""
        return len(diff.to_add) + len(diff.to_modify) + len(diff.to_remove)

    @staticmethod
    def _close_quietly(conn: Connection) -> None:
        """Disconnect, deliberately ignoring errors (best-effort cleanup)."""
        try:
            conn.disconnect()
        except Exception:
            pass

    def _revert_and_close(self, conn: Connection) -> None:
        """Best-effort revert of staged UCI changes, followed by a disconnect.

        The disconnect runs even when the revert command raises, so a broken
        link cannot leave the connection object open.
        """
        try:
            # NOTE(review): the stock `uci revert` CLI is documented as taking a
            # <config> argument - confirm that a bare `uci revert` really
            # discards all staged changes on the target firmware.
            conn.execute("uci revert")
        except Exception:
            pass
        finally:
            self._close_quietly(conn)

    def preview(
        self,
        target: Optional[str] = None,
        tags: Optional[List[str]] = None,
        max_workers: int = 5,
    ) -> FleetResult:
        """
        Preview changes for targeted devices without applying.

        Each device is diffed in its own worker thread; a failure on one
        device is recorded in its result and does not stop the others.

        Args:
            target: Device name or glob pattern
            tags: List of tags to filter by
            max_workers: Maximum parallel connections

        Returns:
            FleetResult with diff information for each device
        """
        devices = filter_devices(self.fleet, target, tags)
        result = FleetResult(phase="preview")

        if not devices:
            return result

        self._notify_phase_start("preview")

        def preview_device(name: str, device: FleetDevice) -> DeviceResult:
            self._notify_device_start(name, device.target)
            try:
                # Merge configs for this device
                config = merge_device_configs(device, self.fleet_path)
                conn = self._open_connection(device)

                # The context manager scopes the connection to the diff
                with conn:
                    diff = config.diff(conn, show_remote_only=True, verbose=False)  # type: ignore[arg-type]

                    return DeviceResult(
                        name=name,
                        target=device.target,
                        success=True,
                        diff=diff,
                        changes_count=self._count_changes(diff),
                    )

            except Exception as e:
                return DeviceResult(
                    name=name,
                    target=device.target,
                    success=False,
                    error=str(e),
                )

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(preview_device, name, device): name
                for name, device in devices.items()
            }

            for future in as_completed(futures):
                name = futures[future]
                try:
                    device_result = future.result()
                except Exception as e:
                    device_result = DeviceResult(
                        name=name,
                        target=devices[name].target,
                        success=False,
                        error=str(e),
                    )

                result.devices[name] = device_result
                self._notify_device_complete(name, device_result)

        return result

    def stage(
        self,
        target: Optional[str] = None,
        tags: Optional[List[str]] = None,
        max_workers: int = 5,
        remove_unmanaged: bool = False,
    ) -> FleetResult:
        """
        Stage changes to devices (Phase 1).

        Pushes UCI commands without committing. Fails fast on any error: the
        first failed device aborts the phase and every device staged so far
        is rolled back. Connections still open from a previous stage are
        closed before the new one starts.

        Args:
            target: Device name or glob pattern
            tags: List of tags to filter by
            max_workers: Maximum parallel connections
            remove_unmanaged: Remove settings not in config

        Returns:
            FleetResult with staging results
        """
        devices = filter_devices(self.fleet, target, tags)
        result = FleetResult(phase="stage")
        # Disconnect (not merely forget) connections left over from an earlier
        # stage that was never committed, then start from empty state.
        self.cleanup()

        if not devices:
            return result

        self._notify_phase_start("stage")

        def stage_device(name: str, device: FleetDevice) -> DeviceResult:
            self._notify_device_start(name, device.target)
            conn: Optional[Connection] = None
            connected = False
            try:
                # Merge configs for this device
                config = merge_device_configs(device, self.fleet_path)

                conn = self._open_connection(device)
                conn.connect()
                connected = True

                # Apply diff without commit/reload
                diff = config.apply_diff(
                    conn,  # type: ignore[arg-type]
                    remove_unmanaged=remove_unmanaged,
                    dry_run=False,
                    auto_commit=False,
                    auto_reload=False,
                    verbose=False,
                )

                device_result = DeviceResult(
                    name=name,
                    target=device.target,
                    success=True,
                    diff=diff,
                    changes_count=self._count_changes(diff),
                )

                # Keep connection open for commit phase
                self._connections[name] = conn
                self._staged_devices[name] = diff

                return device_result

            except Exception as e:
                # This device never reaches _connections, so release it here:
                # drop whatever was partially staged and close the link.
                if conn is not None:
                    if connected:
                        self._revert_and_close(conn)
                    else:
                        self._close_quietly(conn)
                return DeviceResult(
                    name=name,
                    target=device.target,
                    success=False,
                    error=str(e),
                )

        # Execute staging in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(stage_device, name, device): name
                for name, device in devices.items()
            }

            for future in as_completed(futures):
                name = futures[future]
                try:
                    device_result = future.result()
                except Exception as e:
                    device_result = DeviceResult(
                        name=name,
                        target=devices[name].target,
                        success=False,
                        error=str(e),
                    )

                result.devices[name] = device_result
                self._notify_device_complete(name, device_result)

                # Fail fast: abort if any device fails
                if not device_result.success:
                    result.aborted = True
                    result.abort_reason = f"Device '{name}' failed: {device_result.error}"
                    # Cancel remaining futures (best effort)
                    for f in futures:
                        f.cancel()
                    break

        # Leaving the executor block waits for workers that were already
        # running, so every connection they registered is visible to the
        # rollback below.
        if result.aborted:
            self._rollback_all()

        return result

    def commit(self, delay: Optional[int] = None) -> FleetResult:
        """
        Commit staged changes on all devices (Phase 2).

        Sends coordinated commit commands with optional delay for synchronization.
        The command is started in the background on each device, so a
        successful DeviceResult means the command was sent, not that the
        commit and reload finished. Every connection is closed afterwards.

        Args:
            delay: Seconds to delay before commit (uses fleet default if None)

        Returns:
            FleetResult with commit results (empty if nothing is staged)
        """
        result = FleetResult(phase="commit")

        if not self._connections:
            return result

        commit_delay = delay if delay is not None else self.fleet.defaults.commit_delay
        self._notify_phase_start("commit")

        # Execute commit and reload via background command. The shared sleep
        # makes all devices start the commit at roughly the same time.
        commit_cmd = (
            f"nohup sh -c 'sleep {commit_delay} && "
            "uci commit && "
            "/etc/init.d/network restart && "
            "wifi reload' > /dev/null 2>&1 &"
        )

        def commit_device(name: str, conn: Connection) -> DeviceResult:
            # Use the connection's host if it has one, else its port
            target = conn.host if hasattr(conn, "host") else getattr(conn, "port", "unknown")
            self._notify_device_start(name, str(target))
            try:
                conn.execute(commit_cmd)

                return DeviceResult(
                    name=name,
                    target=str(target),
                    success=True,
                )

            except Exception as e:
                return DeviceResult(
                    name=name,
                    target=str(target),
                    success=False,
                    error=str(e),
                )
            finally:
                self._close_quietly(conn)

        # Send commit commands to all devices in parallel (one worker each)
        with ThreadPoolExecutor(max_workers=len(self._connections)) as executor:
            futures = {
                executor.submit(commit_device, name, conn): name
                for name, conn in self._connections.items()
            }

            for future in as_completed(futures):
                name = futures[future]
                try:
                    device_result = future.result()
                except Exception as e:
                    device_result = DeviceResult(
                        name=name,
                        target="unknown",
                        success=False,
                        error=str(e),
                    )

                result.devices[name] = device_result
                self._notify_device_complete(name, device_result)

        self._connections.clear()
        self._staged_devices.clear()

        return result

    def apply(
        self,
        target: Optional[str] = None,
        tags: Optional[List[str]] = None,
        max_workers: int = 5,
        remove_unmanaged: bool = False,
        commit_delay: Optional[int] = None,
    ) -> tuple[FleetResult, FleetResult]:
        """
        Apply changes to fleet devices with two-phase execution.

        Phase 1: Stage all changes in parallel (fail fast)
        Phase 2: Coordinated commit if staging succeeded

        Args:
            target: Device name or glob pattern
            tags: List of tags to filter by
            max_workers: Maximum parallel connections
            remove_unmanaged: Remove settings not in config
            commit_delay: Override default commit delay

        Returns:
            Tuple of (stage_result, commit_result). When staging fails, the
            commit result is an empty FleetResult marked as aborted.
        """
        # Phase 1: Stage
        stage_result = self.stage(
            target=target,
            tags=tags,
            max_workers=max_workers,
            remove_unmanaged=remove_unmanaged,
        )

        # If staging failed or was aborted, don't proceed to commit
        if stage_result.aborted or not stage_result.all_successful:
            return stage_result, FleetResult(phase="commit", aborted=True)

        # Phase 2: Commit
        commit_result = self.commit(delay=commit_delay)

        return stage_result, commit_result

    def _rollback_all(self) -> None:
        """Rollback staged changes on all devices and close their connections."""
        for conn in self._connections.values():
            self._revert_and_close(conn)

        self._connections.clear()
        self._staged_devices.clear()

    def cleanup(self) -> None:
        """Clean up any open connections (staged changes are not reverted)."""
        for conn in self._connections.values():
            self._close_quietly(conn)
        self._connections.clear()
        self._staged_devices.clear()

__init__(fleet, fleet_path, on_device_start=None, on_device_complete=None, on_phase_start=None)

Initialize fleet executor.

Parameters:

Name Type Description Default
fleet FleetConfig

Fleet configuration

required
fleet_path Path

Path to fleet file (for resolving relative paths)

required
on_device_start Optional[Callable[[str, str], None]]

Callback when starting work on a device (name, target)

None
on_device_complete Optional[Callable[[str, DeviceResult], None]]

Callback when device work completes (name, result)

None
on_phase_start Optional[Callable[[str], None]]

Callback when a phase starts (phase_name)

None
Source code in src/wrtkit/fleet_executor.py
def __init__(
    self,
    fleet: FleetConfig,
    fleet_path: Path,
    on_device_start: Optional[Callable[[str, str], None]] = None,
    on_device_complete: Optional[Callable[[str, DeviceResult], None]] = None,
    on_phase_start: Optional[Callable[[str], None]] = None,
):
    """
    Set up an executor for the given fleet.

    Stores the fleet, its file path and the optional progress callbacks,
    and starts with no open connections and no staged devices.

    Args:
        fleet: Fleet configuration
        fleet_path: Path to fleet file (for resolving relative paths)
        on_device_start: Callback when starting work on a device (name, target)
        on_device_complete: Callback when device work completes (name, result)
        on_phase_start: Callback when a phase starts (phase_name)
    """
    # Bookkeeping for the staging phase, both keyed by device name
    self._staged_devices: Dict[str, ConfigDiff] = {}
    self._connections: Dict[str, Connection] = {}

    # Optional progress hooks; None means "do not notify"
    self.on_phase_start = on_phase_start
    self.on_device_complete = on_device_complete
    self.on_device_start = on_device_start

    # What to operate on
    self.fleet_path = fleet_path
    self.fleet = fleet

preview(target=None, tags=None, max_workers=5)

Preview changes for targeted devices without applying.

Parameters:

Name Type Description Default
target Optional[str]

Device name or glob pattern

None
tags Optional[List[str]]

List of tags to filter by

None
max_workers int

Maximum parallel connections

5

Returns:

Type Description
FleetResult

FleetResult with diff information for each device

Source code in src/wrtkit/fleet_executor.py
def preview(
    self,
    target: Optional[str] = None,
    tags: Optional[List[str]] = None,
    max_workers: int = 5,
) -> FleetResult:
    """
    Compute, without applying, the diff for every selected device.

    Devices are processed in parallel. A device that fails is recorded as an
    unsuccessful result carrying the error text; the others still run.

    Args:
        target: Device name or glob pattern
        tags: List of tags to filter by
        max_workers: Maximum parallel connections

    Returns:
        FleetResult with diff information for each device
    """
    selected = filter_devices(self.fleet, target, tags)
    outcome = FleetResult(phase="preview")

    # Nothing matched: return the empty result without announcing the phase
    if not selected:
        return outcome

    self._notify_phase_start("preview")

    def failure(device_name: str, device_target: str, exc: Exception) -> DeviceResult:
        # Unsuccessful result carrying the exception text
        return DeviceResult(
            name=device_name,
            target=device_target,
            success=False,
            error=str(exc),
        )

    def preview_device(name: str, device: FleetDevice) -> DeviceResult:
        self._notify_device_start(name, device.target)
        try:
            # Build the desired configuration from the device's config files
            desired = merge_device_configs(device, self.fleet_path)

            # Connection settings with fleet defaults filled in
            settings = get_device_connection_params(device, self.fleet.defaults)
            link = create_connection(
                target=settings["target"],
                password=settings["password"],
                key_file=settings["key_file"],
                timeout=settings["timeout"],
                username=settings["username"],
            )

            with link:
                # Compare the desired configuration against the device
                delta = desired.diff(link, show_remote_only=True, verbose=False)  # type: ignore[arg-type]

                pending = sum(
                    len(bucket) for bucket in (delta.to_add, delta.to_modify, delta.to_remove)
                )
                return DeviceResult(
                    name=name,
                    target=device.target,
                    success=True,
                    diff=delta,
                    changes_count=pending,
                )

        except Exception as e:
            return failure(name, device.target, e)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each submitted job back to its device name
        jobs = {}
        for name, device in selected.items():
            jobs[pool.submit(preview_device, name, device)] = name

        for finished in as_completed(jobs):
            name = jobs[finished]
            try:
                device_result = finished.result()
            except Exception as e:
                device_result = failure(name, selected[name].target, e)

            outcome.devices[name] = device_result
            self._notify_device_complete(name, device_result)

    return outcome

stage(target=None, tags=None, max_workers=5, remove_unmanaged=False)

Stage changes to devices (Phase 1).

Pushes UCI commands without committing. Fails fast on any error.

Parameters:

Name Type Description Default
target Optional[str]

Device name or glob pattern

None
tags Optional[List[str]]

List of tags to filter by

None
max_workers int

Maximum parallel connections

5
remove_unmanaged bool

Remove settings not in config

False

Returns:

Type Description
FleetResult

FleetResult with staging results

Source code in src/wrtkit/fleet_executor.py
def stage(
    self,
    target: Optional[str] = None,
    tags: Optional[List[str]] = None,
    max_workers: int = 5,
    remove_unmanaged: bool = False,
) -> FleetResult:
    """
    Stage changes to devices (Phase 1).

    Pushes UCI commands without committing. Fails fast on any error: the
    first failed device aborts the phase and every device staged so far is
    rolled back. Connections still open from a previous stage are closed
    before the new one starts.

    Args:
        target: Device name or glob pattern
        tags: List of tags to filter by
        max_workers: Maximum parallel connections
        remove_unmanaged: Remove settings not in config

    Returns:
        FleetResult with staging results
    """
    devices = filter_devices(self.fleet, target, tags)
    result = FleetResult(phase="stage")
    # Disconnect (not merely forget) connections left over from an earlier
    # stage that was never committed; cleanup() also empties both registries.
    self.cleanup()

    if not devices:
        return result

    self._notify_phase_start("stage")

    def stage_device(name: str, device: FleetDevice) -> DeviceResult:
        self._notify_device_start(name, device.target)
        conn = None
        connected = False
        try:
            # Merge configs for this device
            config = merge_device_configs(device, self.fleet_path)

            # Create connection
            params = get_device_connection_params(device, self.fleet.defaults)
            conn = create_connection(
                target=params["target"],
                password=params["password"],
                key_file=params["key_file"],
                timeout=params["timeout"],
                username=params["username"],
            )

            conn.connect()
            connected = True

            # Apply diff without commit/reload
            diff = config.apply_diff(
                conn,  # type: ignore[arg-type]
                remove_unmanaged=remove_unmanaged,
                dry_run=False,
                auto_commit=False,
                auto_reload=False,
                verbose=False,
            )

            changes = len(diff.to_add) + len(diff.to_modify) + len(diff.to_remove)
            device_result = DeviceResult(
                name=name,
                target=device.target,
                success=True,
                diff=diff,
                changes_count=changes,
            )

            # Keep connection open for commit phase
            self._connections[name] = conn
            self._staged_devices[name] = diff

            return device_result

        except Exception as e:
            # This device never reaches self._connections, so release it here
            # (best effort): drop whatever was partially staged, then close.
            if conn is not None:
                if connected:
                    try:
                        conn.execute("uci revert")
                    except Exception:
                        pass
                try:
                    conn.disconnect()
                except Exception:
                    pass
            return DeviceResult(
                name=name,
                target=device.target,
                success=False,
                error=str(e),
            )

    # Execute staging in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(stage_device, name, device): name
            for name, device in devices.items()
        }

        for future in as_completed(futures):
            name = futures[future]
            try:
                device_result = future.result()
            except Exception as e:
                device_result = DeviceResult(
                    name=name,
                    target=devices[name].target,
                    success=False,
                    error=str(e),
                )

            result.devices[name] = device_result
            self._notify_device_complete(name, device_result)

            # Fail fast: abort if any device fails
            if not device_result.success:
                result.aborted = True
                result.abort_reason = f"Device '{name}' failed: {device_result.error}"
                # Cancel remaining futures (best effort)
                for f in futures:
                    f.cancel()
                break

    # Leaving the executor block waits for workers that were already running,
    # so every connection they registered is visible to the rollback below.
    if result.aborted:
        self._rollback_all()

    return result

commit(delay=None)

Commit staged changes on all devices (Phase 2).

Sends coordinated commit commands with optional delay for synchronization.

Parameters:

Name Type Description Default
delay Optional[int]

Seconds to delay before commit (uses fleet default if None)

None

Returns:

Type Description
FleetResult

FleetResult with commit results

Source code in src/wrtkit/fleet_executor.py
def commit(self, delay: Optional[int] = None) -> FleetResult:
    """
    Commit staged changes on all devices (Phase 2).

    Each tracked connection is sent one backgrounded shell command that
    sleeps for the commit delay and then runs ``uci commit``, restarts the
    network service and reloads wifi. The connection is disconnected right
    after the command is sent, whether or not sending succeeded. Once every
    device has been handled, the tracked connections and staged-device
    records are cleared.

    A successful per-device result means the command was handed to the
    device without an exception; the command itself runs in the background
    on the device after the delay.

    Args:
        delay: Seconds to delay before commit (uses fleet default if None)

    Returns:
        FleetResult with commit results (empty if there are no open
        connections)
    """
    result = FleetResult(phase="commit")

    # No tracked connections: return the empty commit-phase result untouched.
    if not self._connections:
        return result

    if delay is None:
        commit_delay = self.fleet.defaults.commit_delay
    else:
        commit_delay = delay
    self._notify_phase_start("commit")

    def send_commit(name: str, conn: Connection) -> DeviceResult:
        # Label the device by the connection's ``host`` attribute when it has
        # one, otherwise by ``port``, otherwise "unknown".
        if hasattr(conn, "host"):
            target = conn.host
        else:
            target = getattr(conn, "port", "unknown")
        self._notify_device_start(name, str(target))

        try:
            # The command is backgrounded (nohup ... &) so it keeps running
            # after we disconnect; the shared sleep lets all devices start
            # the commit at roughly the same time.
            background_cmd = (
                f"nohup sh -c 'sleep {commit_delay} && "
                "uci commit && "
                "/etc/init.d/network restart && "
                "wifi reload' > /dev/null 2>&1 &"
            )
            conn.execute(background_cmd)
            outcome = DeviceResult(
                name=name,
                target=str(target),
                success=True,
            )
        except Exception as exc:
            outcome = DeviceResult(
                name=name,
                target=str(target),
                success=False,
                error=str(exc),
            )
        finally:
            # Best-effort disconnect: a failure here must not mask the result.
            try:
                conn.disconnect()
            except Exception:
                pass

        return outcome

    # One worker per connection so every device receives its command in parallel.
    with ThreadPoolExecutor(max_workers=len(self._connections)) as pool:
        pending = {}
        for device_name, connection in self._connections.items():
            pending[pool.submit(send_commit, device_name, connection)] = device_name

        for finished in as_completed(pending):
            device_name = pending[finished]
            try:
                outcome = finished.result()
            except Exception as exc:
                # The worker itself raised; no target label is available here.
                outcome = DeviceResult(
                    name=device_name,
                    target="unknown",
                    success=False,
                    error=str(exc),
                )

            result.devices[device_name] = outcome
            self._notify_device_complete(device_name, outcome)

    # Every connection has been disconnected above; drop the bookkeeping.
    self._connections.clear()
    self._staged_devices.clear()

    return result

apply(target=None, tags=None, max_workers=5, remove_unmanaged=False, commit_delay=None)

Apply changes to fleet devices with two-phase execution.

Phase 1: Stage all changes in parallel (fail fast). Phase 2: Coordinated commit if staging succeeded.

Parameters:

Name Type Description Default
target Optional[str]

Device name or glob pattern

None
tags Optional[List[str]]

List of tags to filter by

None
max_workers int

Maximum parallel connections

5
remove_unmanaged bool

Remove settings not in config

False
commit_delay Optional[int]

Override default commit delay

None

Returns:

Type Description
tuple[FleetResult, FleetResult]

Tuple of (stage_result, commit_result)

Source code in src/wrtkit/fleet_executor.py
def apply(
    self,
    target: Optional[str] = None,
    tags: Optional[List[str]] = None,
    max_workers: int = 5,
    remove_unmanaged: bool = False,
    commit_delay: Optional[int] = None,
) -> tuple[FleetResult, FleetResult]:
    """
    Apply changes to fleet devices with two-phase execution.

    Phase 1 stages the changes via ``stage``. Phase 2 runs ``commit`` only
    when staging was neither aborted nor partially unsuccessful; otherwise
    a placeholder commit result flagged as aborted is returned and no
    commit is attempted.

    Args:
        target: Device name or glob pattern
        tags: List of tags to filter by
        max_workers: Maximum parallel connections
        remove_unmanaged: Remove settings not in config
        commit_delay: Override default commit delay

    Returns:
        Tuple of (stage_result, commit_result)
    """
    # Phase 1: stage on the selected devices.
    staged = self.stage(
        target=target,
        tags=tags,
        max_workers=max_workers,
        remove_unmanaged=remove_unmanaged,
    )

    staging_ok = not staged.aborted and staged.all_successful
    if staging_ok:
        # Phase 2: coordinated commit.
        return staged, self.commit(delay=commit_delay)

    # Staging failed or was aborted: report an aborted commit phase instead.
    return staged, FleetResult(phase="commit", aborted=True)

cleanup()

Clean up any open connections.

Source code in src/wrtkit/fleet_executor.py
def cleanup(self) -> None:
    """
    Clean up any open connections.

    Calls ``disconnect()`` on every tracked connection, ignoring errors
    raised by individual connections, then empties both the connection
    registry and the staged-device record.
    """
    open_connections = self._connections
    for connection in open_connections.values():
        try:
            connection.disconnect()
        except Exception:
            # Best-effort teardown: one failing connection must not stop
            # the remaining ones from being closed.
            continue

    open_connections.clear()
    self._staged_devices.clear()