TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_UDP_SOCKET_HPP
11 : #define BOOST_COROSIO_UDP_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/platform.hpp>
15 : #include <boost/corosio/detail/except.hpp>
16 : #include <boost/corosio/detail/native_handle.hpp>
17 : #include <boost/corosio/io/io_object.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/corosio/detail/buffer_param.hpp>
20 : #include <boost/corosio/endpoint.hpp>
21 : #include <boost/corosio/udp.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/ex/execution_context.hpp>
24 : #include <boost/capy/ex/io_env.hpp>
25 : #include <boost/capy/concept/executor.hpp>
26 :
27 : #include <system_error>
28 :
29 : #include <concepts>
30 : #include <coroutine>
31 : #include <cstddef>
32 : #include <stop_token>
33 : #include <type_traits>
34 :
35 : namespace boost::corosio {
36 :
37 : /** An asynchronous UDP socket for coroutine I/O.
38 :
39 : This class provides asynchronous UDP datagram operations that
40 : return awaitable types. Each operation participates in the affine
41 : awaitable protocol, ensuring coroutines resume on the correct
42 : executor.
43 :
44 : Supports two modes of operation:
45 :
46 : **Connectionless mode**: each `send_to` specifies a destination
47 : endpoint, and each `recv_from` captures the source endpoint.
48 : The socket must be opened (and optionally bound) before I/O.
49 :
50 : **Connected mode**: call `connect()` to set a default peer,
51 : then use `send()`/`recv()` without endpoint arguments.
52 : The kernel filters incoming datagrams to those from the
53 : connected peer.
54 :
55 : @par Thread Safety
56 : Distinct objects: Safe.@n
57 : Shared objects: Unsafe. A socket must not have concurrent
58 : operations of the same type (e.g., two simultaneous recv_from).
59 : One send_to and one recv_from may be in flight simultaneously.
60 :
61 : @par Example
62 : @code
63 : // Connectionless mode
64 : io_context ioc;
65 : udp_socket sock( ioc );
66 : sock.open( udp::v4() );
67 : sock.bind( endpoint( ipv4_address::any(), 9000 ) );
68 :
69 : char buf[1024];
70 : endpoint sender;
71 : auto [ec, n] = co_await sock.recv_from(
72 : capy::mutable_buffer( buf, sizeof( buf ) ), sender );
73 : if ( !ec )
74 : co_await sock.send_to(
75 : capy::const_buffer( buf, n ), sender );
76 :
77 : // Connected mode
78 : udp_socket csock( ioc );
79 : auto [cec] = co_await csock.connect(
80 : endpoint( ipv4_address::loopback(), 9000 ) );
81 : if ( !cec )
82 : co_await csock.send(
83 : capy::const_buffer( buf, n ) );
84 : @endcode
85 : */
86 : class BOOST_COROSIO_DECL udp_socket : public io_object
87 : {
88 : public:
89 : /** Define backend hooks for UDP socket operations.
90 :
91 : Platform backends (epoll, kqueue, select) derive from
92 : this to implement datagram I/O and option management.
93 : */
94 : struct implementation : io_object::implementation
95 : {
96 : /** Initiate an asynchronous send_to operation.
97 :
98 : @param h Coroutine handle to resume on completion.
99 : @param ex Executor for dispatching the completion.
100 : @param buf The buffer data to send.
101 : @param dest The destination endpoint.
102 : @param token Stop token for cancellation.
103 : @param ec Output error code.
104 : @param bytes_out Output bytes transferred.
105 :
106 : @return Coroutine handle to resume immediately.
107 : */
108 : virtual std::coroutine_handle<> send_to(
109 : std::coroutine_handle<> h,
110 : capy::executor_ref ex,
111 : buffer_param buf,
112 : endpoint dest,
113 : std::stop_token token,
114 : std::error_code* ec,
115 : std::size_t* bytes_out) = 0;
116 :
117 : /** Initiate an asynchronous recv_from operation.
118 :
119 : @param h Coroutine handle to resume on completion.
120 : @param ex Executor for dispatching the completion.
121 : @param buf The buffer to receive into.
122 : @param source Output endpoint for the sender's address.
123 : @param token Stop token for cancellation.
124 : @param ec Output error code.
125 : @param bytes_out Output bytes transferred.
126 :
127 : @return Coroutine handle to resume immediately.
128 : */
129 : virtual std::coroutine_handle<> recv_from(
130 : std::coroutine_handle<> h,
131 : capy::executor_ref ex,
132 : buffer_param buf,
133 : endpoint* source,
134 : std::stop_token token,
135 : std::error_code* ec,
136 : std::size_t* bytes_out) = 0;
137 :
138 : /// Return the platform socket descriptor.
139 : virtual native_handle_type native_handle() const noexcept = 0;
140 :
141 : /** Request cancellation of pending asynchronous operations.
142 :
143 : All outstanding operations complete with operation_canceled
144 : error. Check `ec == cond::canceled` for portable comparison.
145 : */
146 : virtual void cancel() noexcept = 0;
147 :
148 : /** Set a socket option.
149 :
150 : @param level The protocol level (e.g. `SOL_SOCKET`).
151 : @param optname The option name.
152 : @param data Pointer to the option value.
153 : @param size Size of the option value in bytes.
154 : @return Error code on failure, empty on success.
155 : */
156 : virtual std::error_code set_option(
157 : int level,
158 : int optname,
159 : void const* data,
160 : std::size_t size) noexcept = 0;
161 :
162 : /** Get a socket option.
163 :
164 : @param level The protocol level (e.g. `SOL_SOCKET`).
165 : @param optname The option name.
166 : @param data Pointer to receive the option value.
167 : @param size On entry, the size of the buffer. On exit,
168 : the size of the option value.
169 : @return Error code on failure, empty on success.
170 : */
171 : virtual std::error_code
172 : get_option(int level, int optname, void* data, std::size_t* size)
173 : const noexcept = 0;
174 :
175 : /// Return the cached local endpoint.
176 : virtual endpoint local_endpoint() const noexcept = 0;
177 :
178 : /// Return the cached remote endpoint (connected mode).
179 : virtual endpoint remote_endpoint() const noexcept = 0;
180 :
181 : /** Initiate an asynchronous connect to set the default peer.
182 :
183 : @param h Coroutine handle to resume on completion.
184 : @param ex Executor for dispatching the completion.
185 : @param ep The remote endpoint to connect to.
186 : @param token Stop token for cancellation.
187 : @param ec Output error code.
188 :
189 : @return Coroutine handle to resume immediately.
190 : */
191 : virtual std::coroutine_handle<> connect(
192 : std::coroutine_handle<> h,
193 : capy::executor_ref ex,
194 : endpoint ep,
195 : std::stop_token token,
196 : std::error_code* ec) = 0;
197 :
198 : /** Initiate an asynchronous connected send operation.
199 :
200 : @param h Coroutine handle to resume on completion.
201 : @param ex Executor for dispatching the completion.
202 : @param buf The buffer data to send.
203 : @param token Stop token for cancellation.
204 : @param ec Output error code.
205 : @param bytes_out Output bytes transferred.
206 :
207 : @return Coroutine handle to resume immediately.
208 : */
209 : virtual std::coroutine_handle<> send(
210 : std::coroutine_handle<> h,
211 : capy::executor_ref ex,
212 : buffer_param buf,
213 : std::stop_token token,
214 : std::error_code* ec,
215 : std::size_t* bytes_out) = 0;
216 :
217 : /** Initiate an asynchronous connected recv operation.
218 :
219 : @param h Coroutine handle to resume on completion.
220 : @param ex Executor for dispatching the completion.
221 : @param buf The buffer to receive into.
222 : @param token Stop token for cancellation.
223 : @param ec Output error code.
224 : @param bytes_out Output bytes transferred.
225 :
226 : @return Coroutine handle to resume immediately.
227 : */
228 : virtual std::coroutine_handle<> recv(
229 : std::coroutine_handle<> h,
230 : capy::executor_ref ex,
231 : buffer_param buf,
232 : std::stop_token token,
233 : std::error_code* ec,
234 : std::size_t* bytes_out) = 0;
235 : };
236 :
237 : /** Represent the awaitable returned by @ref send_to.
238 :
239 : Captures the destination endpoint and buffer, then dispatches
240 : to the backend implementation on suspension.
241 : */
242 : struct send_to_awaitable
243 : {
244 : udp_socket& s_;
245 : buffer_param buf_;
246 : endpoint dest_;
247 : std::stop_token token_;
248 : mutable std::error_code ec_;
249 : mutable std::size_t bytes_ = 0;
250 :
251 HIT 22 : send_to_awaitable(
252 : udp_socket& s, buffer_param buf, endpoint dest) noexcept
253 22 : : s_(s)
254 22 : , buf_(buf)
255 22 : , dest_(dest)
256 : {
257 22 : }
258 :
259 22 : bool await_ready() const noexcept
260 : {
261 22 : return token_.stop_requested();
262 : }
263 :
264 22 : capy::io_result<std::size_t> await_resume() const noexcept
265 : {
266 22 : if (token_.stop_requested())
267 MIS 0 : return {make_error_code(std::errc::operation_canceled), 0};
268 HIT 22 : return {ec_, bytes_};
269 : }
270 :
271 22 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
272 : -> std::coroutine_handle<>
273 : {
274 22 : token_ = env->stop_token;
275 66 : return s_.get().send_to(
276 66 : h, env->executor, buf_, dest_, token_, &ec_, &bytes_);
277 : }
278 : };
279 :
280 : /** Represent the awaitable returned by @ref recv_from.
281 :
282 : Captures the receive buffer and source endpoint reference,
283 : then dispatches to the backend implementation on suspension.
284 : */
285 : struct recv_from_awaitable
286 : {
287 : udp_socket& s_;
288 : buffer_param buf_;
289 : endpoint& source_;
290 : std::stop_token token_;
291 : mutable std::error_code ec_;
292 : mutable std::size_t bytes_ = 0;
293 :
294 32 : recv_from_awaitable(
295 : udp_socket& s, buffer_param buf, endpoint& source) noexcept
296 32 : : s_(s)
297 32 : , buf_(buf)
298 32 : , source_(source)
299 : {
300 32 : }
301 :
302 32 : bool await_ready() const noexcept
303 : {
304 32 : return token_.stop_requested();
305 : }
306 :
307 32 : capy::io_result<std::size_t> await_resume() const noexcept
308 : {
309 32 : if (token_.stop_requested())
310 2 : return {make_error_code(std::errc::operation_canceled), 0};
311 30 : return {ec_, bytes_};
312 : }
313 :
314 32 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
315 : -> std::coroutine_handle<>
316 : {
317 32 : token_ = env->stop_token;
318 64 : return s_.get().recv_from(
319 64 : h, env->executor, buf_, &source_, token_, &ec_, &bytes_);
320 : }
321 : };
322 :
323 : /** Represent the awaitable returned by @ref connect.
324 :
325 : Captures the target endpoint, then dispatches to the backend
326 : implementation on suspension.
327 : */
328 : struct connect_awaitable
329 : {
330 : udp_socket& s_;
331 : endpoint endpoint_;
332 : std::stop_token token_;
333 : mutable std::error_code ec_;
334 :
335 12 : connect_awaitable(udp_socket& s, endpoint ep) noexcept
336 12 : : s_(s)
337 12 : , endpoint_(ep)
338 : {
339 12 : }
340 :
341 12 : bool await_ready() const noexcept
342 : {
343 12 : return token_.stop_requested();
344 : }
345 :
346 12 : capy::io_result<> await_resume() const noexcept
347 : {
348 12 : if (token_.stop_requested())
349 MIS 0 : return {make_error_code(std::errc::operation_canceled)};
350 HIT 12 : return {ec_};
351 : }
352 :
353 12 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
354 : -> std::coroutine_handle<>
355 : {
356 12 : token_ = env->stop_token;
357 12 : return s_.get().connect(h, env->executor, endpoint_, token_, &ec_);
358 : }
359 : };
360 :
361 : /** Represent the awaitable returned by @ref send.
362 :
363 : Captures the buffer, then dispatches to the backend
364 : implementation on suspension. No endpoint argument
365 : (uses the connected peer).
366 : */
367 : struct send_awaitable
368 : {
369 : udp_socket& s_;
370 : buffer_param buf_;
371 : std::stop_token token_;
372 : mutable std::error_code ec_;
373 : mutable std::size_t bytes_ = 0;
374 :
375 6 : send_awaitable(udp_socket& s, buffer_param buf) noexcept
376 6 : : s_(s)
377 6 : , buf_(buf)
378 : {
379 6 : }
380 :
381 6 : bool await_ready() const noexcept
382 : {
383 6 : return token_.stop_requested();
384 : }
385 :
386 6 : capy::io_result<std::size_t> await_resume() const noexcept
387 : {
388 6 : if (token_.stop_requested())
389 MIS 0 : return {make_error_code(std::errc::operation_canceled), 0};
390 HIT 6 : return {ec_, bytes_};
391 : }
392 :
393 6 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
394 : -> std::coroutine_handle<>
395 : {
396 6 : token_ = env->stop_token;
397 6 : return s_.get().send(h, env->executor, buf_, token_, &ec_, &bytes_);
398 : }
399 : };
400 :
401 : /** Represent the awaitable returned by @ref recv.
402 :
403 : Captures the receive buffer, then dispatches to the backend
404 : implementation on suspension. No source endpoint (connected
405 : mode filters at the kernel level).
406 : */
407 : struct recv_awaitable
408 : {
409 : udp_socket& s_;
410 : buffer_param buf_;
411 : std::stop_token token_;
412 : mutable std::error_code ec_;
413 : mutable std::size_t bytes_ = 0;
414 :
415 4 : recv_awaitable(udp_socket& s, buffer_param buf) noexcept
416 4 : : s_(s)
417 4 : , buf_(buf)
418 : {
419 4 : }
420 :
421 4 : bool await_ready() const noexcept
422 : {
423 4 : return token_.stop_requested();
424 : }
425 :
426 4 : capy::io_result<std::size_t> await_resume() const noexcept
427 : {
428 4 : if (token_.stop_requested())
429 MIS 0 : return {make_error_code(std::errc::operation_canceled), 0};
430 HIT 4 : return {ec_, bytes_};
431 : }
432 :
433 4 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
434 : -> std::coroutine_handle<>
435 : {
436 4 : token_ = env->stop_token;
437 4 : return s_.get().recv(h, env->executor, buf_, token_, &ec_, &bytes_);
438 : }
439 : };
440 :
441 : public:
442 : /** Destructor.
443 :
444 : Closes the socket if open, cancelling any pending operations.
445 : */
446 : ~udp_socket() override;
447 :
448 : /** Construct a socket from an execution context.
449 :
450 : @param ctx The execution context that will own this socket.
451 : */
452 : explicit udp_socket(capy::execution_context& ctx);
453 :
454 : /** Construct a socket from an executor.
455 :
456 : The socket is associated with the executor's context.
457 :
458 : @param ex The executor whose context will own the socket.
459 : */
460 : template<class Ex>
461 : requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
462 : capy::Executor<Ex>
463 : explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
464 : {
465 : }
466 :
467 : /** Move constructor.
468 :
469 : Transfers ownership of the socket resources.
470 :
471 : @param other The socket to move from.
472 : */
473 2 : udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
474 :
475 : /** Move assignment operator.
476 :
477 : Closes any existing socket and transfers ownership.
478 :
479 : @param other The socket to move from.
480 : @return Reference to this socket.
481 : */
482 2 : udp_socket& operator=(udp_socket&& other) noexcept
483 : {
484 2 : if (this != &other)
485 : {
486 2 : close();
487 2 : h_ = std::move(other.h_);
488 : }
489 2 : return *this;
490 : }
491 :
492 : udp_socket(udp_socket const&) = delete;
493 : udp_socket& operator=(udp_socket const&) = delete;
494 :
495 : /** Open the socket.
496 :
497 : Creates a UDP socket and associates it with the platform
498 : reactor.
499 :
500 : @param proto The protocol (IPv4 or IPv6). Defaults to
501 : `udp::v4()`.
502 :
503 : @throws std::system_error on failure.
504 : */
505 : void open(udp proto = udp::v4());
506 :
507 : /** Close the socket.
508 :
509 : Releases socket resources. Any pending operations complete
510 : with `errc::operation_canceled`.
511 : */
512 : void close();
513 :
514 : /** Check if the socket is open.
515 :
516 : @return `true` if the socket is open and ready for operations.
517 : */
518 414 : bool is_open() const noexcept
519 : {
520 : #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
521 : return h_ && get().native_handle() != ~native_handle_type(0);
522 : #else
523 414 : return h_ && get().native_handle() >= 0;
524 : #endif
525 : }
526 :
527 : /** Bind the socket to a local endpoint.
528 :
529 : Associates the socket with a local address and port.
530 : Required before calling `recv_from`.
531 :
532 : @param ep The local endpoint to bind to.
533 :
534 : @return Error code on failure, empty on success.
535 :
536 : @throws std::logic_error if the socket is not open.
537 : */
538 : std::error_code bind(endpoint ep);
539 :
540 : /** Cancel any pending asynchronous operations.
541 :
542 : All outstanding operations complete with
543 : `errc::operation_canceled`. Check `ec == cond::canceled`
544 : for portable comparison.
545 : */
546 : void cancel();
547 :
548 : /** Get the native socket handle.
549 :
550 : @return The native socket handle, or -1 if not open.
551 : */
552 : native_handle_type native_handle() const noexcept;
553 :
554 : /** Set a socket option.
555 :
556 : @param opt The option to set.
557 :
558 : @throws std::logic_error if the socket is not open.
559 : @throws std::system_error on failure.
560 : */
561 : template<class Option>
562 20 : void set_option(Option const& opt)
563 : {
564 20 : if (!is_open())
565 MIS 0 : detail::throw_logic_error("set_option: socket not open");
566 HIT 20 : std::error_code ec = get().set_option(
567 : Option::level(), Option::name(), opt.data(), opt.size());
568 20 : if (ec)
569 MIS 0 : detail::throw_system_error(ec, "udp_socket::set_option");
570 HIT 20 : }
571 :
572 : /** Get a socket option.
573 :
574 : @return The current option value.
575 :
576 : @throws std::logic_error if the socket is not open.
577 : @throws std::system_error on failure.
578 : */
579 : template<class Option>
580 16 : Option get_option() const
581 : {
582 16 : if (!is_open())
583 MIS 0 : detail::throw_logic_error("get_option: socket not open");
584 HIT 16 : Option opt{};
585 16 : std::size_t sz = opt.size();
586 : std::error_code ec =
587 16 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
588 16 : if (ec)
589 MIS 0 : detail::throw_system_error(ec, "udp_socket::get_option");
590 HIT 16 : opt.resize(sz);
591 16 : return opt;
592 : }
593 :
594 : /** Get the local endpoint of the socket.
595 :
596 : @return The local endpoint, or a default endpoint if not bound.
597 : */
598 : endpoint local_endpoint() const noexcept;
599 :
600 : /** Send a datagram to the specified destination.
601 :
602 : @param buf The buffer containing data to send.
603 : @param dest The destination endpoint.
604 :
605 : @return An awaitable that completes with
606 : `io_result<std::size_t>`.
607 :
608 : @throws std::logic_error if the socket is not open.
609 : */
610 : template<capy::ConstBufferSequence Buffers>
611 22 : auto send_to(Buffers const& buf, endpoint dest)
612 : {
613 22 : if (!is_open())
614 MIS 0 : detail::throw_logic_error("send_to: socket not open");
615 HIT 22 : return send_to_awaitable(*this, buf, dest);
616 : }
617 :
618 : /** Receive a datagram and capture the sender's endpoint.
619 :
620 : @param buf The buffer to receive data into.
621 : @param source Reference to an endpoint that will be set to
622 : the sender's address on successful completion.
623 :
624 : @return An awaitable that completes with
625 : `io_result<std::size_t>`.
626 :
627 : @throws std::logic_error if the socket is not open.
628 : */
629 : template<capy::MutableBufferSequence Buffers>
630 32 : auto recv_from(Buffers const& buf, endpoint& source)
631 : {
632 32 : if (!is_open())
633 MIS 0 : detail::throw_logic_error("recv_from: socket not open");
634 HIT 32 : return recv_from_awaitable(*this, buf, source);
635 : }
636 :
637 : /** Initiate an asynchronous connect to set the default peer.
638 :
639 : If the socket is not already open, it is opened automatically
640 : using the address family of @p ep.
641 :
642 : @param ep The remote endpoint to connect to.
643 :
644 : @return An awaitable that completes with `io_result<>`.
645 :
646 : @throws std::system_error if the socket needs to be opened
647 : and the open fails.
648 : */
649 12 : auto connect(endpoint ep)
650 : {
651 12 : if (!is_open())
652 8 : open(ep.is_v6() ? udp::v6() : udp::v4());
653 12 : return connect_awaitable(*this, ep);
654 : }
655 :
656 : /** Send a datagram to the connected peer.
657 :
658 : @param buf The buffer containing data to send.
659 :
660 : @return An awaitable that completes with
661 : `io_result<std::size_t>`.
662 :
663 : @throws std::logic_error if the socket is not open.
664 : */
665 : template<capy::ConstBufferSequence Buffers>
666 6 : auto send(Buffers const& buf)
667 : {
668 6 : if (!is_open())
669 MIS 0 : detail::throw_logic_error("send: socket not open");
670 HIT 6 : return send_awaitable(*this, buf);
671 : }
672 :
673 : /** Receive a datagram from the connected peer.
674 :
675 : @param buf The buffer to receive data into.
676 :
677 : @return An awaitable that completes with
678 : `io_result<std::size_t>`.
679 :
680 : @throws std::logic_error if the socket is not open.
681 : */
682 : template<capy::MutableBufferSequence Buffers>
683 4 : auto recv(Buffers const& buf)
684 : {
685 4 : if (!is_open())
686 MIS 0 : detail::throw_logic_error("recv: socket not open");
687 HIT 4 : return recv_awaitable(*this, buf);
688 : }
689 :
690 : /** Get the remote endpoint of the socket.
691 :
692 : Returns the address and port of the connected peer.
693 :
694 : @return The remote endpoint, or a default endpoint if
695 : not connected.
696 : */
697 : endpoint remote_endpoint() const noexcept;
698 :
699 : protected:
700 : /// Construct from a pre-built handle (for native_udp_socket).
701 : explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
702 : {
703 : }
704 :
705 : private:
706 : /// Open the socket for the given protocol triple.
707 : void open_for_family(int family, int type, int protocol);
708 :
709 556 : inline implementation& get() const noexcept
710 : {
711 556 : return *static_cast<implementation*>(h_.get());
712 : }
713 : };
714 :
715 : } // namespace boost::corosio
716 :
717 : #endif // BOOST_COROSIO_UDP_SOCKET_HPP
|