Browse Source

Clean shutdown on /quit

tags/v0.5.0-beta.1
Danilo Bargen 1 year ago
parent
commit
35be904c63
5 changed files with 116 additions and 60 deletions
  1. 1
    0
      .gitignore
  2. 1
    3
      Cargo.toml
  3. 2
    12
      README.md
  4. 89
    35
      examples/chat/main.rs
  5. 23
    10
      src/lib.rs

+ 1
- 0
.gitignore View File

@@ -3,6 +3,7 @@
**/*.rs.bk
Cargo.lock
.env
*.log

# Certificates
*.pem

+ 1
- 3
Cargo.toml View File

@@ -22,11 +22,9 @@ tokio-core = "0.1.9"
websocket = "0.20.2"

[dev-dependencies]
chrono = "0.4"
clap = "2.27"
cursive = "0.7"
dotenv = "0.10"
env_logger = "0.5.0-rc.1"
log4rs = "0.8"

[features]
default = []

+ 2
- 12
README.md View File

@@ -74,22 +74,12 @@ to connect to the initiator with a responder.
To see all options, use `cargo run --example chat -- initiator --help` and
`cargo run --example chat -- responder --help`.

The chat example will log to a file called `chat.log`.

**Note:** The example chat currently expects a [SaltyRTC
Server](https://github.com/saltyrtc/saltyrtc-server-python/) instance to run on
`localhost:6699`.

## Logging

The examples use [`env_logger`](https://doc.rust-lang.org/log/env_logger/index.html).
To see the logs, export an env variable:

export RUST_LOG=saltyrtc_client=TRACE

The examples initialize the [`dotenv`](https://crates.io/crates/dotenv) crate,
so you can also store this setting in an `.env` file:

echo "RUST_LOG=saltyrtc_client=DEBUG" >> .env


## Msgpack Debugging


+ 89
- 35
examples/chat/main.rs View File

@@ -1,24 +1,21 @@
//! Connect to a server as initiator and print the connection info.

extern crate chrono;
extern crate clap;
extern crate cursive;
extern crate data_encoding;
extern crate dotenv;
extern crate env_logger;
#[macro_use] extern crate failure;
extern crate futures;
#[macro_use] extern crate log;
extern crate native_tls;
extern crate saltyrtc_client;
extern crate log4rs;
extern crate tokio_core;

mod chat_task;

use std::cell::RefCell;
use std::env;
use std::fs::File;
use std::io::{Read, Write};
use std::io::Read;
use std::path::Path;
use std::process;
use std::rc::Rc;
@@ -26,18 +23,23 @@ use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use chrono::Local;
use clap::{Arg, App, SubCommand};
use cursive::{Cursive};
use cursive::traits::{Identifiable};
use cursive::views::{TextView, EditView, BoxView, LinearLayout};
use data_encoding::{HEXLOWER};
use env_logger::{Builder};
use futures::{Sink, Stream, future};
use futures::future::{Future};
use futures::sync::mpsc::{channel};
use futures::future::Future;
use futures::sync::mpsc::channel;
use log::LevelFilter;
use log4rs::Handle;
use log4rs::append::console::ConsoleAppender;
use log4rs::append::file::FileAppender;
use log4rs::encode::pattern::PatternEncoder;
use log4rs::config::{Appender, Config, Logger, Root};
use log4rs::filter::threshold::ThresholdFilter;
use native_tls::{TlsConnector, Certificate, Protocol};
use saltyrtc_client::{SaltyClientBuilder, Role, WsClient, Task, BoxedFuture};
use saltyrtc_client::{SaltyClientBuilder, Role, WsClient, Task, BoxedFuture, CloseCode};
use saltyrtc_client::crypto::{KeyPair, AuthToken, public_key_from_hex_str};
use saltyrtc_client::errors::{SaltyError};
use tokio_core::reactor::{Core};
@@ -60,19 +62,6 @@ macro_rules! boxed {


fn main() {
dotenv::dotenv().ok();
Builder::new()
.format(|buf, record| {
writeln!(buf, "{} [{:<5}] {} ({}:{})",
Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
record.level(),
record.args(),
record.file().unwrap_or("?"),
record.line().map(|num| num.to_string()).unwrap_or("?".to_string()))
})
.parse(&env::var("RUST_LOG").unwrap_or_default())
.init();

const ARG_PATH: &'static str = "path";
const ARG_AUTHTOKEN: &'static str = "authtoken";
const ARG_PING_INTERVAL: &'static str = "ping_interval";
@@ -83,7 +72,7 @@ fn main() {
.takes_value(true)
.value_name("SECONDS")
.required(false)
.default_value("30")
.default_value("60")
.help("The WebSocket ping interval (set to 0 to disable pings)");
let app = App::new("SaltyRTC Test Client")
.version(VERSION)
@@ -128,6 +117,9 @@ fn main() {
},
};

// Set up logging
setup_logging(role);

// Tokio reactor core
let mut core = Core::new().unwrap();

@@ -252,13 +244,13 @@ fn main() {
let (cb_sink_tx, cb_sink_rx) = mpsc::sync_channel(1);
let (chat_msg_tx, chat_msg_rx) = channel::<String>(TUI_CHANNEL_BUFFER_SIZE);
let remote = core.remote();
thread::spawn(move || {
let tui_thread = thread::spawn(move || {
// Launch TUI
let mut tui = Cursive::new();
tui.set_fps(10);

// Create text view (for displaying messages)
let text_view = TextView::new("=== Welcome to SaltyChat! ===\nPress Ctrl+C to exit.\nType /help to list available commands.\n\n")
let text_view = TextView::new("=== Welcome to SaltyChat! ===\nType /quit to exit.\nType /help to list available commands.\n\n")
.scrollable(true)
.with_id(VIEW_TEXT_ID);

@@ -347,6 +339,9 @@ fn main() {
tui.quit();
})).unwrap();

// Disconnect
chat_task.close(CloseCode::WsGoingAway);

boxed!(future::err(Ok(())))
}
"/nick" => {
@@ -378,13 +373,19 @@ fn main() {
}
})
.or_else(|res| match res {
Ok(_) => future::ok(()),
Ok(_) => future::ok(debug!("† Send loop future done")),
Err(_) => future::err(SaltyError::Crash("Something went wrong when forwarding messages to task".into()))
});

// Chat message receive loop
// TODO: Sanitize incoming messages
//
// The closure passed to `for_each` must return:
//
// * `future::ok(())` to continue listening for incoming messages
// * `future::err(Ok(()))` to stop the loop without an error
// * `future::err(Err(_))` to stop the loop with an error
let receive_loop = incoming_rx
.map_err(|_| Err(()))
.for_each({
|msg: ChatMessage| {
match msg {
@@ -394,26 +395,79 @@ fn main() {
.ok()
.and_then(|p| p.clone())
.unwrap_or("?".to_string());
log_line!("{}> {}", pn, text)
log_line!("{}> {}", pn, text);
future::ok(())
},
ChatMessage::NickChange(new_nick) => {
log_line!("*** Partner nick changed to {}", new_nick)
log_line!("*** Partner nick changed to {}", new_nick);
future::ok(())
},
ChatMessage::Disconnect(reason) => {
log_line!("*** Connection closed, reason: {}", reason)
log_line!("*** Connection closed, reason: {}", reason);
future::err(Ok(()))
}
};
future::ok(())
}
}
})
.map_err(|_| SaltyError::Crash("Something went wrong in message receive loop".into()));
.or_else(|res| match res {
Ok(_) => future::ok(debug!("† Receive loop future done")),
Err(_) => future::err(SaltyError::Crash("Something went wrong in message receive loop".into())),
});

// Main future
let main_loop = task_loop
.join(
send_loop
.select(receive_loop)
.map_err(|(e, ..)| e)
);

// Run future in reactor
match core.run(task_loop.join(send_loop).join(receive_loop)) {
match core.run(main_loop) {
Ok(_) => println!("Success."),
Err(e) => {
println!("{}", e);
process::exit(1);
},
};

// Wait for TUI thread to exit
tui_thread.join().unwrap();

info!("Goodbye!");
}

fn setup_logging(role: Role) -> Handle {
// Log format
let format = "{d(%Y-%m-%dT%H:%M:%S%.3f)} [{l:<5}] {m} (({f}:{L})){n}";

// Instantiate appenders
let stdout = ConsoleAppender::builder()
.encoder(Box::new(PatternEncoder::new(format)))
.build();
let file = FileAppender::builder()
.encoder(Box::new(PatternEncoder::new(format)))
.build(match role {
Role::Initiator => "chat.initiator.log",
Role::Responder => "chat.responder.log",
})
.unwrap();

// Instantiate filters
let info_filter = ThresholdFilter::new(LevelFilter::Info);

let config = Config::builder()
// Appenders
.appender(Appender::builder().filter(Box::new(info_filter)).build("stdout", Box::new(stdout)))
.appender(Appender::builder().build("file", Box::new(file)))

// Loggers
.logger(Logger::builder().build("saltyrtc_client", LevelFilter::Debug))
.logger(Logger::builder().build("chat", LevelFilter::Debug))

// Root logger
.build(Root::builder().appender("stdout").appender("file").build(LevelFilter::Info))
.unwrap();

log4rs::init_config(config).unwrap()
}

+ 23
- 10
src/lib.rs View File

@@ -379,7 +379,7 @@ pub fn connect(
fn decode_ws_message(msg: OwnedMessage) -> SaltyResult<WsMessageDecoded> {
let decoded = match msg {
OwnedMessage::Binary(bytes) => {
debug!("Incoming binary message ({} bytes)", bytes.len());
debug!("--> Incoming binary message ({} bytes)", bytes.len());

// Parse into ByteBox
let bbox = ByteBox::from_slice(&bytes)
@@ -389,10 +389,11 @@ fn decode_ws_message(msg: OwnedMessage) -> SaltyResult<WsMessageDecoded> {
WsMessageDecoded::ByteBox(bbox)
},
OwnedMessage::Ping(payload) => {
debug!("Incoming ping message");
debug!("--> Incoming ping message");
WsMessageDecoded::Ping(payload)
},
OwnedMessage::Close(close_data) => {
debug!("--> Incoming close message");
match close_data {
Some(data) => {
let close_code = CloseCode::from_number(data.status_code);
@@ -568,7 +569,7 @@ pub fn do_handshake(
pub fn task_loop(
client: WsClient,
salty: Rc<RefCell<SaltyClient>>,
) -> Result<(Arc<Mutex<BoxedTask>>, BoxedFuture<(((), ()), ()), SaltyError>), SaltyError> {
) -> Result<(Arc<Mutex<BoxedTask>>, BoxedFuture<(), SaltyError>), SaltyError> {
let task_name = salty
.deref()
.try_borrow()
@@ -704,7 +705,7 @@ pub fn task_loop(
let pong = OwnedMessage::Pong(payload);
let future = raw_outgoing_tx
.send(pong)
.map(|_| debug!("Enqueued pong message"))
.map(|_| debug!("<-- Enqueuing pong message"))
.map_err(|e| Err(SaltyError::Network(format!("Could not enqueue pong message: {}", e))));
boxed!(future)
},
@@ -729,7 +730,7 @@ pub fn task_loop(
}));
raw_outgoing_tx
.send(close)
.map(|_| debug!("Sent close message"))
.map(|_| debug!("<-- Enqueuing close message to peer"))
.or_else(|e| {
warn!("Could not enqueue close message: {}", e);
future::ok(())
@@ -742,7 +743,7 @@ pub fn task_loop(
})
)

.map(|_| ())
.map(|_| debug!("† Reader future done"))
.map_err(|(e, _next)| e);

// Transform future that sends values from the outgoing channel to the raw outgoing channel
@@ -756,7 +757,10 @@ pub fn task_loop(
// TODO: Can we do something about the errors here?
match salty.deref().try_borrow_mut() {
Ok(mut s) => match s.encrypt_task_message(val) {
Ok(bytes) => future::ok(OwnedMessage::Binary(bytes)),
Ok(bytes) => {
debug!("<-- Enqueuing task message to peer");
future::ok(OwnedMessage::Binary(bytes))
},
Err(_) => future::err(())
},
Err(_) => future::err(()),
@@ -768,7 +772,7 @@ pub fn task_loop(
.forward(raw_outgoing_tx.sink_map_err(|_| ()))

// Ignore stream/sink
.map(|(_, _)| ())
.map(|(_, _)| debug!("† Transformer future done"))

// Map error types
.map_err(|_| SaltyError::Crash("TODO: read error".into()));
@@ -786,10 +790,19 @@ pub fn task_loop(
)

// Ignore sink
.map(|_| ());
.map(|_| debug!("† Writer future done"));

// The task loop is finished when all futures are resolved.
let task_loop = boxed!(reader.join(transformer).join(writer));
let task_loop = boxed!(
future::ok(info!("Starting task loop future"))
.and_then(|_| future::select_all(vec![
boxed!(reader),
boxed!(transformer),
boxed!(writer),
]))
.and_then(|_| future::ok(info!("† Task loop future done")))
.map_err(|(e, _, _)| e)
);

// Get reference to task
let task = match salty.try_borrow_mut() {

Loading…
Cancel
Save