use log::{info, trace, warn}; use nanoid::nanoid; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::boxed::Box; use std::error::Error; use std::marker::Send; use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc::Sender, Mutex}; pub type Result = std::result::Result>; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JRPCRequest { pub jsonrpc: String, pub id: Option, pub method: String, pub params: Value, } impl JRPCRequest { pub fn new_request(method: String, params: Value) -> JRPCRequest { JRPCRequest { jsonrpc: "2.0".to_string(), id: None, method, params, } } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JRPCError { pub code: i64, pub message: String, pub data: Value, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JRPCResult { pub jsonrpc: String, pub id: String, pub result: Option, pub error: Option, } #[derive(Debug, Clone)] pub struct JRPCClient { message_sender: Sender, requests: Arc>>>, } impl JRPCClient { pub fn new(sender: Sender) -> JRPCClient { JRPCClient { message_sender: sender, requests: Arc::new(Mutex::new(HashMap::new())), } } pub async fn send_request(&self, mut request: JRPCRequest) -> Result { let (sender, mut receiver) = tokio::sync::mpsc::channel(1); if request.id.is_none() { request.id = Some(nanoid!()); } { let mut self_requests = self.requests.lock().await; self_requests.insert(request.id.clone().unwrap(), sender); } self.message_sender.send(request).await?; let result = receiver.recv().await; if let Some(result) = result { if let Some(error) = result.error { return Err(format!("Error while receiving result: {}", error.message).into()); } else if let Some(result) = result.result { return Ok(result); } else { return Err(format!("No result received").into()); } } else { return Err("Error while receiving result".into()); } } pub async fn send_notification(&self, mut request: JRPCRequest) { request.id = None; _ = self.message_sender.send(request).await; } pub async fn on_result(&self, result: JRPCResult) { let id = result.id.clone(); let mut self_requests = self.requests.lock().await; let sender = self_requests.get(&id); if let Some(sender) = sender { _ = sender.send(result).await; self_requests.remove(&id); } } } #[async_trait::async_trait] pub trait JRPCServerService: Send + Sync + 'static { fn get_id(&self) -> String; async fn handle(&self, request: &JRPCRequest, function: &str) -> Result<(bool, Value)>; } pub type JRPCServiceHandle = Arc; #[derive(Clone)] pub struct JRPCSession { server: JRPCServer, message_sender: Sender, } impl JRPCSession { pub fn new(server: JRPCServer, sender: Sender) -> JRPCSession { JRPCSession { server, message_sender: sender, } } async fn send_error(&self, request: JRPCRequest, error_msg: String, error_code: i64) -> () { if let Some(request_id) = request.id { let error = JRPCError { code: error_code, message: error_msg, data: Value::Null, }; let result = JRPCResult { jsonrpc: "2.0".to_string(), id: request_id, result: None, error: Some(error), }; // Send result let result = self.message_sender.send(result).await; if let Err(err) = result { warn!("Error while sending result: {}", err); } } } pub fn handle_request(&self, request: JRPCRequest) -> () { let session = self.clone(); tokio::task::spawn(async move { info!("Received request: {}", request.method); trace!("Request data: {:?}", request); let method: Vec<&str> = request.method.split('.').collect(); if method.len() != 2 { warn!("Invalid method received: {}", request.method); return; } let service = method[0]; let function = method[1]; let service = session.server.services.get(service); if let Some(service) = service { let result = service.handle(&request, function).await; match result { Ok((is_send, result)) => { if is_send && request.id.is_some() { let result = session .message_sender .send(JRPCResult { jsonrpc: "2.0".to_string(), id: request.id.unwrap(), result: Some(result), error: None, }) .await; if let Err(err) = result { warn!("Error while sending result: {}", err); } } } Err(err) => { warn!("Error while handling request: {}", err); session .send_error( request, format!("Error while handling request: {}", err), 1, ) .await; } } } else { warn!("Service not found: {}", method[0]); session .send_error(request, "Service not found".to_string(), 1) .await; return; } }); } } #[derive(Clone)] pub struct JRPCServer { services: HashMap, } impl JRPCServer { pub fn new() -> JRPCServer { JRPCServer { services: HashMap::new(), } } pub fn add_service(&mut self, service: JRPCServiceHandle) -> () { let id = service.get_id(); self.services.insert(id, service); } pub fn get_session(&self, sender: Sender) -> JRPCSession { JRPCSession::new(self.clone(), sender) } }