scl_lib/controller_utils/
mod.rs1pub mod error;
3
4use log::{debug, info, warn};
5
6#[cfg(any(test, feature = "test-utils"))]
7use std::process::Command;
8use std::time::Duration;
9
10use tokio::time;
11
12#[cfg(any(test, feature = "test-utils"))]
13use crate::api_objects::{MetaData, SeparationContext, VlanTag};
14
15use crate::api_objects::{Controller, FinalizerId, SclEvent, SclObject};
16use crate::client::{async_trait, SclClient, SclRequest, StatusCode};
17pub use crate::controller_utils::error::ControllerBasicError;
18
19#[async_trait]
23pub trait SclObjectController<T: SclObject + Send> {
24 fn client(&self) -> &SclClient;
25
26 async fn synchronize_all(&self) -> Result<(), ControllerBasicError>;
28
29 async fn run(&self, watch_event_timeout_ms: u64) -> Result<(), ControllerBasicError> {
32 let url = format!(
33 "{}/watch{}",
34 self.client().api_url,
35 T::api_endpoint(None, None)
36 );
37 let client = self.client();
38 let mut event_stream = client.get(url).send().await?.error_for_status()?;
39 let timeout_duration = Duration::from_millis(watch_event_timeout_ms);
40
41 loop {
42 match tokio::time::timeout(
43 timeout_duration,
44 SclClient::recv_event::<T>(&mut event_stream),
45 )
46 .await
47 {
48 Ok(event) => {
50 use SclEvent::*;
51 debug!("An event was received within the timeout, perform event handling.");
52 match event {
53 Ok(Created(_) | Deleted(_) | Updated(_)) => {
54 debug!("The event is of the type `Created`, `Deleted` or `Updated`, perform a full synchronisation.");
55 self.synchronize_all().await?
56 }
57 Ok(Info(msg)) => info!("SclEvent::Info {:?}", msg),
58 Err(e) => warn!("Failed to read an event from the stream: {}", e),
59 };
60 }
61 Err(_) => {
63 debug!(
64 "No event was received within the timeout, perform a full synchronization."
65 );
66 self.synchronize_all().await?
67 }
68 }
69 }
70 }
71}
72
73pub async fn heartbeat(client: &SclClient, ctrl: &Controller) -> Result<(), ControllerBasicError> {
76 let get_req = SclRequest::<Controller>::new().obj_name(ctrl.metadata.name());
77 loop {
78 match client.show(&get_req).await {
80 Ok(mut ctrl) => {
82 ctrl.update_last_interaction();
83 client.update(&get_req.clone().body(&ctrl)).await?;
84 }
85 Err(err) if err.status() == Some(StatusCode::NOT_FOUND) => {
87 let post_req = SclRequest::<Controller>::new().body(ctrl);
88 client.create(&post_req).await?;
89 }
90 _ => (),
92 };
93 time::sleep(time::Duration::from_millis(5000)).await;
94 }
95}
96
97pub async fn remove_finalizer<T: SclObject>(
99 client: &SclClient,
100 mut t: T,
101 finalizer: FinalizerId,
102) -> Result<(), ControllerBasicError> {
103 let dm = match t.metadata_mut().deletion_mark.as_mut() {
104 Some(dm) => dm,
105 None => return Err(ControllerBasicError::Application), };
107
108 if let Some(f) = dm.finalizers.take_next_finalizer() {
109 if f == finalizer {
110 let mut req = SclRequest::new().obj_name(t.name().as_str()).body(&t);
111 req.sc_name = t.separation_context().map(|s| s.to_string());
112 return client
113 .update(&req)
114 .await
115 .map_err(ControllerBasicError::SclClient);
116 }
117 }
118 Err(ControllerBasicError::Application) }
120
121#[cfg(any(test, feature = "test-utils"))]
136pub fn expect_cmd_string(cmd: &Command, expected: &str) {
137 let cmd = format!("{:?}", &cmd).replace('\"', "");
138 assert_eq!(&cmd, expected);
139}
140
141#[cfg(any(test, feature = "test-utils"))]
142pub fn new_sc_with_vlan_tag(vlan_tag: u16) -> SeparationContext {
143 use crate::api_objects::SclName;
144
145 SeparationContext {
146 vlan_tag: VlanTag::try_from(vlan_tag).unwrap(),
147 metadata: MetaData::new(SclName::try_from("something-something").unwrap()),
148 }
149}
150
151#[cfg(test)]
152mod test {
153 use super::*;
154
155 use crate::api_objects::{DeletionMark, Finalizers};
156 use crate::client::reqwest_client;
157
158 #[tokio::test]
159 async fn test_remove_finalizer() {
160 use httpmock::prelude::{MockServer, PUT};
161 let mut sc_input = new_sc_with_vlan_tag(1337);
162 sc_input.metadata.deletion_mark = Some(DeletionMark {
163 finalizers: Finalizers::from(vec![FinalizerId::CleanUpNetworkInfrastructure]),
164 });
165
166 let server = MockServer::start();
167 let mut expected_sc_update = sc_input.clone();
168 expected_sc_update.metadata.deletion_mark = Some(DeletionMark {
169 finalizers: Finalizers::from(vec![]),
170 });
171
172 let mock_endpoint = server.mock(|when, then| {
173 when.method(PUT)
174 .path(sc_input.get_api_endpoint())
175 .header("content-type", "application/json")
176 .json_body_obj(&expected_sc_update);
177 then.status(200)
178 .header("content-type", "application/json")
179 .body("Kek");
180 });
181
182 let client = SclClient::new(server.base_url(), reqwest_client(None, None).unwrap());
183
184 assert!(
185 remove_finalizer(&client, sc_input, FinalizerId::CleanUpNetworkInfrastructure)
186 .await
187 .is_ok()
188 );
189 mock_endpoint.assert();
190 }
191}