From 75a0281a6b04772b818cb7a6a01216fdf523d206 Mon Sep 17 00:00:00 2001 From: Aris Adamantiadis Date: Sun, 26 Sep 2010 22:33:58 +0200 Subject: Fixed outgoing flow control + writes behaviours --- examples/senddata.c | 2 +- include/libssh/socket.h | 4 ++-- src/channels.c | 4 ++-- src/client.c | 3 --- src/connect.c | 2 +- src/packet.c | 3 --- src/session.c | 6 +++--- src/socket.c | 52 ++++++++++++++++++++++++++----------------------- 8 files changed, 37 insertions(+), 39 deletions(-) diff --git a/examples/senddata.c b/examples/senddata.c index 9de76387..d74081a5 100644 --- a/examples/senddata.c +++ b/examples/senddata.c @@ -9,7 +9,7 @@ int main(void) { char buffer[1024*1024]; int rc; - session = connect_ssh("172.16.104.134", NULL, 0); + session = connect_ssh("localhost", NULL, 0); if (session == NULL) { return 1; } diff --git a/include/libssh/socket.h b/include/libssh/socket.h index d9bab1c7..92fc1b09 100644 --- a/include/libssh/socket.h +++ b/include/libssh/socket.h @@ -48,8 +48,8 @@ void ssh_socket_fd_set(ssh_socket s, fd_set *set, int *fd_max); void ssh_socket_set_fd_in(ssh_socket s, socket_t fd); void ssh_socket_set_fd_out(ssh_socket s, socket_t fd); int ssh_socket_nonblocking_flush(ssh_socket s); -void ssh_socket_set_towrite(ssh_socket s); -void ssh_socket_set_toread(ssh_socket s); +void ssh_socket_set_write_wontblock(ssh_socket s); +void ssh_socket_set_read_wontblock(ssh_socket s); void ssh_socket_set_except(ssh_socket s); int ssh_socket_get_status(ssh_socket s); int ssh_socket_data_available(ssh_socket s); diff --git a/src/channels.c b/src/channels.c index 6900beea..90fee25e 100644 --- a/src/channels.c +++ b/src/channels.c @@ -2469,13 +2469,13 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans, for (i = 0; readchans[i] != NULL; i++) { if (ssh_socket_fd_isset(readchans[i]->session->socket, &rset)) { - ssh_socket_set_toread(readchans[i]->session->socket); + ssh_socket_set_read_wontblock(readchans[i]->session->socket); } } for (i = 0; writechans[i] != NULL; i++) { if (ssh_socket_fd_isset(writechans[i]->session->socket, &wset)) { - ssh_socket_set_towrite(writechans[i]->session->socket); + ssh_socket_set_write_wontblock(writechans[i]->session->socket); } } diff --git a/src/client.c b/src/client.c index fcdc87d1..0b55f355 100644 --- a/src/client.c +++ b/src/client.c @@ -216,9 +216,6 @@ int ssh_send_banner(ssh_session session, int server) { if (ssh_socket_write(session->socket, buffer, strlen(buffer)) == SSH_ERROR) { goto end; } - if (ssh_socket_nonblocking_flush(session->socket) == SSH_ERROR){ - goto end; - } #ifdef WITH_PCAP if(session->pcap_ctx) ssh_pcap_context_write(session->pcap_ctx,SSH_PCAP_DIR_OUT,buffer,strlen(buffer),strlen(buffer)); diff --git a/src/connect.c b/src/connect.c index d877982c..1dc9bf0b 100644 --- a/src/connect.c +++ b/src/connect.c @@ -565,7 +565,7 @@ int ssh_select(ssh_channel *channels, ssh_channel *outchannels, socket_t maxfd, for (i = 0; channels[i]; i++) { if (channels[i]->session->alive && ssh_socket_fd_isset(channels[i]->session->socket,&localset)) { - ssh_socket_set_toread(channels[i]->session->socket); + ssh_socket_set_read_wontblock(channels[i]->session->socket); } } diff --git a/src/packet.c b/src/packet.c index a97db93b..65f97248 100644 --- a/src/packet.c +++ b/src/packet.c @@ -433,9 +433,6 @@ static int ssh_packet_write(ssh_session session) { rc=ssh_socket_write(session->socket, ssh_buffer_get_begin(session->out_buffer), ssh_buffer_get_len(session->out_buffer)); - if(rc == SSH_OK){ - rc=ssh_socket_nonblocking_flush(session->socket); - } leave_function(); return rc; } diff --git a/src/session.c b/src/session.c index bce63afe..dbcb2809 100644 --- a/src/session.c +++ b/src/session.c @@ -308,7 +308,7 @@ void ssh_set_fd_toread(ssh_session session) { return; } - ssh_socket_set_toread(session->socket); + ssh_socket_set_read_wontblock(session->socket); } /** @@ -321,7 +321,7 @@ void ssh_set_fd_towrite(ssh_session session) { return; } - ssh_socket_set_towrite(session->socket); + ssh_socket_set_write_wontblock(session->socket); } /** @@ -362,7 +362,7 @@ int ssh_handle_packets(ssh_session session, int timeout) { enter_function(); spoll_in=ssh_socket_get_poll_handle_in(session->socket); spoll_out=ssh_socket_get_poll_handle_out(session->socket); - ssh_poll_set_events(spoll_in, POLLIN | POLLERR); + ssh_poll_add_events(spoll_in, POLLIN | POLLERR); ctx=ssh_poll_get_ctx(spoll_in); if(ctx==NULL){ ctx=ssh_get_global_poll_ctx(session); diff --git a/src/socket.c b/src/socket.c index 31756da0..e70abc64 100644 --- a/src/socket.c +++ b/src/socket.c @@ -81,9 +81,9 @@ struct ssh_socket_struct { socket_t fd_out; int fd_is_socket; int last_errno; - int data_to_read; /* reading now on socket will + int read_wontblock; /* reading now on socket will not block */ - int data_to_write; + int write_wontblock; int data_except; enum ssh_socket_states_e state; ssh_buffer out_buffer; @@ -152,8 +152,8 @@ ssh_socket ssh_socket_new(ssh_session session) { SAFE_FREE(s); return NULL; } - s->data_to_read = 0; - s->data_to_write = 0; + s->read_wontblock = 0; + s->write_wontblock = 0; s->data_except = 0; s->poll_in=s->poll_out=NULL; s->state=SSH_SOCKET_NONE; @@ -175,7 +175,7 @@ void ssh_socket_set_callbacks(ssh_socket s, ssh_socket_callbacks callbacks){ int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int revents, void *v_s){ ssh_socket s=(ssh_socket )v_s; char buffer[4096]; - int r,w; + int r; int err=0; socklen_t errlen=sizeof(err); /* Do not do anything if this socket was already closed */ @@ -199,7 +199,7 @@ int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int r revents |= POLLIN; } if(revents & POLLIN){ - s->data_to_read=1; + s->read_wontblock=1; r=ssh_socket_unbuffered_read(s,buffer,sizeof(buffer)); if(r<0){ err=-1; @@ -246,18 +246,16 @@ int ssh_socket_pollcallback(struct ssh_poll_handle_struct *p, socket_t fd, int r return 0; } /* So, we can write data */ - s->data_to_write=1; + s->write_wontblock=1; + ssh_poll_remove_events(p,POLLOUT); + /* If buffered data is pending, write it */ if(buffer_get_rest_len(s->out_buffer) > 0){ - w=ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer), - buffer_get_rest_len(s->out_buffer)); - if(w>0) - buffer_pass_bytes(s->out_buffer,w); + ssh_socket_nonblocking_flush(s); } else if(s->callbacks && s->callbacks->controlflow){ /* Otherwise advertise the upper level that write can be done */ s->callbacks->controlflow(SSH_SOCKET_FLOW_WRITEWONTBLOCK,s->callbacks->userdata); } - ssh_poll_remove_events(p,POLLOUT); /* TODO: Find a way to put back POLLOUT when buffering occurs */ } return err; @@ -432,7 +430,7 @@ static int ssh_socket_unbuffered_read(ssh_socket s, void *buffer, uint32_t len) #else s->last_errno = errno; #endif - s->data_to_read = 0; + s->read_wontblock = 0; if (rc < 0) { s->data_except = 1; @@ -460,7 +458,7 @@ static int ssh_socket_unbuffered_write(ssh_socket s, const void *buffer, #else s->last_errno = errno; #endif - s->data_to_write = 0; + s->write_wontblock = 0; /* Reactive the POLLOUT detector in the poll multiplexer system */ if(s->poll_out){ ssh_log(s->session, SSH_LOG_PACKET, "Enabling POLLOUT for socket"); @@ -519,7 +517,7 @@ int ssh_socket_write(ssh_socket s, const void *buffer, int len) { if (buffer_add_data(s->out_buffer, buffer, len) < 0) { return SSH_ERROR; } - ssh_socket_set_towrite(s); + ssh_socket_nonblocking_flush(s); } leave_function(); return SSH_OK; @@ -549,7 +547,13 @@ int ssh_socket_nonblocking_flush(ssh_socket s) { } len = buffer_get_rest_len(s->out_buffer); - if (s->data_to_write && len > 0) { + if (!s->write_wontblock && s->poll_out && len > 0) { + /* force the poll system to catch pollout events */ + ssh_poll_add_events(s->poll_out, POLLOUT); + leave_function(); + return SSH_AGAIN; + } + if (s->write_wontblock && len > 0) { w = ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer), len); if (w < 0) { session->alive = 0; @@ -569,7 +573,7 @@ int ssh_socket_nonblocking_flush(ssh_socket s) { len = buffer_get_rest_len(s->out_buffer); if (s->poll_out && len > 0) { /* force the poll system to catch pollout events */ - ssh_poll_set_events(s->poll_out, ssh_poll_get_events(s->poll_out) | POLLOUT); + ssh_poll_add_events(s->poll_out, POLLOUT); leave_function(); return SSH_AGAIN; } @@ -579,12 +583,12 @@ int ssh_socket_nonblocking_flush(ssh_socket s) { return SSH_OK; } -void ssh_socket_set_towrite(ssh_socket s) { - s->data_to_write = 1; +void ssh_socket_set_write_wontblock(ssh_socket s) { + s->write_wontblock = 1; } -void ssh_socket_set_toread(ssh_socket s) { - s->data_to_read = 1; +void ssh_socket_set_read_wontblock(ssh_socket s) { + s->read_wontblock = 1; } void ssh_socket_set_except(ssh_socket s) { @@ -592,17 +596,17 @@ void ssh_socket_set_except(ssh_socket s) { } int ssh_socket_data_available(ssh_socket s) { - return s->data_to_read; + return s->read_wontblock; } int ssh_socket_data_writable(ssh_socket s) { - return s->data_to_write; + return s->write_wontblock; } int ssh_socket_get_status(ssh_socket s) { int r = 0; - if (s->data_to_read) { + if (s->read_wontblock) { r |= SSH_READ_PENDING; } -- cgit v1.2.3