TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/shutdown_type.hpp>
15 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
16 : #include <boost/corosio/detail/dispatch_coro.hpp>
17 : #include <boost/capy/buffers.hpp>
18 :
19 : #include <coroutine>
20 :
21 : #include <errno.h>
22 : #include <sys/socket.h>
23 : #include <sys/uio.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /** CRTP base for reactor-backed stream socket implementations.
28 :
29 : Inherits shared data members and cancel/close/register logic
30 : from reactor_basic_socket. Adds the stream-specific remote
31 : endpoint, shutdown, and I/O dispatch (connect, read, write).
32 :
33 : @tparam Derived The concrete socket type (CRTP).
34 : @tparam Service The backend's socket service type.
35 : @tparam ConnOp The backend's connect op type.
36 : @tparam ReadOp The backend's read op type.
37 : @tparam WriteOp The backend's write op type.
38 : @tparam DescState The backend's descriptor_state type.
39 : @tparam ImplBase The public vtable base
40 : (tcp_socket::implementation or
41 : local_stream_socket::implementation).
42 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
43 : */
44 : template<
45 : class Derived,
46 : class Service,
47 : class ConnOp,
48 : class ReadOp,
49 : class WriteOp,
50 : class DescState,
51 : class ImplBase = tcp_socket::implementation,
52 : class Endpoint = endpoint>
53 : class reactor_stream_socket
54 : : public reactor_basic_socket<
55 : Derived,
56 : ImplBase,
57 : Service,
58 : DescState,
59 : Endpoint>
60 : {
61 : using base_type = reactor_basic_socket<
62 : Derived,
63 : ImplBase,
64 : Service,
65 : DescState,
66 : Endpoint>;
67 : friend base_type;
68 : friend Derived;
69 :
70 : protected:
71 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
72 HIT 11099 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
73 :
74 : protected:
75 : Endpoint remote_endpoint_;
76 :
77 : public:
78 : /// Pending connect operation slot.
79 : ConnOp conn_;
80 :
81 : /// Pending read operation slot.
82 : ReadOp rd_;
83 :
84 : /// Pending write operation slot.
85 : WriteOp wr_;
86 :
87 11099 : ~reactor_stream_socket() override = default;
88 :
89 : /// Return the cached remote endpoint.
90 46 : Endpoint remote_endpoint() const noexcept override
91 : {
92 46 : return remote_endpoint_;
93 : }
94 :
95 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
96 :
97 3672 : std::coroutine_handle<> connect(
98 : std::coroutine_handle<> h,
99 : capy::executor_ref ex,
100 : Endpoint ep,
101 : std::stop_token token,
102 : std::error_code* ec) override
103 : {
104 3672 : return do_connect(h, ex, ep, token, ec);
105 : }
106 :
107 254529 : std::coroutine_handle<> read_some(
108 : std::coroutine_handle<> h,
109 : capy::executor_ref ex,
110 : buffer_param param,
111 : std::stop_token token,
112 : std::error_code* ec,
113 : std::size_t* bytes_out) override
114 : {
115 254529 : return do_read_some(h, ex, param, token, ec, bytes_out);
116 : }
117 :
118 254235 : std::coroutine_handle<> write_some(
119 : std::coroutine_handle<> h,
120 : capy::executor_ref ex,
121 : buffer_param param,
122 : std::stop_token token,
123 : std::error_code* ec,
124 : std::size_t* bytes_out) override
125 : {
126 254235 : return do_write_some(h, ex, param, token, ec, bytes_out);
127 : }
128 :
129 : std::error_code
130 6 : shutdown(corosio::shutdown_type what) noexcept override
131 : {
132 6 : return do_shutdown(static_cast<int>(what));
133 : }
134 :
135 183 : void cancel() noexcept override
136 : {
137 183 : this->do_cancel();
138 183 : }
139 :
140 : // --- End virtual overrides ---
141 :
142 : /// Close the socket (non-virtual, called by the service).
143 : void close_socket() noexcept
144 : {
145 : this->do_close_socket();
146 : }
147 :
148 : /** Shut down part or all of the full-duplex connection.
149 :
150 : @param what 0 = receive, 1 = send, 2 = both.
151 : */
152 6 : std::error_code do_shutdown(int what) noexcept
153 : {
154 : int how;
155 6 : switch (what)
156 : {
157 2 : case 0: // shutdown_receive
158 2 : how = SHUT_RD;
159 2 : break;
160 2 : case 1: // shutdown_send
161 2 : how = SHUT_WR;
162 2 : break;
163 2 : case 2: // shutdown_both
164 2 : how = SHUT_RDWR;
165 2 : break;
166 MIS 0 : default:
167 0 : return make_err(EINVAL);
168 : }
169 HIT 6 : if (::shutdown(this->fd_, how) != 0)
170 MIS 0 : return make_err(errno);
171 HIT 6 : return {};
172 : }
173 :
174 : /// Cache local and remote endpoints.
175 7326 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
176 : {
177 7326 : this->local_endpoint_ = std::move(local);
178 7326 : remote_endpoint_ = std::move(remote);
179 7326 : }
180 :
181 : /** Shared connect dispatch.
182 :
183 : Tries the connect syscall speculatively. On synchronous
184 : completion, returns via inline budget or posts through queue.
185 : On EINPROGRESS, registers with the reactor.
186 : */
187 : std::coroutine_handle<> do_connect(
188 : std::coroutine_handle<>,
189 : capy::executor_ref,
190 : Endpoint const&,
191 : std::stop_token const&,
192 : std::error_code*);
193 :
194 : /** Shared scatter-read dispatch.
195 :
196 : Tries readv() speculatively. On success or hard error,
197 : returns via inline budget or posts through queue.
198 : On EAGAIN, registers with the reactor.
199 : */
200 : std::coroutine_handle<> do_read_some(
201 : std::coroutine_handle<>,
202 : capy::executor_ref,
203 : buffer_param,
204 : std::stop_token const&,
205 : std::error_code*,
206 : std::size_t*);
207 :
208 : /** Shared gather-write dispatch.
209 :
210 : Tries the write via WriteOp::write_policy speculatively.
211 : On success or hard error, returns via inline budget or
212 : posts through queue. On EAGAIN, registers with the reactor.
213 : */
214 : std::coroutine_handle<> do_write_some(
215 : std::coroutine_handle<>,
216 : capy::executor_ref,
217 : buffer_param,
218 : std::stop_token const&,
219 : std::error_code*,
220 : std::size_t*);
221 :
222 : /** Close the socket and cancel pending operations.
223 :
224 : Extends the base do_close_socket() to also reset
225 : the remote endpoint.
226 : */
227 33315 : void do_close_socket() noexcept
228 : {
229 33315 : base_type::do_close_socket();
230 33315 : remote_endpoint_ = Endpoint{};
231 33315 : }
232 :
233 : private:
234 : // CRTP callbacks for reactor_basic_socket cancel/close
235 :
236 : template<class Op>
237 193 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
238 : {
239 193 : if (&op == static_cast<void*>(&conn_))
240 MIS 0 : return &this->desc_state_.connect_op;
241 HIT 193 : if (&op == static_cast<void*>(&rd_))
242 193 : return &this->desc_state_.read_op;
243 MIS 0 : if (&op == static_cast<void*>(&wr_))
244 0 : return &this->desc_state_.write_op;
245 0 : return nullptr;
246 : }
247 :
248 : template<class Op>
249 0 : bool* op_to_cancel_flag(Op& op) noexcept
250 : {
251 0 : if (&op == static_cast<void*>(&conn_))
252 0 : return &this->desc_state_.connect_cancel_pending;
253 0 : if (&op == static_cast<void*>(&rd_))
254 0 : return &this->desc_state_.read_cancel_pending;
255 0 : if (&op == static_cast<void*>(&wr_))
256 0 : return &this->desc_state_.write_cancel_pending;
257 0 : return nullptr;
258 : }
259 :
260 : template<class Fn>
261 HIT 33500 : void for_each_op(Fn fn) noexcept
262 : {
263 33500 : fn(conn_);
264 33500 : fn(rd_);
265 33500 : fn(wr_);
266 33500 : }
267 :
268 : template<class Fn>
269 33500 : void for_each_desc_entry(Fn fn) noexcept
270 : {
271 33500 : fn(conn_, this->desc_state_.connect_op);
272 33500 : fn(rd_, this->desc_state_.read_op);
273 33500 : fn(wr_, this->desc_state_.write_op);
274 33500 : }
275 : };
276 :
277 : template<
278 : class Derived,
279 : class Service,
280 : class ConnOp,
281 : class ReadOp,
282 : class WriteOp,
283 : class DescState,
284 : class ImplBase,
285 : class Endpoint>
286 : std::coroutine_handle<>
287 3672 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
288 : do_connect(
289 : std::coroutine_handle<> h,
290 : capy::executor_ref ex,
291 : Endpoint const& ep,
292 : std::stop_token const& token,
293 : std::error_code* ec)
294 : {
295 3672 : auto& op = conn_;
296 :
297 3672 : sockaddr_storage storage{};
298 3672 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
299 : int result =
300 3672 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
301 :
302 3672 : if (result == 0)
303 : {
304 4 : sockaddr_storage local_storage{};
305 4 : socklen_t local_len = sizeof(local_storage);
306 4 : if (::getsockname(
307 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
308 4 : &local_len) == 0)
309 MIS 0 : this->local_endpoint_ =
310 HIT 4 : from_sockaddr_as(local_storage, local_len, Endpoint{});
311 4 : remote_endpoint_ = ep;
312 : }
313 :
314 3672 : if (result == 0 || errno != EINPROGRESS)
315 : {
316 4 : int err = (result < 0) ? errno : 0;
317 4 : if (this->svc_.scheduler().try_consume_inline_budget())
318 : {
319 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
320 0 : op.cont_op.cont.h = h;
321 0 : return dispatch_coro(ex, op.cont_op.cont);
322 : }
323 HIT 4 : op.reset();
324 4 : op.h = h;
325 4 : op.ex = ex;
326 4 : op.ec_out = ec;
327 4 : op.fd = this->fd_;
328 4 : op.target_endpoint = ep;
329 4 : op.start(token, static_cast<Derived*>(this));
330 4 : op.impl_ptr = this->shared_from_this();
331 4 : op.complete(err, 0);
332 4 : this->svc_.post(&op);
333 4 : return std::noop_coroutine();
334 : }
335 :
336 : // EINPROGRESS — register with reactor
337 3668 : op.reset();
338 3668 : op.h = h;
339 3668 : op.ex = ex;
340 3668 : op.ec_out = ec;
341 3668 : op.fd = this->fd_;
342 3668 : op.target_endpoint = ep;
343 3668 : op.start(token, static_cast<Derived*>(this));
344 3668 : op.impl_ptr = this->shared_from_this();
345 :
346 3668 : this->register_op(
347 3668 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
348 3668 : this->desc_state_.connect_cancel_pending, true);
349 3668 : return std::noop_coroutine();
350 : }
351 :
352 : template<
353 : class Derived,
354 : class Service,
355 : class ConnOp,
356 : class ReadOp,
357 : class WriteOp,
358 : class DescState,
359 : class ImplBase,
360 : class Endpoint>
361 : std::coroutine_handle<>
362 254529 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
363 : do_read_some(
364 : std::coroutine_handle<> h,
365 : capy::executor_ref ex,
366 : buffer_param param,
367 : std::stop_token const& token,
368 : std::error_code* ec,
369 : std::size_t* bytes_out)
370 : {
371 254529 : auto& op = rd_;
372 254529 : op.reset();
373 :
374 254529 : capy::mutable_buffer bufs[ReadOp::max_buffers];
375 254529 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
376 :
377 254529 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
378 : {
379 2 : op.empty_buffer_read = true;
380 2 : op.h = h;
381 2 : op.ex = ex;
382 2 : op.ec_out = ec;
383 2 : op.bytes_out = bytes_out;
384 2 : op.start(token, static_cast<Derived*>(this));
385 2 : op.impl_ptr = this->shared_from_this();
386 2 : op.complete(0, 0);
387 2 : this->svc_.post(&op);
388 2 : return std::noop_coroutine();
389 : }
390 :
391 509054 : for (int i = 0; i < op.iovec_count; ++i)
392 : {
393 254527 : op.iovecs[i].iov_base = bufs[i].data();
394 254527 : op.iovecs[i].iov_len = bufs[i].size();
395 : }
396 :
397 : // Speculative read; for the single-buffer case use recv() so the
398 : // kernel skips the readv iov_iter setup.
399 : ssize_t n;
400 254527 : if (op.iovec_count == 1)
401 : {
402 : do
403 : {
404 254527 : n = ::recv(this->fd_, bufs[0].data(), bufs[0].size(), 0);
405 : }
406 254527 : while (n < 0 && errno == EINTR);
407 : }
408 : else
409 : {
410 : do
411 : {
412 MIS 0 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
413 : }
414 0 : while (n < 0 && errno == EINTR);
415 : }
416 :
417 HIT 254527 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
418 : {
419 254142 : int err = (n < 0) ? errno : 0;
420 254142 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
421 :
422 254142 : if (this->svc_.scheduler().try_consume_inline_budget())
423 : {
424 203346 : if (err)
425 MIS 0 : *ec = make_err(err);
426 HIT 203346 : else if (n == 0)
427 10 : *ec = capy::error::eof;
428 : else
429 203336 : *ec = {};
430 203346 : *bytes_out = bytes;
431 203346 : op.cont_op.cont.h = h;
432 203346 : return dispatch_coro(ex, op.cont_op.cont);
433 : }
434 50796 : op.h = h;
435 50796 : op.ex = ex;
436 50796 : op.ec_out = ec;
437 50796 : op.bytes_out = bytes_out;
438 50796 : op.start(token, static_cast<Derived*>(this));
439 50796 : op.impl_ptr = this->shared_from_this();
440 50796 : op.complete(err, bytes);
441 50796 : this->svc_.post(&op);
442 50796 : return std::noop_coroutine();
443 : }
444 :
445 : // EAGAIN — register with reactor
446 385 : op.h = h;
447 385 : op.ex = ex;
448 385 : op.ec_out = ec;
449 385 : op.bytes_out = bytes_out;
450 385 : op.fd = this->fd_;
451 385 : op.start(token, static_cast<Derived*>(this));
452 385 : op.impl_ptr = this->shared_from_this();
453 :
454 385 : this->register_op(
455 385 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
456 385 : this->desc_state_.read_cancel_pending);
457 385 : return std::noop_coroutine();
458 : }
459 :
460 : template<
461 : class Derived,
462 : class Service,
463 : class ConnOp,
464 : class ReadOp,
465 : class WriteOp,
466 : class DescState,
467 : class ImplBase,
468 : class Endpoint>
469 : std::coroutine_handle<>
470 254235 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
471 : do_write_some(
472 : std::coroutine_handle<> h,
473 : capy::executor_ref ex,
474 : buffer_param param,
475 : std::stop_token const& token,
476 : std::error_code* ec,
477 : std::size_t* bytes_out)
478 : {
479 254235 : auto& op = wr_;
480 254235 : op.reset();
481 :
482 254235 : capy::mutable_buffer bufs[WriteOp::max_buffers];
483 254235 : op.iovec_count =
484 254235 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
485 :
486 254235 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
487 : {
488 2 : op.h = h;
489 2 : op.ex = ex;
490 2 : op.ec_out = ec;
491 2 : op.bytes_out = bytes_out;
492 2 : op.start(token, static_cast<Derived*>(this));
493 2 : op.impl_ptr = this->shared_from_this();
494 2 : op.complete(0, 0);
495 2 : this->svc_.post(&op);
496 2 : return std::noop_coroutine();
497 : }
498 :
499 508466 : for (int i = 0; i < op.iovec_count; ++i)
500 : {
501 254233 : op.iovecs[i].iov_base = bufs[i].data();
502 254233 : op.iovecs[i].iov_len = bufs[i].size();
503 : }
504 :
505 : // Speculative write; the single-buffer case dispatches to a
506 : // backend-specific fast path so the kernel skips msghdr/iov_iter
507 : // setup (and so each backend can pick the right SIGPIPE strategy).
508 : ssize_t n;
509 254233 : if (op.iovec_count == 1)
510 : {
511 508466 : n = WriteOp::write_policy::write_one(
512 254233 : this->fd_, bufs[0].data(), bufs[0].size());
513 : }
514 : else
515 : {
516 MIS 0 : n = WriteOp::write_policy::write(
517 0 : this->fd_, op.iovecs, op.iovec_count);
518 : }
519 :
520 HIT 254233 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
521 : {
522 254233 : int err = (n < 0) ? errno : 0;
523 254233 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
524 :
525 254233 : if (this->svc_.scheduler().try_consume_inline_budget())
526 : {
527 203400 : *ec = err ? make_err(err) : std::error_code{};
528 203400 : *bytes_out = bytes;
529 203400 : op.cont_op.cont.h = h;
530 203400 : return dispatch_coro(ex, op.cont_op.cont);
531 : }
532 50833 : op.h = h;
533 50833 : op.ex = ex;
534 50833 : op.ec_out = ec;
535 50833 : op.bytes_out = bytes_out;
536 50833 : op.start(token, static_cast<Derived*>(this));
537 50833 : op.impl_ptr = this->shared_from_this();
538 50833 : op.complete(err, bytes);
539 50833 : this->svc_.post(&op);
540 50833 : return std::noop_coroutine();
541 : }
542 :
543 : // EAGAIN — register with reactor
544 MIS 0 : op.h = h;
545 0 : op.ex = ex;
546 0 : op.ec_out = ec;
547 0 : op.bytes_out = bytes_out;
548 0 : op.fd = this->fd_;
549 0 : op.start(token, static_cast<Derived*>(this));
550 0 : op.impl_ptr = this->shared_from_this();
551 :
552 0 : this->register_op(
553 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
554 0 : this->desc_state_.write_cancel_pending, true);
555 0 : return std::noop_coroutine();
556 : }
557 :
558 : } // namespace boost::corosio::detail
559 :
560 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|