scl_lib/controller_utils/
mod.rs

1// SPDX-License-Identifier: EUPL-1.2
2pub mod error;
3
4use log::{debug, info, warn};
5
6#[cfg(any(test, feature = "test-utils"))]
7use std::process::Command;
8use std::time::Duration;
9
10use tokio::time;
11
12#[cfg(any(test, feature = "test-utils"))]
13use crate::api_objects::{MetaData, SeparationContext, VlanTag};
14
15use crate::api_objects::{Controller, FinalizerId, SclEvent, SclObject};
16use crate::client::{async_trait, SclClient, SclRequest, StatusCode};
17pub use crate::controller_utils::error::ControllerBasicError;
18
19/// Provides a single threaded [`SclObjectController::run`] that performs a full
20/// synchronization of all `T` objects if any [`SclEvent`] occurs or a timeout
21/// passes.
22#[async_trait]
23pub trait SclObjectController<T: SclObject + Send> {
24    fn client(&self) -> &SclClient;
25
26    /// Fetches all `T`s and processes them one by one according to the controller's logic.
27    async fn synchronize_all(&self) -> Result<(), ControllerBasicError>;
28
29    /// Performs full synchronization on all related `T` objects if any watch event is
30    /// received or a periodic timer is triggered.
31    async fn run(&self, watch_event_timeout_ms: u64) -> Result<(), ControllerBasicError> {
32        let url = format!(
33            "{}/watch{}",
34            self.client().api_url,
35            T::api_endpoint(None, None)
36        );
37        let client = self.client();
38        let mut event_stream = client.get(url).send().await?.error_for_status()?;
39        let timeout_duration = Duration::from_millis(watch_event_timeout_ms);
40
41        loop {
42            match tokio::time::timeout(
43                timeout_duration,
44                SclClient::recv_event::<T>(&mut event_stream),
45            )
46            .await
47            {
48                // An event was received within the timeout, perform event handling.
49                Ok(event) => {
50                    use SclEvent::*;
51                    debug!("An event was received within the timeout, perform event handling.");
52                    match event {
53                        Ok(Created(_) | Deleted(_) | Updated(_)) => {
54                            debug!("The event is of the type `Created`, `Deleted` or `Updated`, perform a full synchronisation.");
55                            self.synchronize_all().await?
56                        }
57                        Ok(Info(msg)) => info!("SclEvent::Info {:?}", msg),
58                        Err(e) => warn!("Failed to read an event from the stream: {}", e),
59                    };
60                }
61                // No event was received within the timeout, perform a full synchronization.
62                Err(_) => {
63                    debug!(
64                        "No event was received within the timeout, perform a full synchronization."
65                    );
66                    self.synchronize_all().await?
67                }
68            }
69        }
70    }
71}
72
73/// Updates the `ctrl` object periodically with a fresh timestamp. Creates the `ctrl` object if
74/// necessary.
75pub async fn heartbeat(client: &SclClient, ctrl: &Controller) -> Result<(), ControllerBasicError> {
76    let get_req = SclRequest::<Controller>::new().obj_name(ctrl.metadata.name());
77    loop {
78        // Try to get controller object.
79        match client.show(&get_req).await {
80            // Controller was found. Update last_interaction field.
81            Ok(mut ctrl) => {
82                ctrl.update_last_interaction();
83                client.update(&get_req.clone().body(&ctrl)).await?;
84            }
85            // Controller was not found. Create it.
86            Err(err) if err.status() == Some(StatusCode::NOT_FOUND) => {
87                let post_req = SclRequest::<Controller>::new().body(ctrl);
88                client.create(&post_req).await?;
89            }
90            // Some other Error occurred.
91            _ => (),
92        };
93        time::sleep(time::Duration::from_millis(5000)).await;
94    }
95}
96
97/// Attempts to clear the provided finalizer via an update request to the SCL API.
98pub async fn remove_finalizer<T: SclObject>(
99    client: &SclClient,
100    mut t: T,
101    finalizer: FinalizerId,
102) -> Result<(), ControllerBasicError> {
103    let dm = match t.metadata_mut().deletion_mark.as_mut() {
104        Some(dm) => dm,
105        None => return Err(ControllerBasicError::Application), // Derived totally wrong action somewhere.
106    };
107
108    if let Some(f) = dm.finalizers.take_next_finalizer() {
109        if f == finalizer {
110            let mut req = SclRequest::new().obj_name(t.name().as_str()).body(&t);
111            req.sc_name = t.separation_context().map(|s| s.to_string());
112            return client
113                .update(&req)
114                .await
115                .map_err(ControllerBasicError::SclClient);
116        }
117    }
118    Err(ControllerBasicError::Application) // Derived totally wrong action somewhere.
119}
120
/// Asserts that the [Command], rendered as a single string, equals `expected`.
///
/// The command is rendered through its `Debug` representation with every double quote
/// removed, so `expected` has to be written without quotes.
///
/// # Panics
///
/// Panics if the rendered command differs from `expected`.
///
/// # Example
///
/// ```
/// use std::process::Command;
/// use scl_lib::controller_utils::expect_cmd_string;
///
/// let mut cmd = Command::new("echo");
/// cmd.arg("hello")
///    .arg("world");
///
/// expect_cmd_string(&cmd, "echo hello world");
/// ```
#[cfg(any(test, feature = "test-utils"))]
pub fn expect_cmd_string(cmd: &Command, expected: &str) {
    let rendered: String = format!("{:?}", cmd)
        .chars()
        .filter(|&c| c != '"')
        .collect();
    assert_eq!(rendered.as_str(), expected);
}
140
/// Builds a [`SeparationContext`] test fixture with the given VLAN tag and the fixed name
/// `something-something`.
///
/// # Panics
///
/// Panics if `vlan_tag` cannot be converted into a [`VlanTag`].
#[cfg(any(test, feature = "test-utils"))]
pub fn new_sc_with_vlan_tag(vlan_tag: u16) -> SeparationContext {
    use crate::api_objects::SclName;

    let tag = VlanTag::try_from(vlan_tag).unwrap();
    let name = SclName::try_from("something-something").unwrap();

    SeparationContext {
        vlan_tag: tag,
        metadata: MetaData::new(name),
    }
}
150
#[cfg(test)]
mod test {
    use super::*;

