wibble  1.1
pipe.h
Go to the documentation of this file.
1 // -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net>
2 
3 #include <wibble/sys/macros.h>
4 
5 #ifdef POSIX
6 #include <fcntl.h>
7 #include <sys/select.h>
8 #endif
9 #include <unistd.h>
10 
11 #include <deque>
12 #include <cerrno>
13 
14 #include <wibble/exception.h>
15 #include <wibble/sys/thread.h>
16 #include <wibble/sys/mutex.h>
17 #include <wibble/sys/exec.h>
18 
19 #ifndef WIBBLE_SYS_PIPE_H
20 #define WIBBLE_SYS_PIPE_H
21 
22 namespace wibble {
23 namespace sys {
24 
25 namespace wexcept = wibble::exception;
26 
27 struct Pipe {
28 
30  int fd;
31  bool close;
32  std::string data;
33  bool running;
34  bool closed;
36 
37  Writer() : fd( -1 ), close( false ), running( false ) {}
38 
39  void *main() {
40  do {
41  int wrote = 0;
42 
43  {
44  wibble::sys::MutexLock __l( mutex );
45  wrote = ::write( fd, data.c_str(), data.length() );
46  if ( wrote > 0 )
47  data.erase( data.begin(), data.begin() + wrote );
48  }
49 
50  if ( wrote == -1 ) {
51  if ( blocking( errno ) )
52 #ifdef POSIX
53  sched_yield();
54 #else
55  ;
56 #endif
57  else
58  throw wexcept::System( "writing to pipe" );
59  }
60  } while ( !done() );
61 
62  wibble::sys::MutexLock __l( mutex );
63  running = false;
64  if ( close )
65  ::close( fd );
66 
67  return 0;
68  }
69 
70  bool done() {
71  wibble::sys::MutexLock __l( mutex );
72  if ( data.empty() )
73  running = false;
74  return !running;
75  }
76 
77  void run( int _fd, std::string what ) {
78  wibble::sys::MutexLock __l( mutex );
79 
80  if ( running )
81  assert_eq( _fd, fd );
82  fd = _fd;
83  assert_neq( fd, -1 );
84 
85  data += what;
86  if ( running )
87  return;
88  running = true;
89  start();
90  }
91  };
92 
93  typedef std::deque< char > Buffer;
94  Buffer buffer;
95  int fd;
96  bool _eof;
98 
99  Pipe( int p ) : fd( p ), _eof( false )
100  {
101  if ( p == -1 )
102  return;
103 #ifdef POSIX
104  if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 )
105  throw wexcept::System( "fcntl on a pipe" );
106 #endif
107  }
108  Pipe() : fd( -1 ), _eof( false ) {}
109 
110  /* Writes data to the pipe, asynchronously. */
111  void write( std::string what ) {
112  writer.run( fd, what );
113  }
114 
115  void close() {
116  wibble::sys::MutexLock __l( writer.mutex );
117  writer.close = true;
118  if ( !writer.running )
119  ::close( fd );
120  }
121 
122  bool valid() {
123  return fd != -1;
124  }
125 
126  bool active() {
127  return valid() && !eof();
128  }
129 
130  bool eof() {
131  return _eof;
132  }
133 
134  static bool blocking( int err ) {
135 #ifdef POSIX
136  return err == EAGAIN || err == EWOULDBLOCK;
137 #else
138  return err == EAGAIN;
139 #endif
140  }
141 
142  int readMore() {
143  assert( valid() );
144  char _buffer[1024];
145  int r = ::read( fd, _buffer, 1023 );
146  if ( r == -1 && !blocking( errno ) )
147  throw wexcept::System( "reading from pipe" );
148  else if ( r == -1 )
149  return 0;
150  if ( r == 0 )
151  _eof = true;
152  else
153  std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) );
154  return r;
155  }
156 
157  std::string nextChunk() {
158  std::string line( buffer.begin(), buffer.end() );
159  buffer.clear();
160  return line;
161  }
162 
163  std::string nextLine() {
164  assert( valid() );
165  Buffer::iterator nl =
166  std::find( buffer.begin(), buffer.end(), '\n' );
167  while ( nl == buffer.end() ) {
168  if ( !readMore() )
169  return ""; // would block, so give up
170  nl = std::find( buffer.begin(), buffer.end(), '\n' );
171  }
172  std::string line( buffer.begin(), nl );
173 
174  if ( nl != buffer.end() )
175  ++ nl;
176  buffer.erase( buffer.begin(), nl );
177 
178  return line;
179  }
180 
181  /* Only returns on eof() or when data is buffered. */
182  void wait() {
183  assert( valid() );
184 #ifdef POSIX
185  fd_set fds;
186  FD_ZERO( &fds );
187 #endif
188  while ( buffer.empty() && !eof() ) {
189  if ( readMore() )
190  return;
191  if ( eof() )
192  return;
193 #ifdef POSIX
194 #pragma GCC diagnostic push
195 #pragma GCC diagnostic ignored "-Wold-style-cast"
196  FD_SET( fd, &fds );
197  select( fd + 1, &fds, 0, 0, 0 );
198 #pragma GCC diagnostic pop
199 #else
200  sleep( 1 );
201 #endif
202  }
203  }
204  std::string nextLineBlocking() {
205  assert( valid() );
206  std::string l;
207  while ( !eof() ) {
208  l = nextLine();
209  if ( !l.empty() )
210  return l;
211  if ( eof() )
212  return std::string( buffer.begin(), buffer.end() );
213  wait();
214  }
215  return l;
216  }
217 
218 };
219 
221 {
222  std::string cmd;
223 
224  PipeThrough( const std::string& _cmd ) : cmd( _cmd ) {}
225 
226  std::string run( std::string data ) {
227  int _in, _out;
228 
229 #ifdef _WIN32
230  Exec exec(cmd);
231 #elif defined POSIX
232  ShellCommand exec(cmd);
233 #endif
234 
235  exec.setupRedirects( &_in, &_out, 0 );
236  exec.fork();
237 
238  Pipe in( _in ), out( _out );
239 
240  in.write( data );
241  in.close();
242  std::string ret;
243  while ( !out.eof() ) {
244  out.wait();
245  ret += out.nextChunk();
246  }
247  return ret;
248  }
249 };
250 
251 }
252 }
253 #endif
std::string run(std::string data)
Definition: pipe.h:226
Iterator< typename I::value_type > iterator(I i)
Definition: iterator.h:123
std::deque< char > Buffer
Definition: pipe.h:93
Definition: core.h:11
void start()
Start the thread.
Definition: thread.cpp:70
std::string data
Definition: pipe.h:32
bool _eof
Definition: pipe.h:96
Acquire a mutex lock, RAII-style.
Definition: mutex.h:200
void sleep(int secs)
Portable version of sleep.
Definition: thread.cpp:31
void run(int _fd, std::string what)
Definition: pipe.h:77
Definition: pipe.h:220
#define assert_eq(x, y)
Definition: test.h:33
#define assert(x)
Definition: test.h:30
bool active()
Definition: pipe.h:126
Definition: pipe.h:27
Encapsulates a thread.
Definition: thread.h:83
bool closed
Definition: pipe.h:34
pthread mutex wrapper; WARNING: the class allows copying and assignment, but this is not always safe...
Definition: mutex.h:47
Writer()
Definition: pipe.h:37
bool done()
Definition: pipe.h:70
std::string nextLineBlocking()
Definition: pipe.h:204
void write(std::string what)
Definition: pipe.h:111
int readMore()
Definition: pipe.h:142
Definition: pipe.h:29
void wait()
Definition: pipe.h:182
std::string nextLine()
Definition: pipe.h:163
wibble::sys::Mutex mutex
Definition: pipe.h:35
Pipe(int p)
Definition: pipe.h:99
void * main()
Main thread function, executed in the new thread after creation.
Definition: pipe.h:39
pid_t fork()
For a subprocess to run proc.
PipeThrough(const std::string &_cmd)
Definition: pipe.h:224
void setupRedirects(int *stdinfd=0, int *stdoutfd=0, int *stderrfd=0)
Definition: childprocess.cpp:145
int fd
Definition: pipe.h:30
Definition: amorph.h:17
void close()
Definition: pipe.h:115
int fd
Definition: pipe.h:95
#define assert_neq(x, y)
Definition: test.h:36
bool close
Definition: pipe.h:31
std::string nextChunk()
Definition: pipe.h:157
bool running
Definition: pipe.h:33
Pipe()
Definition: pipe.h:108
bool eof()
Definition: pipe.h:130
bool valid()
Definition: pipe.h:122
static bool blocking(int err)
Definition: pipe.h:134
Buffer buffer
Definition: pipe.h:94
Base class for system exceptions.
Definition: exception.h:396
std::string cmd
Definition: pipe.h:222
Writer writer
Definition: pipe.h:97
Execute a shell command using /bin/sh -c.
Definition: exec.h:97
Execute external commands, either forked as a ChildProcess or directly using exec().
Definition: exec.h:33