gluster-dir-csi/src/main.rs

478 lines
14 KiB
Rust

use std::io::Error;
use std::process::Command;
use tonic::{transport::Server, Request, Response, Status};
use log::{debug, error, info, trace, warn};
use std::time::SystemTime;
pub mod csi {
tonic::include_proto!("csi.v1");
}
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use csi::controller_server::{Controller, ControllerServer};
use csi::identity_server::{Identity, IdentityServer};
use csi::node_server::{Node, NodeServer};
fn mount_volume(volume_id: String, target_path: String) -> Result<(), Status> {
std::fs::create_dir_all(&target_path).unwrap();
// Mount gluster volume to target path
let output = std::process::Command::new("mount")
.arg("-t")
.arg("glusterfs")
.arg(format!(
"{}:/{}/{}",
std::env::var("GLUSTER_HOST").unwrap(),
std::env::var("GLUSTER_VOLUME").unwrap(),
volume_id
))
.arg(&target_path)
.output()
.expect("failed to execute process");
// Check if mount was successful
// Code 0 means success
// Code 32 means already mounted
if output.status.code() != Some(0) && output.status.code() != Some(32) {
error!(
"Mount failed with code: {:?} {}",
output.status.code(),
String::from_utf8_lossy(&output.stderr)
);
return Err(Status::internal(format!(
"Mount failed with code: {:?}",
output.status.code(),
)));
}
return Ok(());
}
fn unmount_volume(target_path: String) -> Result<(), Status> {
// Unmount gluster volume
let output = std::process::Command::new("umount")
.arg("-l") //TODO: Think about this
.arg(target_path)
.output()
.expect("failed to execute process");
// Check if unmount was successful
// Code 0 means success
// Code 32 means already unmounted
if output.status.code() != Some(0) && output.status.code() != Some(32) {
error!(
"Unmount failed with code: {:?} {}",
output.status.code(),
String::from_utf8_lossy(&output.stderr)
);
return Err(Status::internal(format!(
"Unmount failed with code: {:?}",
output.status.code(),
)));
}
return Ok(());
}
#[derive(Debug, Default)]
struct GlusterNode {}
#[tonic::async_trait]
impl Node for GlusterNode {
async fn node_get_info(
&self,
request: Request<csi::NodeGetInfoRequest>,
) -> Result<Response<csi::NodeGetInfoResponse>, Status> {
debug!("Got a request: {:?}", request);
// Get hostname of system
let hostname = hostname::get().unwrap().into_string().unwrap();
let reply = csi::NodeGetInfoResponse {
node_id: hostname,
max_volumes_per_node: 0,
accessible_topology: None,
};
Ok(Response::new(reply))
}
async fn node_get_capabilities(
&self,
request: Request<csi::NodeGetCapabilitiesRequest>,
) -> Result<Response<csi::NodeGetCapabilitiesResponse>, Status> {
debug!("Got a request: {:?}", request);
let reply = csi::NodeGetCapabilitiesResponse {
capabilities: vec![csi::NodeServiceCapability {
r#type: Some(csi::node_service_capability::Type::Rpc(
csi::node_service_capability::Rpc {
r#type: csi::node_service_capability::rpc::Type::StageUnstageVolume as i32,
},
)),
}],
};
Ok(Response::new(reply))
}
async fn node_publish_volume(
&self,
request: Request<csi::NodePublishVolumeRequest>,
) -> Result<Response<csi::NodePublishVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
let volume_id = request.get_ref().volume_id.clone();
let target_path = request.get_ref().target_path.clone();
// Mount gluster volume to target path
mount_volume(volume_id, target_path)?;
let reply = csi::NodePublishVolumeResponse {};
Ok(Response::new(reply))
}
async fn node_unpublish_volume(
&self,
request: Request<csi::NodeUnpublishVolumeRequest>,
) -> Result<Response<csi::NodeUnpublishVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
let target_path = request.get_ref().target_path.clone();
// Unmount gluster volume
unmount_volume(target_path)?;
let reply = csi::NodeUnpublishVolumeResponse {};
Ok(Response::new(reply))
}
async fn node_stage_volume(
&self,
request: Request<csi::NodeStageVolumeRequest>,
) -> Result<Response<csi::NodeStageVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
let volume_id = request.get_ref().volume_id.clone();
let target_path = request.get_ref().staging_target_path.clone();
// Mount gluster volume to target path
mount_volume(volume_id, target_path)?;
let reply = csi::NodeStageVolumeResponse {};
Ok(Response::new(reply))
}
async fn node_unstage_volume(
&self,
request: Request<csi::NodeUnstageVolumeRequest>,
) -> Result<Response<csi::NodeUnstageVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
let target_path = request.get_ref().staging_target_path.clone();
// Unmount gluster volume
unmount_volume(target_path)?;
let reply = csi::NodeUnstageVolumeResponse {};
Ok(Response::new(reply))
}
async fn node_expand_volume(
&self,
request: Request<csi::NodeExpandVolumeRequest>,
) -> Result<Response<csi::NodeExpandVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("Not implemented"))
}
async fn node_get_volume_stats(
&self,
request: Request<csi::NodeGetVolumeStatsRequest>,
) -> Result<Response<csi::NodeGetVolumeStatsResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("Not implemented"))
}
}
#[derive(Debug, Default)]
struct GlusterController {}
#[tonic::async_trait]
impl Controller for GlusterController {
async fn controller_get_capabilities(
&self,
request: Request<csi::ControllerGetCapabilitiesRequest>,
) -> Result<Response<csi::ControllerGetCapabilitiesResponse>, Status> {
debug!("Got a request: {:?}", request);
// Return CREATE_DELETE_VOLUME capability
let reply = csi::ControllerGetCapabilitiesResponse {
capabilities: vec![csi::ControllerServiceCapability {
r#type: Some(csi::controller_service_capability::Type::Rpc(
csi::controller_service_capability::Rpc {
r#type: csi::controller_service_capability::rpc::Type::CreateDeleteVolume
as i32,
},
)),
}],
};
Ok(Response::new(reply))
}
async fn create_volume(
&self,
request: Request<csi::CreateVolumeRequest>,
) -> Result<Response<csi::CreateVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
let name = request.into_inner().name;
std::fs::create_dir_all(std::path::Path::new("/mnt/main/").join(&name))?;
let reply = csi::CreateVolumeResponse {
volume: Some(csi::Volume {
capacity_bytes: 0,
volume_id: name,
volume_context: Default::default(),
content_source: None,
accessible_topology: Default::default(),
}),
};
Ok(Response::new(reply))
}
async fn delete_volume(
&self,
request: Request<csi::DeleteVolumeRequest>,
) -> Result<Response<csi::DeleteVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
// let name = request.into_inner().volume_id;
// std::fs::remove_dir_all(std::path::Path::new("/mnt/glusterfs").join(&name))?;
let reply = csi::DeleteVolumeResponse {};
Ok(Response::new(reply))
}
async fn controller_publish_volume(
&self,
request: Request<csi::ControllerPublishVolumeRequest>,
) -> Result<Response<csi::ControllerPublishVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn controller_unpublish_volume(
&self,
request: Request<csi::ControllerUnpublishVolumeRequest>,
) -> Result<Response<csi::ControllerUnpublishVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn validate_volume_capabilities(
&self,
request: Request<csi::ValidateVolumeCapabilitiesRequest>,
) -> Result<Response<csi::ValidateVolumeCapabilitiesResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn list_volumes(
&self,
request: Request<csi::ListVolumesRequest>,
) -> Result<Response<csi::ListVolumesResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn get_capacity(
&self,
request: Request<csi::GetCapacityRequest>,
) -> Result<Response<csi::GetCapacityResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn controller_get_volume(
&self,
request: Request<csi::ControllerGetVolumeRequest>,
) -> Result<Response<csi::ControllerGetVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn create_snapshot(
&self,
request: Request<csi::CreateSnapshotRequest>,
) -> Result<Response<csi::CreateSnapshotResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn delete_snapshot(
&self,
request: Request<csi::DeleteSnapshotRequest>,
) -> Result<Response<csi::DeleteSnapshotResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn list_snapshots(
&self,
request: Request<csi::ListSnapshotsRequest>,
) -> Result<Response<csi::ListSnapshotsResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
async fn controller_expand_volume(
&self,
request: Request<csi::ControllerExpandVolumeRequest>,
) -> Result<Response<csi::ControllerExpandVolumeResponse>, Status> {
debug!("Got a request: {:?}", request);
Err(Status::unimplemented("not implemented"))
}
}
#[derive(Debug, Default)]
struct GlusterIdentity {}
#[tonic::async_trait]
impl Identity for GlusterIdentity {
async fn get_plugin_info(
&self,
request: Request<csi::GetPluginInfoRequest>,
) -> Result<Response<csi::GetPluginInfoResponse>, Status> {
debug!("Got a request: {:?}", request);
let reply = csi::GetPluginInfoResponse {
name: "glusterf-dir-csi".to_string(),
vendor_version: env!("CARGO_PKG_VERSION").to_string(),
..Default::default()
};
Ok(Response::new(reply))
}
async fn get_plugin_capabilities(
&self,
request: Request<csi::GetPluginCapabilitiesRequest>,
) -> Result<Response<csi::GetPluginCapabilitiesResponse>, Status> {
debug!("Got a request: {:?}", request);
let reply = csi::GetPluginCapabilitiesResponse {
capabilities: vec![csi::PluginCapability {
r#type: Some(csi::plugin_capability::Type::Service(
csi::plugin_capability::Service {
r#type: csi::plugin_capability::service::Type::ControllerService as i32,
},
)),
}],
};
Ok(Response::new(reply))
}
async fn probe(
&self,
request: Request<csi::ProbeRequest>,
) -> Result<Response<csi::ProbeResponse>, Status> {
debug!("Got a request: {:?}", request);
let reply = csi::ProbeResponse { ready: Some(true) };
Ok(Response::new(reply))
}
}
fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{} {} {}] {}",
humantime::format_rfc3339_seconds(SystemTime::now()),
record.level(),
record.target(),
message
))
})
.level(log::LevelFilter::Debug)
.chain(std::io::stdout())
.apply()?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
setup_logger()?;
std::fs::create_dir_all("/mnt/main")?;
// Mount glusterfs main
let res = Command::new("mount")
.arg("-t")
.arg("glusterfs")
.arg(format!(
"{}:{}/",
std::env::var("GLUSTER_HOST")?,
std::env::var("GLUSTER_VOLUME")?
))
.arg("/mnt/main")
.output()
.expect("failed to mount glusterfs");
if res.status.success() {
info!("Glusterfs mounted");
} else {
error!(
"Glusterfs mount failed: {}",
String::from_utf8_lossy(&res.stderr)
);
return Err("Glusterfs mount failed".into());
}
let identity = GlusterIdentity::default();
let controller = GlusterController::default();
let node = GlusterNode::default();
info!("Starting server...");
let csi_sock_base = std::env::var("CSI_PATH")
.or_else(|_| Ok::<String, Error>("/csi/".to_owned()))
.unwrap();
std::fs::create_dir_all(&csi_sock_base)?;
let csi_sock: String = csi_sock_base + "csi.sock";
info!("Listening on {},", csi_sock);
let uds = UnixListener::bind(csi_sock)?;
let uds_stream = UnixListenerStream::new(uds);
Server::builder()
.add_service(IdentityServer::new(identity))
.add_service(ControllerServer::new(controller))
.add_service(NodeServer::new(node))
.serve_with_incoming(uds_stream)
.await?;
Ok(())
}