Switch from Python to Rust

Fabian Stamm 2022-11-06 17:38:49 +01:00
parent 789bd294c3
commit c5960a55a9
18 changed files with 1525 additions and 173 deletions

1
.dockerignore Normal file

@@ -0,0 +1 @@
target/

3
.gitignore vendored

@@ -1,2 +1 @@
-csi_pb2.py
-csi_pb2_grpc.py
+target/


@@ -1 +0,0 @@
python 3.10.5

1023
Cargo.lock generated Normal file

File diff suppressed because it is too large

17
Cargo.toml Normal file

@@ -0,0 +1,17 @@
[package]
name = "gluster-dir-csi"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tonic = "0.8"
prost = "0.11"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
hostname = "0.3.1"
prost-types = "0.11.2"

[build-dependencies]
tonic-build = "0.8"

Dockerfile

@@ -1,12 +1,27 @@
-FROM python:3.10-slim-bullseye
+FROM docker.io/rust:1.65.0-slim-bullseye as builder
 RUN apt-get update -yq && \
-    apt-get install -y --no-install-recommends glusterfs-client
+    apt install -y --no-install-recommends \
+    build-essential \
+    libssl-dev \
+    pkg-config \
+    protobuf-compiler \
+    libprotobuf-dev
+WORKDIR /src
+COPY . .
+RUN cargo build --release
+FROM docker.io/debian:bullseye-slim
+RUN apt-get update -yq && \
+    apt-get install -y --no-install-recommends glusterfs-client htop
+COPY --from=builder /src/target/release/gluster-dir-csi /usr/local/bin/gluster-dir-csi
+ENTRYPOINT ["/usr/local/bin/gluster-dir-csi"]
-COPY requirements.txt /
-RUN pip install --upgrade pip
-RUN pip install -r /requirements.txt
-RUN pip3 install -r /requirements.txt
-COPY *.py /
-RUN mkdir /csi
-CMD python3 -u /main.py

README.md

@@ -1,6 +0,0 @@
# Intro
This is a super alpha implementation that supports the CSI spec for directories on a gluster volume.
Each top level directory is treated as a volume.
Currently tested on hashicorp nomad.


build.sh

@@ -1,3 +0,0 @@
#!/usr/bin/env bash
docker build -t ahelberg/gluster-dir-csi .


@@ -1,3 +0,0 @@
#!/usr/bin/env bash
python -m grpc_tools.protoc -Icsi-spec/ --python_out=. --grpc_python_out=. csi-spec/csi.proto


@@ -1,3 +0,0 @@
#!/usr/bin/env bash
docker push ahelberg/gluster-dir-csi


@@ -1,2 +0,0 @@
#!/usr/bin/env bash
./build.sh && docker run --rm -ti --privileged -v "$PWD/docker-csi:/csi" ahelberg/gluster-dir-csi

4
build.rs Normal file

@@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("csi-spec/csi.proto")?;
    Ok(())
}
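Aside: tonic-build's configure() builder expresses the same codegen step more explicitly. A minimal sketch, assuming the same csi-spec submodule layout; the four-line build.rs above is what the commit actually adds:

// Hypothetical alternative build.rs (sketch, not part of this commit).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true) // generate the Identity/Controller/Node service traits
        .build_client(false) // the plugin never acts as a gRPC client itself
        .compile(&["csi-spec/csi.proto"], &["csi-spec/"])?;
    Ok(())
}

Either way the generated code lands in a module named after the proto package, which is what tonic::include_proto!("csi.v1") in src/main.rs pulls in.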

csi-spec

@@ -1 +1 @@
-Subproject commit ad238e5cad5a27188fcdae1ff3ace3f612638ca5
+Subproject commit 1de425b860085fa0e987d8ece6fe36ed47cdebb3

138
main.py

@@ -1,138 +0,0 @@
from concurrent import futures
import logging
import socket
import os

import grpc

import csi_pb2
import csi_pb2_grpc

import subprocess

gluster_host = os.environ["GLUSTER_HOST"]
gluster_volume = os.environ["GLUSTER_VOLUME"]
csi_sock_full_path = os.environ.get("CSI_PATH", "/csi/") + "csi.sock"


def unmount(path):
    print("Unmounting", path)
    return subprocess.run(["umount", path])


def mount(volume, path):
    print("Mounting", volume, "at", path)
    p = subprocess.run(["mount", "-t", "glusterfs", "%s:/%s/%s" %
                        (gluster_host, gluster_volume, volume), path])
    if p.returncode == 0:
        return True
    # Might be a stale mount, unmount and try again
    if p.returncode == 32:
        return True
    print("Mounting failed with return code:", p.returncode)
    return p


class Identity(csi_pb2_grpc.Identity):
    def GetPluginInfo(self, request, context):
        return csi_pb2.GetPluginInfoResponse(name='gluster-dir-csi', vendor_version="1")

    def Probe(self, request, context):
        return csi_pb2.ProbeResponse()

    def GetPluginCapabilities(self, request, context):
        return csi_pb2.GetPluginCapabilitiesResponse(
            capabilities=[
                csi_pb2.PluginCapability(
                    service=csi_pb2.PluginCapability.Service(
                        type=csi_pb2.PluginCapability.Service.Type.CONTROLLER_SERVICE)
                )
            ])


class Controller(csi_pb2_grpc.Controller):
    def ControllerGetCapabilities(self, request, context):
        return csi_pb2.ControllerGetCapabilitiesResponse(capabilities=[
            csi_pb2.ControllerServiceCapability(rpc=csi_pb2.ControllerServiceCapability.RPC(
                type=csi_pb2.ControllerServiceCapability.RPC.Type.CREATE_DELETE_VOLUME)),
        ])

    def CreateVolume(self, request, context):
        name = request.name
        print("CreateVolume", name)
        p = subprocess.run(["mkdir", "-p", "/mnt/main/%s" % name])
        volume = csi_pb2.Volume(volume_id=name)
        return csi_pb2.CreateVolumeResponse(volume=volume)


class Node(csi_pb2_grpc.Node):
    def NodeGetInfo(self, request, context):
        node_id = os.getenv("HOSTNAME")
        return csi_pb2.NodeGetInfoResponse(node_id=node_id)

    def NodeGetCapabilities(self, request, context):
        return csi_pb2.NodeGetCapabilitiesResponse(capabilities=[
            csi_pb2.NodeServiceCapability(rpc=csi_pb2.NodeServiceCapability.RPC(
                type=csi_pb2.NodeServiceCapability.RPC.Type.STAGE_UNSTAGE_VOLUME))
        ])

    def NodePublishVolume(self, request, context):
        volume_id = request.volume_id
        path = request.target_path
        print("Node Publish Volume", path)
        p = subprocess.run(["mkdir", "-p", path], check=True)
        res = mount(volume_id, path)
        if res is True:
            return csi_pb2.NodePublishVolumeResponse()
        print(res)

    def NodeUnpublishVolume(self, request, context):
        path = request.target_path
        print("NodeUnpublishVolume", path)
        p = subprocess.run(["umount", path])
        return csi_pb2.NodeUnpublishVolumeResponse()

    def NodeStageVolume(self, request, context):
        volume_id = request.volume_id
        path = request.staging_target_path
        print("Node Stage Volume", path)
        res = mount(volume_id, path)
        if res is True:
            return csi_pb2.NodeStageVolumeResponse()
        print(res)

    def NodeUnstageVolume(self, request, context):
        path = request.staging_target_path
        print("NodeUnstageVolume", path)
        p = subprocess.run(["umount", path])
        return csi_pb2.NodeUnstageVolumeResponse()


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    csi_pb2_grpc.add_IdentityServicer_to_server(Identity(), server)
    csi_pb2_grpc.add_ControllerServicer_to_server(Controller(), server)
    csi_pb2_grpc.add_NodeServicer_to_server(Node(), server)
    print("About to start listening on", csi_sock_full_path)
    server.add_insecure_port(f'unix://%s' % csi_sock_full_path)
    server.start()
    print("Waiting for termination")
    server.wait_for_termination()


if __name__ == '__main__':
    p = subprocess.run(["mkdir", "-p", "/mnt/main"])
    p = subprocess.run(["mount", "-t", "glusterfs", "%s:/%s" %
                        (gluster_host, gluster_volume), "/mnt/main"], check=True)
    logging.basicConfig()
    serve()

requirements.txt

@@ -1,5 +0,0 @@
grpcio==1.47.0
grpcio-tools==1.47.0
protobuf==3.20.1
googleapis-common-protos==1.53.0
six==1.16.0

454
src/main.rs Normal file

@@ -0,0 +1,454 @@
use std::io::Error;
use std::process::Command;

use tonic::{transport::Server, Request, Response, Status};

pub mod csi {
    tonic::include_proto!("csi.v1");
}

use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;

use csi::controller_server::{Controller, ControllerServer};
use csi::identity_server::{Identity, IdentityServer};
use csi::node_server::{Node, NodeServer};
fn mount_volume(volume_id: String, target_path: String) -> Result<(), Status> {
    std::fs::create_dir_all(&target_path).unwrap();

    // Mount gluster volume to target path
    let output = std::process::Command::new("mount")
        .arg("-t")
        .arg("glusterfs")
        .arg(format!(
            "{}:/{}/{}",
            std::env::var("GLUSTER_HOST").unwrap(),
            std::env::var("GLUSTER_VOLUME").unwrap(),
            volume_id
        ))
        .arg(&target_path)
        .output()
        .expect("failed to execute process");

    // Check if mount was successful
    // Code 0 means success
    // Code 32 means already mounted
    if output.status.code() != Some(0) && output.status.code() != Some(32) {
        println!(
            "Mount failed with code: {:?} {}",
            output.status.code(),
            String::from_utf8_lossy(&output.stderr)
        );
        return Err(Status::internal(format!(
            "Mount failed with code: {:?}",
            output.status.code(),
        )));
    }

    return Ok(());
}
fn unmount_volume(target_path: String) -> Result<(), Status> {
    // Unmount gluster volume
    let output = std::process::Command::new("umount")
        .arg("-l") //TODO: Think about this
        .arg(target_path)
        .output()
        .expect("failed to execute process");

    // Check if unmount was successful
    // Code 0 means success
    // Code 32 means already unmounted
    if output.status.code() != Some(0) && output.status.code() != Some(32) {
        println!(
            "Unmount failed with code: {:?} {}",
            output.status.code(),
            String::from_utf8_lossy(&output.stderr)
        );
        return Err(Status::internal(format!(
            "Unmount failed with code: {:?}",
            output.status.code(),
        )));
    }

    return Ok(());
}
#[derive(Debug, Default)]
struct GlusterNode {}

#[tonic::async_trait]
impl Node for GlusterNode {
    async fn node_get_info(
        &self,
        request: Request<csi::NodeGetInfoRequest>,
    ) -> Result<Response<csi::NodeGetInfoResponse>, Status> {
        println!("Got a request: {:?}", request);

        // Get hostname of system
        let hostname = hostname::get().unwrap().into_string().unwrap();

        let reply = csi::NodeGetInfoResponse {
            node_id: hostname,
            max_volumes_per_node: 0,
            accessible_topology: None,
        };

        Ok(Response::new(reply))
    }

    async fn node_get_capabilities(
        &self,
        request: Request<csi::NodeGetCapabilitiesRequest>,
    ) -> Result<Response<csi::NodeGetCapabilitiesResponse>, Status> {
        println!("Got a request: {:?}", request);

        let reply = csi::NodeGetCapabilitiesResponse {
            capabilities: vec![csi::NodeServiceCapability {
                r#type: Some(csi::node_service_capability::Type::Rpc(
                    csi::node_service_capability::Rpc {
                        r#type: csi::node_service_capability::rpc::Type::StageUnstageVolume as i32,
                    },
                )),
            }],
        };

        Ok(Response::new(reply))
    }

    async fn node_publish_volume(
        &self,
        request: Request<csi::NodePublishVolumeRequest>,
    ) -> Result<Response<csi::NodePublishVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let volume_id = request.get_ref().volume_id.clone();
        let target_path = request.get_ref().target_path.clone();

        // Mount gluster volume to target path
        mount_volume(volume_id, target_path)?;

        let reply = csi::NodePublishVolumeResponse {};
        Ok(Response::new(reply))
    }

    async fn node_unpublish_volume(
        &self,
        request: Request<csi::NodeUnpublishVolumeRequest>,
    ) -> Result<Response<csi::NodeUnpublishVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let target_path = request.get_ref().target_path.clone();

        // Unmount gluster volume
        unmount_volume(target_path)?;

        let reply = csi::NodeUnpublishVolumeResponse {};
        Ok(Response::new(reply))
    }

    async fn node_stage_volume(
        &self,
        request: Request<csi::NodeStageVolumeRequest>,
    ) -> Result<Response<csi::NodeStageVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let volume_id = request.get_ref().volume_id.clone();
        let target_path = request.get_ref().staging_target_path.clone();

        // Mount gluster volume to target path
        mount_volume(volume_id, target_path)?;

        let reply = csi::NodeStageVolumeResponse {};
        Ok(Response::new(reply))
    }

    async fn node_unstage_volume(
        &self,
        request: Request<csi::NodeUnstageVolumeRequest>,
    ) -> Result<Response<csi::NodeUnstageVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let target_path = request.get_ref().staging_target_path.clone();

        // Unmount gluster volume
        unmount_volume(target_path)?;

        let reply = csi::NodeUnstageVolumeResponse {};
        Ok(Response::new(reply))
    }

    async fn node_expand_volume(
        &self,
        request: Request<csi::NodeExpandVolumeRequest>,
    ) -> Result<Response<csi::NodeExpandVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("Not implemented"))
    }

    async fn node_get_volume_stats(
        &self,
        request: Request<csi::NodeGetVolumeStatsRequest>,
    ) -> Result<Response<csi::NodeGetVolumeStatsResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("Not implemented"))
    }
}
#[derive(Debug, Default)]
struct GlusterController {}

#[tonic::async_trait]
impl Controller for GlusterController {
    async fn controller_get_capabilities(
        &self,
        request: Request<csi::ControllerGetCapabilitiesRequest>,
    ) -> Result<Response<csi::ControllerGetCapabilitiesResponse>, Status> {
        println!("Got a request: {:?}", request);

        // Return CREATE_DELETE_VOLUME capability
        let reply = csi::ControllerGetCapabilitiesResponse {
            capabilities: vec![csi::ControllerServiceCapability {
                r#type: Some(csi::controller_service_capability::Type::Rpc(
                    csi::controller_service_capability::Rpc {
                        r#type: csi::controller_service_capability::rpc::Type::CreateDeleteVolume
                            as i32,
                    },
                )),
            }],
        };

        Ok(Response::new(reply))
    }

    async fn create_volume(
        &self,
        request: Request<csi::CreateVolumeRequest>,
    ) -> Result<Response<csi::CreateVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let name = request.into_inner().name;
        std::fs::create_dir_all(std::path::Path::new("/mnt/main/").join(&name))?;

        let reply = csi::CreateVolumeResponse {
            volume: Some(csi::Volume {
                capacity_bytes: 0,
                volume_id: name,
                volume_context: Default::default(),
                content_source: None,
                accessible_topology: Default::default(),
            }),
        };

        Ok(Response::new(reply))
    }

    async fn delete_volume(
        &self,
        request: Request<csi::DeleteVolumeRequest>,
    ) -> Result<Response<csi::DeleteVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);

        // let name = request.into_inner().volume_id;
        // std::fs::remove_dir_all(std::path::Path::new("/mnt/glusterfs").join(&name))?;

        let reply = csi::DeleteVolumeResponse {};
        Ok(Response::new(reply))
    }

    async fn controller_publish_volume(
        &self,
        request: Request<csi::ControllerPublishVolumeRequest>,
    ) -> Result<Response<csi::ControllerPublishVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn controller_unpublish_volume(
        &self,
        request: Request<csi::ControllerUnpublishVolumeRequest>,
    ) -> Result<Response<csi::ControllerUnpublishVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn validate_volume_capabilities(
        &self,
        request: Request<csi::ValidateVolumeCapabilitiesRequest>,
    ) -> Result<Response<csi::ValidateVolumeCapabilitiesResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn list_volumes(
        &self,
        request: Request<csi::ListVolumesRequest>,
    ) -> Result<Response<csi::ListVolumesResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn get_capacity(
        &self,
        request: Request<csi::GetCapacityRequest>,
    ) -> Result<Response<csi::GetCapacityResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn controller_get_volume(
        &self,
        request: Request<csi::ControllerGetVolumeRequest>,
    ) -> Result<Response<csi::ControllerGetVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn create_snapshot(
        &self,
        request: Request<csi::CreateSnapshotRequest>,
    ) -> Result<Response<csi::CreateSnapshotResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn delete_snapshot(
        &self,
        request: Request<csi::DeleteSnapshotRequest>,
    ) -> Result<Response<csi::DeleteSnapshotResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn list_snapshots(
        &self,
        request: Request<csi::ListSnapshotsRequest>,
    ) -> Result<Response<csi::ListSnapshotsResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }

    async fn controller_expand_volume(
        &self,
        request: Request<csi::ControllerExpandVolumeRequest>,
    ) -> Result<Response<csi::ControllerExpandVolumeResponse>, Status> {
        println!("Got a request: {:?}", request);
        Err(Status::unimplemented("not implemented"))
    }
}
#[derive(Debug, Default)]
struct GlusterIdentity {}

#[tonic::async_trait]
impl Identity for GlusterIdentity {
    async fn get_plugin_info(
        &self,
        request: Request<csi::GetPluginInfoRequest>,
    ) -> Result<Response<csi::GetPluginInfoResponse>, Status> {
        println!("Got a request: {:?}", request);

        let reply = csi::GetPluginInfoResponse {
            name: "gluster-dir-csi".to_string(),
            vendor_version: "0.1.0".to_string(),
            ..Default::default()
        };

        Ok(Response::new(reply))
    }

    async fn get_plugin_capabilities(
        &self,
        request: Request<csi::GetPluginCapabilitiesRequest>,
    ) -> Result<Response<csi::GetPluginCapabilitiesResponse>, Status> {
        println!("Got a request: {:?}", request);

        let reply = csi::GetPluginCapabilitiesResponse {
            capabilities: vec![csi::PluginCapability {
                r#type: Some(csi::plugin_capability::Type::Service(
                    csi::plugin_capability::Service {
                        r#type: csi::plugin_capability::service::Type::ControllerService as i32,
                    },
                )),
            }],
        };

        Ok(Response::new(reply))
    }

    async fn probe(
        &self,
        request: Request<csi::ProbeRequest>,
    ) -> Result<Response<csi::ProbeResponse>, Status> {
        println!("Got a request: {:?}", request);

        let reply = csi::ProbeResponse { ready: Some(true) };
        Ok(Response::new(reply))
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::fs::create_dir_all("/mnt/main")?;

    // Mount glusterfs main
    let res = Command::new("mount")
        .arg("-t")
        .arg("glusterfs")
        .arg(format!(
            "{}:{}/",
            std::env::var("GLUSTER_HOST")?,
            std::env::var("GLUSTER_VOLUME")?
        ))
        .arg("/mnt/main")
        .output()
        .expect("failed to mount glusterfs");

    if res.status.success() {
        println!("Glusterfs mounted");
    } else {
        println!(
            "Glusterfs mount failed: {}",
            String::from_utf8_lossy(&res.stderr)
        );
        return Err("Glusterfs mount failed".into());
    }

    let identity = GlusterIdentity::default();
    let controller = GlusterController::default();
    let node = GlusterNode::default();

    println!("Starting server...");

    let csi_sock_base = std::env::var("CSI_PATH")
        .or_else(|_| Ok::<String, Error>("/csi/".to_owned()))
        .unwrap();
    std::fs::create_dir_all(&csi_sock_base)?;
    let csi_sock: String = csi_sock_base + "csi.sock";

    let uds = UnixListener::bind(csi_sock)?;
    let uds_stream = UnixListenerStream::new(uds);

    Server::builder()
        .add_service(IdentityServer::new(identity))
        .add_service(ControllerServer::new(controller))
        .add_service(NodeServer::new(node))
        .serve_with_incoming(uds_stream)
        .await?;

    Ok(())
}
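
A quick way to smoke-test the resulting socket (hypothetical, not part of this commit): a tiny client that dials /csi/csi.sock and calls Probe. It assumes an extra tower dependency for service_fn and reuses the same generated csi.v1 bindings; the http URL is a placeholder tonic requires, the connector ignores it and dials the unix socket instead.

// probe.rs — hypothetical smoke test, not part of this commit.
use tokio::net::UnixStream;
use tonic::transport::{Endpoint, Uri};
use tower::service_fn; // assumes `tower` added as a dependency

pub mod csi {
    tonic::include_proto!("csi.v1");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The URL below is never resolved; the connector dials the CSI socket.
    let channel = Endpoint::try_from("http://[::1]:50051")?
        .connect_with_connector(service_fn(|_: Uri| {
            UnixStream::connect("/csi/csi.sock")
        }))
        .await?;

    let mut client = csi::identity_client::IdentityClient::new(channel);
    let response = client.probe(csi::ProbeRequest {}).await?;
    println!("probe response: {:?}", response.into_inner());
    Ok(())
}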