Changeset 594
- Timestamp:
- 02/23/05 18:23:54 (4 years ago)
- Files:
-
- trunk/src/stream.c (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/src/stream.c
r535 r594 43 43 int stream_io_connect(GIOChannel *source,GIOCondition condition,gpointer data); 44 44 int stream_io_read(GIOChannel *source,GIOCondition condition,gpointer data); 45 int stream_io_write(GIOChannel *source,GIOCondition condition,gpointer data);46 45 int stream_write_hello(Stream *s); 47 46 … … 233 232 } 234 233 235 int stream_io_write(GIOChannel *source,GIOCondition condition,gpointer data){ 236 Stream *s; 237 char * str; 234 int stream_write_bytes(Stream *s,const char *buf,int l){ 238 235 GIOStatus st; 239 236 GError *error=NULL; 237 gsize written=0; 240 238 gsize br; 241 242 s=(Stream *)data; 243 g_assert(s); 244 245 if (s->write_buf && s->write_pos>=0 && s->write_len>=0){ 246 st=g_io_channel_write_chars(source, 247 s->write_buf+s->write_pos, 248 s->write_len-s->write_pos, 239 gchar *str; 240 241 if (!l) return 0; 242 g_assert(buf!=NULL); 243 g_assert(l>=0); 244 while(written<l) { 245 st=g_io_channel_write_chars(s->ioch, 246 buf+written, 247 l-written, 249 248 &br, 250 249 &error); 251 250 if (st==G_IO_STATUS_ERROR){ 252 251 g_warning("write error: %s",error?error->message:"unknwown"); 253 s->write_watch=0;254 252 s->xs->f(XSTREAM_CLOSE,NULL,s); 255 253 if (error) g_error_free(error); 256 return FALSE;254 return -1; 257 255 } 258 if (st==G_IO_STATUS_AGAIN) return TRUE;256 if (st==G_IO_STATUS_AGAIN) continue; 259 257 if (st!=G_IO_STATUS_NORMAL){ 260 258 g_warning("write status: %i (%s)",st,error?error->message:"unknown"); 261 259 if (error) g_error_free(error); 262 return TRUE;260 return -1; 263 261 } 264 262 if (error) g_error_free(error); … … 266 264 str=g_new(char,br+1); 267 265 g_assert(str!=NULL); 268 memcpy(str, s->write_buf+s->write_pos,br);266 memcpy(str,buf+written,br); 269 267 str[br]=0; 270 268 debug("OUT: %s",str); 271 269 g_free(str); 272 s->write_pos+=br; 273 debug("write_pos: %i, write_len: %i, br: %i\n",s->write_pos,s->write_len,(int)br); 274 if (s->write_pos==s->write_len){ 275 s->write_watch=0; 276 s->write_len=0; 277 s->write_pos=-1; 278 return FALSE; 279 } 280 } 281 return TRUE; 282 } 283 284 int stream_write_bytes(Stream *s,const char *buf,int l){ 285 time_t stream_flush_timeout; 286 GIOStatus st; 287 GError *error=NULL; 288 289 if (!l) return 0; 290 g_assert(buf!=NULL); 291 g_assert(l>=0); 292 if (s->write_len+l > MAX_WRITE_BUF){ 293 stream_flush_timeout=time(NULL)+WRITE_BUF_FLUSH_TIMEOUT; 294 if (s->write_len){ 295 debug("Flushing stream write buffer, as it is full (%i characters, %i to be added)",s->write_len,l); 296 /* block to flush the buffer */ 297 while(time(NULL)<stream_flush_timeout){ 298 if (!g_main_is_running(main_loop)) break; 299 stream_io_write(s->ioch,G_IO_OUT,(gpointer)s); 300 if (s->write_len==0) break; 301 } 302 } 303 if (!s->write_len && l>MAX_WRITE_BUF){ 304 int pos=0; 305 gsize written; 306 char *str; 307 debug("Flushing data that don't fit into the write buffer."); 308 while(pos<l){ 309 st=g_io_channel_write_chars(s->ioch, 310 (gchar *)buf+pos, 311 l-pos, 312 &written, 313 &error); 314 if (st!=G_IO_STATUS_AGAIN && st!=G_IO_STATUS_NORMAL){ 315 g_warning("write: %i: %s",st,error?error->message:"unknown"); 316 if (error) g_error_free(error); 317 return -2; 318 } 319 if (error) g_error_free(error); 320 error=NULL; 321 str=g_new(char,written+1); 322 g_assert(str!=NULL); 323 memcpy(str,buf+pos,written); 324 str[written]=0; 325 debug("OUT: %s",str); 326 g_free(str); 327 pos+=written; 328 } 329 return 0; 330 } 331 if (s->write_len+l > MAX_WRITE_BUF){ 332 debug("Write buffer overflow!"); 333 return -2; 334 } 335 } 336 if (!s->write_buf){ 337 s->write_buf=g_new(char,1024); 338 g_assert(s->write_buf!=NULL); 339 s->write_buf_len=1024; 340 } 341 else if (s->write_len+l > s->write_buf_len){ 342 s->write_buf_len+=1024*((l+1023)/1024); 343 s->write_buf=(char *)g_realloc(s->write_buf,s->write_buf_len); 344 g_assert(s->write_buf!=NULL); 345 } 346 memcpy(s->write_buf+s->write_len,buf,l); 347 s->write_len+=l; 348 if (s->write_pos<0) s->write_pos=0; 349 if (!s->write_watch) s->write_watch=g_io_add_watch(s->ioch,G_IO_OUT,stream_io_write,s); 270 written+=br; 271 } 350 272 return 0; 351 273 } … … 410 332 if (s->err_watch) g_source_remove(s->err_watch); 411 333 if (s->read_watch) g_source_remove(s->read_watch); 412 if (s->write_watch) g_source_remove(s->write_watch);413 334 if (s->read_buf) free(s->read_buf); 414 if (s->write_buf) free(s->write_buf);415 335 pool_free(s->xs->p); 416 336 g_io_channel_close(s->ioch);