    use crate::api_objects::{DeletionMark, Finalizers};
    use crate::client::reqwest_client;
    use httpmock::prelude::{MockServer, PUT};

    /// Creates a client that talks to the given mock server.
    fn client_for(server: &MockServer) -> SclClient {
        SclClient::new(server.base_url(), reqwest_client(None, None).unwrap())
    }

    /// The matching finalizer is removed and the object is sent to the API exactly once.
    #[tokio::test]
    async fn test_remove_finalizer() {
        let mut sc_input = new_sc_with_vlan_tag(1337);
        sc_input.metadata.deletion_mark = Some(DeletionMark {
            finalizers: Finalizers::from(vec![FinalizerId::CleanUpNetworkInfrastructure]),
        });

        let server = MockServer::start();
        // The update must carry the object with its only finalizer already removed.
        let mut expected_sc_update = sc_input.clone();
        expected_sc_update.metadata.deletion_mark = Some(DeletionMark {
            finalizers: Finalizers::from(vec![]),
        });

        let mock_endpoint = server.mock(|when, then| {
            when.method(PUT)
                .path(sc_input.get_api_endpoint())
                .header("content-type", "application/json")
                .json_body_obj(&expected_sc_update);
            then.status(200)
                .header("content-type", "application/json")
                .body("Kek");
        });

        let client = client_for(&server);

        assert!(
            remove_finalizer(&client, sc_input, FinalizerId::CleanUpNetworkInfrastructure)
                .await
                .is_ok()
        );
        mock_endpoint.assert();
    }

    /// An object without a deletion mark is rejected before any request is made.
    #[tokio::test]
    async fn test_remove_finalizer_without_deletion_mark() {
        let mut sc_input = new_sc_with_vlan_tag(1337);
        sc_input.metadata.deletion_mark = None;

        let server = MockServer::start();
        let client = client_for(&server);

        let result =
            remove_finalizer(&client, sc_input, FinalizerId::CleanUpNetworkInfrastructure).await;
        assert!(matches!(result, Err(ControllerBasicError::Application)));
    }

    /// A deletion mark without pending finalizers is rejected before any request is made.
    #[tokio::test]
    async fn test_remove_finalizer_without_pending_finalizer() {
        let mut sc_input = new_sc_with_vlan_tag(1337);
        sc_input.metadata.deletion_mark = Some(DeletionMark {
            finalizers: Finalizers::from(vec![]),
        });

        let server = MockServer::start();
        let client = client_for(&server);

        let result =
            remove_finalizer(&client, sc_input, FinalizerId::CleanUpNetworkInfrastructure).await;
        assert!(matches!(result, Err(ControllerBasicError::Application)));
    }
}