diff --git a/src/quic/README.md b/src/quic/README.md index 9583acbcd76417..3d90cacdbb6c34 100644 --- a/src/quic/README.md +++ b/src/quic/README.md @@ -77,7 +77,7 @@ data channels that carry application data. Every entry point that may generate outbound data creates a `SendPendingDataScope`. Scopes nest — an internal depth counter ensures -`Application::SendPendingData()` is called exactly once, when the outermost +`Session::SendPendingData()` is called exactly once, when the outermost scope exits: ```cpp @@ -218,13 +218,13 @@ Session::Receive() ```text SendPendingDataScope::~SendPendingDataScope() - → Application::SendPendingData() + → Session::SendPendingData() Loop (up to max_packet_count): - ├── GetStreamData() // pull data from next stream - │ └── stream->Pull() // bob pull from Outbound→DataQueue - ├── WriteVStream() // ngtcp2_conn_writev_stream() + ├── application().GetStreamData() // pull data from next stream + │ └── stream->Pull() // bob pull from Outbound→DataQueue + ├── WriteVStream() // ngtcp2_conn_writev_stream() │ encrypts, frames, paces - ├── if ndatalen > 0: StreamCommit() + ├── if ndatalen > 0: application().StreamCommit() │ stream->Commit(datalen, fin) ├── if nwrite > 0: Send() // uv_udp_send() ├── if WRITE_MORE: continue // room for more in this packet diff --git a/src/quic/application.cc b/src/quic/application.cc index ce5d5e12154d8a..cc7e031204183c 100644 --- a/src/quic/application.cc +++ b/src/quic/application.cc @@ -165,24 +165,6 @@ MaybeLocal Session::Application_Options::ToObject( // ============================================================================ -std::string Session::Application::StreamData::ToString() const { - DebugIndentScope indent; - - size_t total_bytes = 0; - for (size_t n = 0; n < count; n++) { - total_bytes += data[n].len; - } - - auto prefix = indent.Prefix(); - std::string res("{"); - res += prefix + "count: " + std::to_string(count); - res += prefix + "id: " + std::to_string(id); - res += prefix + "fin: " + std::to_string(fin); - res += prefix + "total: " + std::to_string(total_bytes); - res += indent.Close(); - return res; -} - Session::Application::Application(Session* session, const Options& options) : session_(session) {} @@ -254,11 +236,6 @@ bool Session::Application::ValidateTicketData( return true; } -Packet::Ptr Session::Application::CreateStreamDataPacket() { - return session_->endpoint().CreatePacket( - session_->remote_address(), session_->max_packet_size(), "stream data"); -} - void Session::Application::ReceiveStreamClose(Stream* stream, QuicError&& error) { DCHECK_NOT_NULL(stream); @@ -277,425 +254,6 @@ void Session::Application::ReceiveStreamReset(Stream* stream, stream->ReceiveStreamReset(final_size, std::move(error)); } -// Attempts to pack a pending datagram into the current packet. -// Returns the nwrite value from ngtcp2_conn_writev_datagram. -// On fatal error, closes the session and returns the error code. -// The caller should check: -// > 0: packet is complete, send it (pos was NOT advanced — caller -// must add nwrite to pos and send) -// NGTCP2_ERR_WRITE_MORE: datagram packed, room for more -// 0: congestion controlled or doesn't fit, datagram stays in queue -// < 0 (other): fatal error, session already closed -ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path, - uint8_t* dest, - size_t destlen, - uint64_t ts) { - CHECK(session_->HasPendingDatagrams()); - auto max_attempts = session_->config().options.max_datagram_send_attempts; - - // Skip datagrams that have already exceeded the send attempt limit - // from a previous SendPendingData cycle. - while (session_->HasPendingDatagrams()) { - auto& front = session_->PeekPendingDatagram(); - if (front.send_attempts < max_attempts) break; - Debug(session_, - "Datagram %" PRIu64 " abandoned after %u attempts", - front.id, - front.send_attempts); - session_->DatagramStatus(front.id, DatagramStatus::ABANDONED); - session_->PopPendingDatagram(); - } - - if (!session_->HasPendingDatagrams()) return 0; - auto& dg = session_->PeekPendingDatagram(); - ngtcp2_vec dgvec = dg.data; - int accepted = 0; - int dg_flags = NGTCP2_WRITE_DATAGRAM_FLAG_MORE; - - // PacketInfo for the datagram path. When libuv gains per-socket ECN - // marking, the value from ngtcp2 should be forwarded to the send path. - PacketInfo dg_pi; - ssize_t dg_nwrite = ngtcp2_conn_writev_datagram(*session_, - &path->path, - dg_pi, - dest, - destlen, - &accepted, - dg_flags, - dg.id, - &dgvec, - 1, - ts); - - if (accepted) { - // Nice, the datagram was accepted! - Debug(session_, "Datagram %" PRIu64 " accepted into packet", dg.id); - session_->DatagramSent(dg.id); - session_->PopPendingDatagram(); - } else { - Debug(session_, "Datagram %" PRIu64 " not accepted into packet", dg.id); - } - - switch (dg_nwrite) { - case 0: { - // If dg_nwrite is 0, we are either congestion controlled or - // there wasn't enough room in the packet for the datagram or - // we aren't in a state where we can send. - // We'll skip this attempt and return 0. - CHECK(!accepted); - dg.send_attempts++; - return 0; - } - case NGTCP2_ERR_WRITE_MORE: { - // There's still room left in the packet! - return NGTCP2_ERR_WRITE_MORE; - } - case NGTCP2_ERR_INVALID_STATE: - case NGTCP2_ERR_INVALID_ARGUMENT: { - // Non-fatal error cases. Peer either does not support datagrams - // or the datagram is too large for the peer's max. - // Abandon the datagram and signal skip by returning std::nullopt. - session_->DatagramStatus(dg.id, DatagramStatus::ABANDONED); - session_->PopPendingDatagram(); - return 0; - } - default: { - // Fatal errors: PKT_NUM_EXHAUSTED, CALLBACK_FAILURE, NOMEM, etc. - Debug(session_, "Fatal datagram error: %zd", dg_nwrite); - session_->SetLastError(QuicError::ForNgtcp2Error(dg_nwrite)); - session_->Close(CloseMethod::SILENT); - return dg_nwrite; - } - } - UNREACHABLE(); -} - -// the SendPendingData method is the primary driver for sending data from the -// application layer. It loops through available stream data and pending -// datagrams and generates packets to send until there is either no more -// data to send or we hit the maximum number of packets to send in one go. -// This method is extremely delicate. A bug in this method can break the -// entire QUIC implementation; so be very careful when making changes here -// and make sure to test thoroughly. When in doubt... don't change it. -void Session::Application::SendPendingData() { - DCHECK(!session().is_destroyed()); - if (!session().can_send_packets()) [[unlikely]] { - return; - } - // Upper bound on packets per SendPendingData call. ngtcp2's send quantum - // is typically 64 KB, which at 1200-byte minimum packet size is ~53 - // packets. 64 covers the worst case with headroom. The actual count per - // call is dynamically capped by ngtcp2_conn_get_send_quantum(). - static constexpr size_t kMaxPackets = 64; - Debug(session_, "Application sending pending data"); - // Cache the timestamp once for the entire send loop. ngtcp2 does not - // require nanosecond-accurate monotonicity within a single burst — - // a single timestamp per SendPendingData call is what other QUIC - // implementations use (e.g., quiche, msquic). When kernel-level - // packet pacing becomes available via libuv, this timestamp becomes - // the base for computing per-packet transmit timestamps. - const uint64_t ts = uv_hrtime(); - PathStorage path; - StreamData stream_data; - - bool closed = false; - - // Batch accumulation: packets are collected here and flushed via - // Session::SendBatch when the loop exits, the batch is full, or - // on early return. This enables synchronous batched delivery via - // uv_udp_try_send2 (sendmmsg) from the deferred flush path. - Packet::Ptr batch[kMaxPackets]; - PathStorage batch_paths[kMaxPackets]; - size_t batch_count = 0; - - auto flush_batch = [&] { - if (batch_count == 0) return; - session_->SendBatch(batch, batch_paths, batch_count); - batch_count = 0; - }; - - auto update_stats = OnScopeLeave([&] { - if (closed) return; - // Flush any remaining accumulated packets before updating stats. - flush_batch(); - if (session().is_destroyed()) [[unlikely]] - return; - - // Get a strong pointer to protect against potential destruction during - // updating the time and data stats. - BaseObjectPtr s(session_); - s->UpdatePacketTxTime(); - s->UpdateTimer(); - s->UpdateDataStats(); - }); - - // The maximum size of packet to create. - const size_t max_packet_size = session_->max_packet_size(); - - // The maximum number of packets to send in this call to SendPendingData. - const size_t max_packet_count = std::min( - kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size); - if (max_packet_count == 0) return; - - // The number of packets that have been prepared in this call. - size_t packet_send_count = 0; - - Packet::Ptr packet; - - auto ensure_packet = [&] { - if (!packet) { - packet = CreateStreamDataPacket(); - if (!packet) [[unlikely]] - return false; - } - DCHECK(packet); - return true; - }; - - // Accumulate a completed packet into the batch. - auto enqueue_packet = - [&](Packet::Ptr& pkt, size_t len, const PacketInfo& pi) { - Debug(session_, "Enqueuing packet with %zu bytes into batch", len); - pkt->Truncate(len); - pkt->set_pkt_info(pi); - path.CopyTo(&batch_paths[batch_count]); - batch[batch_count++] = std::move(pkt); - }; - - // We're going to enter a loop here to prepare and send no more than - // max_packet_count packets. - for (;;) { - // ndatalen is the amount of stream data that was accepted into the packet. - ssize_t ndatalen = 0; - - // Make sure we have a packet to write data into. - if (!ensure_packet()) [[unlikely]] { - Debug(session_, "Failed to create packet for stream data"); - // Doh! Could not create a packet. Time to bail. - session_->SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - - // The stream_data is the next block of data from the application stream. - if (GetStreamData(&stream_data) < 0) { - Debug(session_, "Application failed to get stream data"); - session_->SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - - // If we got here, we were at least successful in checking for stream data. - // There might not be any stream data to send. If there is no stream data, - // that's perfectly fine, we still need to serialize any frames we do have - // (pings, acks, datagrams, etc) so we'll just keep going. - if (stream_data.id >= 0) { - Debug(session_, "Application using stream data: %s", stream_data); - } else { - Debug(session_, "No stream data to send"); - } - if (session_->HasPendingDatagrams()) { - Debug(session_, "There are pending datagrams to send"); - } - - // Awesome, let's write our packet! - PacketInfo pi; - ssize_t nwrite = WriteVStream(&path, - &pi, - packet->data(), - &ndatalen, - packet->length(), - stream_data, - ts); - - // When ndatalen is > 0, that's our indication that stream data was accepted - // in to the packet. Yay! - if (ndatalen > 0) { - Debug(session_, - "Application accepted %zu bytes from stream %" PRIi64 - " into packet", - ndatalen, - stream_data.id); - if (!StreamCommit(&stream_data, ndatalen)) { - // Data was accepted into the packet, but for some reason adjusting - // the stream's committed data failed. Treat as fatal. - Debug(session_, "Failed to commit accepted bytes in stream"); - session_->SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - } else if (stream_data.id >= 0) { - Debug(session_, - "Application did not accept any bytes from stream %" PRIi64 - " into packet", - stream_data.id); - } - - // When nwrite is zero, it means we are congestion limited or it is - // just not our turn to send something. Re-schedule the stream if it - // had unsent data (payload or FIN) so the next timer-triggered - // SendPendingData retries it. Without this, a FIN-only send that - // hits nwrite=0 is lost forever — the stream already returned EOS - // from Pull and won't be re-scheduled by anyone else. - // We call Application::ResumeStream directly (not Session::ResumeStream) - // to avoid creating a SendPendingDataScope — we're already inside - // SendPendingData and re-entering would just hit nwrite=0 again. - if (nwrite == 0) { - Debug(session_, "Congestion or not our turn to send"); - if (stream_data.id >= 0 && (stream_data.count > 0 || stream_data.fin)) { - ResumeStream(stream_data.id); - } - return; - } - - // A negative nwrite value indicates either an error or that there is more - // data to write into the packet. - if (nwrite < 0) { - switch (nwrite) { - case NGTCP2_ERR_STREAM_DATA_BLOCKED: { - // We could not write any data for this stream into the packet because - // the flow control for the stream itself indicates that the stream - // is blocked. We'll skip and move on to the next stream. - // ndatalen = -1 means that no stream data was accepted into the - // packet, which is what we want here. - DCHECK_EQ(ndatalen, -1); - // We should only have received this error if there was an actual - // stream identified in the stream data, but let's double check. - DCHECK_GE(stream_data.id, 0); - session_->StreamDataBlocked(stream_data.id); - continue; - } - case NGTCP2_ERR_STREAM_SHUT_WR: { - // Indicates that the writable side of the stream should be closed - // locally or the stream is being reset. In either case, we can't send - // data for this stream! - Debug(session_, - "Closing stream %" PRIi64 " for writing", - stream_data.id); - // ndatalen = -1 means that no stream data was accepted into the - // packet, which is what we want here. - DCHECK_EQ(ndatalen, -1); - // We should only have received this error if there was an actual - // stream identified in the stream data, but let's double check. - DCHECK_GE(stream_data.id, 0); - if (stream_data.stream) [[likely]] { - stream_data.stream->EndWritable(); - } - // Notify the application that the stream's write side is shut - // so it stops queuing data. Without this, GetStreamData would - // keep returning the same stream and we'd loop forever. - StreamWriteShut(stream_data.id); - continue; - } - case NGTCP2_ERR_WRITE_MORE: { - Debug(session_, "Packet buffer not full, coalesce more data into it"); - // Room for more in this packet. Try to pack a pending datagram - // if there is one. Otherwise just loop around and keep going. - if (session_->HasPendingDatagrams()) { - auto result = TryWritePendingDatagram( - &path, packet->data(), packet->length(), ts); - // When result is 0, either the datagram was congestion controlled, - // didn't fit in the packet, or was abandoned. Skip and continue. - - // When result is > 0, the packet is done and the result is the - // completed size of the packet we're sending. - if (result > 0) { - size_t len = result; - Debug(session_, "Sending packet with %zu bytes", len); - enqueue_packet(packet, len, pi); - if (++packet_send_count == max_packet_count) return; - } else if (result < 0) { - // Any negative result other than NGTCP2_ERR_WRITE_MORE - // at this point is fatal. The session will have been - // closed. - if (result != NGTCP2_ERR_WRITE_MORE) return; - } - } - continue; - } - case NGTCP2_ERR_CALLBACK_FAILURE: { - // This case really should not happen. It indicates that the - // ngtcp2 callback failed for some reason. This would be a - // bug in our code. - Debug(session_, "Internal failure with ngtcp2 callback"); - session_->SetLastError( - QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - } - - // Some other type of error happened. - DCHECK_EQ(ndatalen, -1); - Debug(session_, - "Application encountered error while writing packet: %s", - ngtcp2_strerror(nwrite)); - session_->SetLastError(QuicError::ForNgtcp2Error(nwrite)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - - // At this point we have a packet prepared to send. The nwrite - // is the size of the packet we are sending. - size_t len = nwrite; - Debug(session_, "Sending packet with %zu bytes", len); - enqueue_packet(packet, len, pi); - if (++packet_send_count == max_packet_count) return; - - // If there are pending datagrams, try sending them in a fresh packet. - // This is necessary because ngtcp2_conn_writev_stream only returns - // NGTCP2_ERR_WRITE_MORE when there is actual stream data — when no - // streams are active, the coalescing path above is never reached and - // datagrams would never be sent. - if (session_->HasPendingDatagrams()) { - if (!ensure_packet()) [[unlikely]] { - Debug(session_, "Failed to create packet for datagram"); - session_->SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); - closed = true; - return session_->Close(CloseMethod::SILENT); - } - auto result = - TryWritePendingDatagram(&path, packet->data(), packet->length(), ts); - if (result > 0) { - Debug(session_, "Sending datagram packet with %zd bytes", result); - enqueue_packet(packet, static_cast(result), PacketInfo()); - if (++packet_send_count == max_packet_count) return; - } else if (result < 0 && result != NGTCP2_ERR_WRITE_MORE) { - // Fatal error — session already closed by TryWritePendingDatagram. - return; - } - // If result == 0 (congestion) or NGTCP2_ERR_WRITE_MORE (datagram - // packed but room for more), the loop continues normally. - } - } -} - -ssize_t Session::Application::WriteVStream(PathStorage* path, - PacketInfo* pi, - uint8_t* dest, - ssize_t* ndatalen, - size_t max_packet_size, - const StreamData& stream_data, - uint64_t ts) { - DCHECK_LE(stream_data.count, kMaxVectorCount); - uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; - if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; - // The PacketInfo out-param is populated by ngtcp2 with the ECN codepoint - // to apply when sending this packet. When libuv gains per-socket ECN - // marking, the value should be forwarded to the send path. - return ngtcp2_conn_writev_stream(*session_, - &path->path, - *pi, - dest, - max_packet_size, - ndatalen, - flags, - stream_data.id, - stream_data, - stream_data.count, - ts); -} - // ============================================================================ // The DefaultApplication is the default implementation of Session::Application // that is used for all unrecognized ALPN identifiers. diff --git a/src/quic/application.h b/src/quic/application.h index 0df9b9f0a0e68d..619b41dd8d0b12 100644 --- a/src/quic/application.h +++ b/src/quic/application.h @@ -205,10 +205,6 @@ class Session::Application : public MemoryRetainer { return false; } - // Signals to the Application that it should serialize and transmit any - // pending session and stream packets it has accumulated. - void SendPendingData(); - // Returns true if the application protocol supports sending and // receiving headers on streams (e.g. HTTP/3). Applications that // do not support headers should return false (the default). @@ -243,10 +239,6 @@ class Session::Application : public MemoryRetainer { return {StreamPriority::DEFAULT, StreamPriorityFlags::NON_INCREMENTAL}; } - // The StreamData struct is used by the application to pass pending stream - // data to the session for transmission. - struct StreamData; - virtual int GetStreamData(StreamData* data) = 0; virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; @@ -262,57 +254,9 @@ class Session::Application : public MemoryRetainer { } private: - Packet::Ptr CreateStreamDataPacket(); - - // Tries to pack a pending datagram into the current packet buffer. - // If < 0 is returned, either NGTCP2_ERR_WRITE_MORE or a fatal error is - // returned; the caller must check. If > 0 is returned, the packet is done - // and the value is the size of the finalized packet. If 0 is returned, - // the datagram is either congestion limited or was abandoned - ssize_t TryWritePendingDatagram(PathStorage* path, - uint8_t* dest, - size_t destlen, - uint64_t ts); - - // Write the given stream_data into the buffer. The PacketInfo out-param - // is populated by ngtcp2 with per-packet metadata (e.g., ECN codepoint) - // that should be applied when sending the packet. - ssize_t WriteVStream(PathStorage* path, - PacketInfo* pi, - uint8_t* buf, - ssize_t* ndatalen, - size_t max_packet_size, - const StreamData& stream_data, - uint64_t ts); - Session* session_ = nullptr; }; -struct Session::Application::StreamData final { - // The actual number of vectors in the struct, up to kMaxVectorCount. - size_t count = 0; - // The stream identifier. If this is a negative value then no stream is - // identified. - stream_id id = -1; - int fin = 0; - ngtcp2_vec data[kMaxVectorCount]{}; - BaseObjectPtr stream; - - static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) && - alignof(ngtcp2_vec) == alignof(nghttp3_vec) && - offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) && - offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len), - "ngtcp2_vec and nghttp3_vec must have identical layout"); - inline operator nghttp3_vec*() { - return reinterpret_cast(data); - } - - inline operator const ngtcp2_vec*() const { return data; } - inline operator ngtcp2_vec*() { return data; } - - std::string ToString() const; -}; - // Create a DefaultApplication for the given session. std::unique_ptr CreateDefaultApplication( Session* session, const Session::Application_Options& options); diff --git a/src/quic/http3.cc b/src/quic/http3.cc index bc479f96990577..072829870b09d5 100644 --- a/src/quic/http3.cc +++ b/src/quic/http3.cc @@ -209,15 +209,15 @@ class Http3ApplicationImpl final : public Session::Application { started_ = true; Debug(&session(), "Starting HTTP/3 application."); - auto params = ngtcp2_conn_get_remote_transport_params(session()); - if (params == nullptr) [[unlikely]] { + const auto params = session().remote_transport_params(); + if (!params) [[unlikely]] { // The params are not available yet. Cannot start. Debug(&session(), "Cannot start HTTP/3 application yet. No remote transport params"); return false; } - if (params->initial_max_streams_uni < 3) { + if (params.initial_max_streams_uni() < 3) { // HTTP3 requires 3 unidirectional control streams to be opened in each // direction in additional to the bidirectional streams that are used to // actually carry request and response payload back and forth. @@ -225,8 +225,9 @@ class Http3ApplicationImpl final : public Session::Application { // https://nghttp2.org/nghttp3/programmers-guide.html#binding-control-streams Debug(&session(), "Cannot start HTTP/3 application. Initial max " - "unidirectional streams [%zu] is too low. Must be at least 3", - params->initial_max_streams_uni); + "unidirectional streams [%" PRIu64 + "] is too low. Must be at least 3", + params.initial_max_streams_uni()); return false; } @@ -235,17 +236,14 @@ class Http3ApplicationImpl final : public Session::Application { // of requests that the client can actually created. if (session().is_server()) { nghttp3_conn_set_max_client_streams_bidi( - *this, params->initial_max_streams_bidi); + *this, params.initial_max_streams_bidi()); } Debug(&session(), "Creating and binding HTTP/3 control streams"); bool ret = - ngtcp2_conn_open_uni_stream(session(), &control_stream_id_, nullptr) == - 0 && - ngtcp2_conn_open_uni_stream( - session(), &qpack_enc_stream_id_, nullptr) == 0 && - ngtcp2_conn_open_uni_stream( - session(), &qpack_dec_stream_id_, nullptr) == 0 && + session().OpenUni(&control_stream_id_) && + session().OpenUni(&qpack_enc_stream_id_) && + session().OpenUni(&qpack_dec_stream_id_) && nghttp3_conn_bind_control_stream(*this, control_stream_id_) == 0 && nghttp3_conn_bind_qpack_streams( *this, qpack_enc_stream_id_, qpack_dec_stream_id_) == 0; @@ -306,8 +304,7 @@ class Http3ApplicationImpl final : public Session::Application { Debug(&session(), "Extending stream and connection offset by %zd bytes", nread); - session().ExtendStreamOffset(id, nread); - session().ExtendOffset(nread); + session().Consume(id, nread); } // If this data arrived as 0-RTT, mark the stream. We set it after @@ -365,24 +362,11 @@ class Http3ApplicationImpl final : public Session::Application { case EndpointLabel::LOCAL: return; case EndpointLabel::REMOTE: { - switch (direction) { - case Direction::BIDIRECTIONAL: { - Debug(&session(), - "HTTP/3 application extending max bidi streams by %" PRIu64, - max_streams); - ngtcp2_conn_extend_max_streams_bidi( - session(), static_cast(max_streams)); - break; - } - case Direction::UNIDIRECTIONAL: { - Debug(&session(), - "HTTP/3 application extending max uni streams by %" PRIu64, - max_streams); - ngtcp2_conn_extend_max_streams_uni( - session(), static_cast(max_streams)); - break; - } - } + Debug(&session(), + "HTTP/3 application extending max %s streams by %" PRIu64, + direction == Direction::BIDIRECTIONAL ? "bidi" : "uni", + max_streams); + session().ExtendMaxStreams(direction, max_streams); } } } @@ -530,8 +514,7 @@ class Http3ApplicationImpl final : public Session::Application { return; } - session().SetLastError( - QuicError::ForApplication(nghttp3_err_infer_quic_app_error_code(rv))); + session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(rv)); session().Close(); } @@ -548,8 +531,7 @@ class Http3ApplicationImpl final : public Session::Application { return; } - session().SetLastError( - QuicError::ForApplication(nghttp3_err_infer_quic_app_error_code(rv))); + session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(rv)); session().Close(); } @@ -688,12 +670,22 @@ class Http3ApplicationImpl final : public Session::Application { } int GetStreamData(StreamData* data) override { + static_assert( + sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) && + alignof(ngtcp2_vec) == alignof(nghttp3_vec) && + offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) && + offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len), + "ngtcp2_vec and nghttp3_vec must have identical layout"); data->count = kMaxVectorCount; ssize_t ret = 0; Debug(&session(), "HTTP/3 application getting stream data"); if (conn_ && session().max_data_left()) { - ret = nghttp3_conn_writev_stream( - *this, &data->id, &data->fin, *data, data->count); + ret = + nghttp3_conn_writev_stream(*this, + &data->id, + &data->fin, + reinterpret_cast(data->data), + data->count); // A negative return value indicates an error. if (ret < 0) { return static_cast(ret); @@ -720,8 +712,7 @@ class Http3ApplicationImpl final : public Session::Application { // nghttp3 tracks its own offset via add_write_offset. int err = nghttp3_conn_add_write_offset(*this, data->id, datalen); if (err != 0) { - session().SetLastError(QuicError::ForApplication( - nghttp3_err_infer_quic_app_error_code(err))); + session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(err)); return false; } // Raw application bytes are committed to the stream's outbound @@ -1211,10 +1202,10 @@ class Http3ApplicationImpl final : public Session::Application { void* conn_user_data, void* stream_user_data) { NGHTTP3_CALLBACK_SCOPE(app); - auto& session = app.session(); - Debug(&session, "HTTP/3 application deferred consume %zu bytes", consumed); - session.ExtendStreamOffset(id, consumed); - session.ExtendOffset(consumed); + Debug(&app.session(), + "HTTP/3 application deferred consume %zu bytes", + consumed); + app.session().Consume(id, consumed); return NGTCP2_SUCCESS; } diff --git a/src/quic/session.cc b/src/quic/session.cc index 8380e477c01e80..1844585d081bfc 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -1730,10 +1730,480 @@ Session::SendPendingDataScope::~SendPendingDataScope() { Debug(session, "Send Scope Depth %zu", session->impl_->send_scope_depth_); if (--session->impl_->send_scope_depth_ == 0 && session->impl_->application_ && !session->impl_->handshake_deferred_) { - session->application().SendPendingData(); + session->SendPendingData(); } } +// ============================================================================ + +std::string StreamData::ToString() const { + DebugIndentScope indent; + + size_t total_bytes = 0; + for (size_t n = 0; n < count; n++) { + total_bytes += data[n].len; + } + + auto prefix = indent.Prefix(); + std::string res("{"); + res += prefix + "count: " + std::to_string(count); + res += prefix + "id: " + std::to_string(id); + res += prefix + "fin: " + std::to_string(fin); + res += prefix + "total: " + std::to_string(total_bytes); + res += indent.Close(); + return res; +} + +Packet::Ptr Session::CreateStreamDataPacket() { + return endpoint().CreatePacket( + remote_address(), max_packet_size(), "stream data"); +} + +// Attempts to pack a pending datagram into the current packet. +// Returns the nwrite value from ngtcp2_conn_writev_datagram. +// On fatal error, closes the session and returns the error code. +// The caller should check: +// > 0: packet is complete, send it (pos was NOT advanced — caller +// must add nwrite to pos and send) +// NGTCP2_ERR_WRITE_MORE: datagram packed, room for more +// 0: congestion controlled or doesn't fit, datagram stays in queue +// < 0 (other): fatal error, session already closed +ssize_t Session::TryWritePendingDatagram(PathStorage* path, + uint8_t* dest, + size_t destlen, + uint64_t ts) { + CHECK(HasPendingDatagrams()); + auto max_attempts = config().options.max_datagram_send_attempts; + + // Skip datagrams that have already exceeded the send attempt limit + // from a previous SendPendingData cycle. + while (HasPendingDatagrams()) { + auto& front = PeekPendingDatagram(); + if (front.send_attempts < max_attempts) break; + Debug(this, + "Datagram %" PRIu64 " abandoned after %u attempts", + front.id, + front.send_attempts); + DatagramStatus(front.id, DatagramStatus::ABANDONED); + PopPendingDatagram(); + } + + if (!HasPendingDatagrams()) return 0; + auto& dg = PeekPendingDatagram(); + ngtcp2_vec dgvec = dg.data; + int accepted = 0; + int dg_flags = NGTCP2_WRITE_DATAGRAM_FLAG_MORE; + + // PacketInfo for the datagram path. When libuv gains per-socket ECN + // marking, the value from ngtcp2 should be forwarded to the send path. + PacketInfo dg_pi; + ssize_t dg_nwrite = ngtcp2_conn_writev_datagram(*this, + &path->path, + dg_pi, + dest, + destlen, + &accepted, + dg_flags, + dg.id, + &dgvec, + 1, + ts); + + if (accepted) { + // Nice, the datagram was accepted! + Debug(this, "Datagram %" PRIu64 " accepted into packet", dg.id); + DatagramSent(dg.id); + PopPendingDatagram(); + } else { + Debug(this, "Datagram %" PRIu64 " not accepted into packet", dg.id); + } + + switch (dg_nwrite) { + case 0: { + // If dg_nwrite is 0, we are either congestion controlled or + // there wasn't enough room in the packet for the datagram or + // we aren't in a state where we can send. + // We'll skip this attempt and return 0. + CHECK(!accepted); + dg.send_attempts++; + return 0; + } + case NGTCP2_ERR_WRITE_MORE: { + // There's still room left in the packet! + return NGTCP2_ERR_WRITE_MORE; + } + case NGTCP2_ERR_INVALID_STATE: + case NGTCP2_ERR_INVALID_ARGUMENT: { + // Non-fatal error cases. Peer either does not support datagrams + // or the datagram is too large for the peer's max. + // Abandon the datagram and signal skip by returning std::nullopt. + DatagramStatus(dg.id, DatagramStatus::ABANDONED); + PopPendingDatagram(); + return 0; + } + default: { + // Fatal errors: PKT_NUM_EXHAUSTED, CALLBACK_FAILURE, NOMEM, etc. + Debug(this, "Fatal datagram error: %zd", dg_nwrite); + SetLastError(QuicError::ForNgtcp2Error(dg_nwrite)); + Close(CloseMethod::SILENT); + return dg_nwrite; + } + } + UNREACHABLE(); +} + +// the SendPendingData method is the primary driver for sending data from the +// application layer. It loops through available stream data and pending +// datagrams and generates packets to send until there is either no more +// data to send or we hit the maximum number of packets to send in one go. +// This method is extremely delicate. A bug in this method can break the +// entire QUIC implementation; so be very careful when making changes here +// and make sure to test thoroughly. When in doubt... don't change it. +void Session::SendPendingData() { + DCHECK(!is_destroyed()); + if (!can_send_packets()) [[unlikely]] { + return; + } + // Upper bound on packets per SendPendingData call. ngtcp2's send quantum + // is typically 64 KB, which at 1200-byte minimum packet size is ~53 + // packets. 64 covers the worst case with headroom. The actual count per + // call is dynamically capped by ngtcp2_conn_get_send_quantum(). + static constexpr size_t kMaxPackets = 64; + Debug(this, "Session sending pending data"); + // Cache the timestamp once for the entire send loop. ngtcp2 does not + // require nanosecond-accurate monotonicity within a single burst — + // a single timestamp per SendPendingData call is what other QUIC + // implementations use (e.g., quiche, msquic). When kernel-level + // packet pacing becomes available via libuv, this timestamp becomes + // the base for computing per-packet transmit timestamps. + const uint64_t ts = uv_hrtime(); + PathStorage path; + StreamData stream_data; + + bool closed = false; + + // Batch accumulation: packets are collected here and flushed via + // Session::SendBatch when the loop exits, the batch is full, or + // on early return. This enables synchronous batched delivery via + // uv_udp_try_send2 (sendmmsg) from the deferred flush path. + Packet::Ptr batch[kMaxPackets]; + PathStorage batch_paths[kMaxPackets]; + size_t batch_count = 0; + + auto flush_batch = [&] { + if (batch_count == 0) return; + SendBatch(batch, batch_paths, batch_count); + batch_count = 0; + }; + + auto update_stats = OnScopeLeave([&] { + if (closed) return; + // Flush any remaining accumulated packets before updating stats. + flush_batch(); + if (is_destroyed()) [[unlikely]] + return; + + // Get a strong pointer to protect against potential destruction during + // updating the time and data stats. + BaseObjectPtr s(this); + s->UpdatePacketTxTime(); + s->UpdateTimer(); + s->UpdateDataStats(); + }); + + // The maximum size of packet to create. + const size_t max_pktlen = max_packet_size(); + + // The maximum number of packets to send in this call to SendPendingData. + const size_t max_packet_count = + std::min(kMaxPackets, ngtcp2_conn_get_send_quantum(*this) / max_pktlen); + if (max_packet_count == 0) return; + + // The number of packets that have been prepared in this call. + size_t packet_send_count = 0; + + Packet::Ptr packet; + + auto ensure_packet = [&] { + if (!packet) { + packet = CreateStreamDataPacket(); + if (!packet) [[unlikely]] + return false; + } + DCHECK(packet); + return true; + }; + + // Accumulate a completed packet into the batch. + auto enqueue_packet = + [&](Packet::Ptr& pkt, size_t len, const PacketInfo& pi) { + Debug(this, "Enqueuing packet with %zu bytes into batch", len); + pkt->Truncate(len); + pkt->set_pkt_info(pi); + path.CopyTo(&batch_paths[batch_count]); + batch[batch_count++] = std::move(pkt); + }; + + // We're going to enter a loop here to prepare and send no more than + // max_packet_count packets. + for (;;) { + // ndatalen is the amount of stream data that was accepted into the packet. + ssize_t ndatalen = 0; + + // Make sure we have a packet to write data into. + if (!ensure_packet()) [[unlikely]] { + Debug(this, "Failed to create packet for stream data"); + // Doh! Could not create a packet. Time to bail. + SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); + closed = true; + return Close(CloseMethod::SILENT); + } + + // The stream_data is the next block of data from the application stream. + if (application().GetStreamData(&stream_data) < 0) { + Debug(this, "Application failed to get stream data"); + SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); + closed = true; + return Close(CloseMethod::SILENT); + } + + // If we got here, we were at least successful in checking for stream data. + // There might not be any stream data to send. If there is no stream data, + // that's perfectly fine, we still need to serialize any frames we do have + // (pings, acks, datagrams, etc) so we'll just keep going. + if (stream_data.id >= 0) { + Debug(this, "Session using stream data: %s", stream_data); + } else { + Debug(this, "No stream data to send"); + } + if (HasPendingDatagrams()) { + Debug(this, "There are pending datagrams to send"); + } + + // Awesome, let's write our packet! + PacketInfo pi; + ssize_t nwrite = WriteVStream(&path, + &pi, + packet->data(), + &ndatalen, + packet->length(), + stream_data, + ts); + + // When ndatalen is > 0, that's our indication that stream data was + // accepted in to the packet. Yay! + if (ndatalen > 0) { + Debug(this, + "Session accepted %zu bytes from stream %" PRIi64 " into packet", + ndatalen, + stream_data.id); + if (!application().StreamCommit(&stream_data, ndatalen)) { + // Data was accepted into the packet, but for some reason adjusting + // the stream's committed data failed. Treat as fatal. + Debug(this, "Failed to commit accepted bytes in stream"); + SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); + closed = true; + return Close(CloseMethod::SILENT); + } + // StreamCommit can call into JS, so a user callback could + // have destroyed the session: + if (is_destroyed()) [[unlikely]] { + closed = true; + return; + } + } else if (stream_data.id >= 0) { + Debug(this, + "Session did not accept any bytes from stream %" PRIi64 + " into packet", + stream_data.id); + } + + // When nwrite is zero, it means we are congestion limited or it is + // just not our turn to send something. Re-schedule the stream if it + // had unsent data (payload or FIN) so the next timer-triggered + // SendPendingData retries it. Without this, a FIN-only send that + // hits nwrite=0 is lost forever — the stream already returned EOS + // from Pull and won't be re-scheduled by anyone else. + // We call Application::ResumeStream directly (not Session::ResumeStream) + // to avoid creating a SendPendingDataScope — we're already inside + // SendPendingData and re-entering would just hit nwrite=0 again. + if (nwrite == 0) { + Debug(this, "Congestion or not our turn to send"); + if (stream_data.id >= 0 && (stream_data.count > 0 || stream_data.fin)) { + application().ResumeStream(stream_data.id); + } + + // nwrite == 0 also occurs on an otherwise-idle connection (no + // stream data and no frames due) and writev_stream never covers + // DATAGRAM frames: a datagram queued while the connection is idle + // still needs its own packet. TryWritePendingDatagram applies + // congestion control itself (result 0 when limited), so genuine + // congestion still ends the loop below. + if (HasPendingDatagrams()) { + auto result = TryWritePendingDatagram( + &path, packet->data(), packet->length(), ts); + if (result > 0) { + Debug(this, "Sending datagram packet with %zd bytes", result); + enqueue_packet(packet, static_cast(result), PacketInfo()); + if (++packet_send_count == max_packet_count) return; + continue; + } + if (result == NGTCP2_ERR_WRITE_MORE) continue; + if (result < 0) { + // Fatal error — session already closed + return; + } + // result == 0: congestion limited; fall through and return. + } + + return; + } + + // A negative nwrite value indicates either an error or that there is more + // data to write into the packet. + if (nwrite < 0) { + switch (nwrite) { + case NGTCP2_ERR_STREAM_DATA_BLOCKED: { + // We could not write any data for this stream into the packet + // because the flow control for the stream itself indicates that + // the stream is blocked. We'll skip and move on to the next stream. + // ndatalen = -1 means that no stream data was accepted into the + // packet, which is what we want here. + DCHECK_EQ(ndatalen, -1); + // We should only have received this error if there was an actual + // stream identified in the stream data, but let's double check. + DCHECK_GE(stream_data.id, 0); + StreamDataBlocked(stream_data.id); + continue; + } + case NGTCP2_ERR_STREAM_SHUT_WR: { + // Indicates that the writable side of the stream should be closed + // locally or the stream is being reset. In either case, we can't + // send data for this stream! + Debug(this, "Closing stream %" PRIi64 " for writing", stream_data.id); + // ndatalen = -1 means that no stream data was accepted into the + // packet, which is what we want here. + DCHECK_EQ(ndatalen, -1); + // We should only have received this error if there was an actual + // stream identified in the stream data, but let's double check. + DCHECK_GE(stream_data.id, 0); + if (stream_data.stream) [[likely]] { + stream_data.stream->EndWritable(); + } + // Notify the application that the stream's write side is shut + // so it stops queuing data. Without this, GetStreamData would + // keep returning the same stream and we'd loop forever. + application().StreamWriteShut(stream_data.id); + continue; + } + case NGTCP2_ERR_WRITE_MORE: { + Debug(this, "Packet buffer not full, coalesce more data into it"); + // Room for more in this packet. Try to pack a pending datagram + // if there is one. Otherwise just loop around and keep going. + if (HasPendingDatagrams()) { + auto result = TryWritePendingDatagram( + &path, packet->data(), packet->length(), ts); + // When result is 0, either the datagram was congestion controlled, + // didn't fit in the packet, or was abandoned. Skip and continue. + + // When result is > 0, the packet is done and the result is the + // completed size of the packet we're sending. + if (result > 0) { + size_t len = result; + Debug(this, "Sending packet with %zu bytes", len); + enqueue_packet(packet, len, pi); + if (++packet_send_count == max_packet_count) return; + } else if (result < 0) { + // Any negative result other than NGTCP2_ERR_WRITE_MORE + // at this point is fatal. The session will have been + // closed. + if (result != NGTCP2_ERR_WRITE_MORE) return; + } + } + continue; + } + case NGTCP2_ERR_CALLBACK_FAILURE: { + // This case really should not happen. It indicates that the + // ngtcp2 callback failed for some reason. This would be a + // bug in our code. + Debug(this, "Internal failure with ngtcp2 callback"); + SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); + closed = true; + return Close(CloseMethod::SILENT); + } + } + + // Some other type of error happened. + DCHECK_EQ(ndatalen, -1); + Debug(this, + "Session encountered error while writing packet: %s", + ngtcp2_strerror(nwrite)); + SetLastError(QuicError::ForNgtcp2Error(nwrite)); + closed = true; + return Close(CloseMethod::SILENT); + } + + // At this point we have a packet prepared to send. The nwrite + // is the size of the packet we are sending. + size_t len = nwrite; + Debug(this, "Sending packet with %zu bytes", len); + enqueue_packet(packet, len, pi); + if (++packet_send_count == max_packet_count) return; + + // If there are pending datagrams, try sending them in a fresh packet. + // This is necessary because ngtcp2_conn_writev_stream only returns + // NGTCP2_ERR_WRITE_MORE when there is actual stream data — when no + // streams are active, the coalescing path above is never reached and + // datagrams would never be sent. + if (HasPendingDatagrams()) { + if (!ensure_packet()) [[unlikely]] { + Debug(this, "Failed to create packet for datagram"); + SetLastError(QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL)); + closed = true; + return Close(CloseMethod::SILENT); + } + auto result = + TryWritePendingDatagram(&path, packet->data(), packet->length(), ts); + if (result > 0) { + Debug(this, "Sending datagram packet with %zd bytes", result); + enqueue_packet(packet, static_cast(result), PacketInfo()); + if (++packet_send_count == max_packet_count) return; + } else if (result < 0 && result != NGTCP2_ERR_WRITE_MORE) { + // Fatal error — session already closed by TryWritePendingDatagram. + return; + } + // If result == 0 (congestion) or NGTCP2_ERR_WRITE_MORE (datagram + // packed but room for more), the loop continues normally. + } + } +} + +ssize_t Session::WriteVStream(PathStorage* path, + PacketInfo* pi, + uint8_t* dest, + ssize_t* ndatalen, + size_t max_pktlen, + const StreamData& stream_data, + uint64_t ts) { + DCHECK_LE(stream_data.count, kMaxVectorCount); + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; + if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; + // The PacketInfo out-param is populated by ngtcp2 with the ECN codepoint + // to apply when sending this packet. When libuv gains per-socket ECN + // marking, the value should be forwarded to the send path. + return ngtcp2_conn_writev_stream(*this, + &path->path, + *pi, + dest, + max_pktlen, + ndatalen, + flags, + stream_data.id, + stream_data, + stream_data.count, + ts); +} + // ============================================================================ BaseObjectPtr Session::Create( Endpoint* endpoint, @@ -2469,7 +2939,7 @@ void Session::FlushPendingData() { // Prefer synchronous sends during the deferred flush to avoid the // one-tick latency of async uv_udp_send from the uv_check callback. flags_.prefer_try_send = true; - application().SendPendingData(); + SendPendingData(); flags_.prefer_try_send = false; } } @@ -2959,6 +3429,30 @@ uint64_t Session::max_data_left() const { return ngtcp2_conn_get_max_data_left(*this); } +bool Session::OpenUni(stream_id* id) { + return ngtcp2_conn_open_uni_stream(*this, id, nullptr) == 0; +} + +void Session::ExtendMaxStreams(Direction direction, uint64_t max) { + switch (direction) { + case Direction::BIDIRECTIONAL: + ngtcp2_conn_extend_max_streams_bidi(*this, static_cast(max)); + break; + case Direction::UNIDIRECTIONAL: + ngtcp2_conn_extend_max_streams_uni(*this, static_cast(max)); + break; + } +} + +void Session::Consume(stream_id id, size_t len) { + ExtendStreamOffset(id, len); + ExtendOffset(len); +} + +void Session::SetApplicationError(error_code app_error_code) { + SetLastError(QuicError::ForApplication(app_error_code)); +} + uint64_t Session::max_local_streams_uni() const { DCHECK(!is_destroyed()); return ngtcp2_conn_get_streams_uni_left(*this); @@ -3157,7 +3651,7 @@ void Session::OnTimeout() { // which can synchronously destroy the session. Guard before proceeding. if (is_destroyed()) return; if (NGTCP2_OK(ret) && !is_in_closing_period() && !is_in_draining_period()) { - application().SendPendingData(); + SendPendingData(); if (is_destroyed()) return; CheckStreamIdleTimeout(uv_hrtime()); return; diff --git a/src/quic/session.h b/src/quic/session.h index 0caeb764ba56c8..c7e5bddc655e7c 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -27,6 +27,25 @@ namespace node::quic { class Endpoint; +// A block of pending outbound stream data, passed between the application +// layer (which fills it via GetStreamData) and the send pump (which hands +// it to ngtcp2_conn_writev_stream and commits the accepted length). +struct StreamData final { + // The actual number of vectors in the struct, up to kMaxVectorCount. + size_t count = 0; + // The stream identifier. If this is a negative value then no stream is + // identified. + stream_id id = -1; + int fin = 0; + ngtcp2_vec data[kMaxVectorCount]{}; + BaseObjectPtr stream; + + inline operator const ngtcp2_vec*() const { return data; } + inline operator ngtcp2_vec*() { return data; } + + std::string ToString() const; +}; + // A Session represents one half of a persistent connection between two QUIC // peers. Every Session is established first by performing a TLS handshake in // which the client sends an initial packet to the server containing a TLS @@ -417,6 +436,36 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { // bindingdata.cc doesn't need the full Application type definition. void FlushPendingData(); + // The send pump: the primary driver for serializing outbound data. + // Loops through available stream data (pulled from the application + // layer via GetStreamData/StreamCommit) and pending datagrams, + // generating packets until there is no more data to send or the + // packet budget for this call is exhausted. + void SendPendingData(); + + Packet::Ptr CreateStreamDataPacket(); + + // Tries to pack a pending datagram into the current packet buffer. + // If < 0 is returned, either NGTCP2_ERR_WRITE_MORE or a fatal error is + // returned; the caller must check. If > 0 is returned, the packet is done + // and the value is the size of the finalized packet. If 0 is returned, + // the datagram is either congestion limited or was abandoned. + ssize_t TryWritePendingDatagram(PathStorage* path, + uint8_t* dest, + size_t destlen, + uint64_t ts); + + // Write the given stream_data into the buffer. The PacketInfo out-param + // is populated by ngtcp2 with per-packet metadata (e.g., ECN codepoint) + // that should be applied when sending the packet. + ssize_t WriteVStream(PathStorage* path, + PacketInfo* pi, + uint8_t* buf, + ssize_t* ndatalen, + size_t max_packet_size, + const StreamData& stream_data, + uint64_t ts); + // Send a batch of packets accumulated by SendPendingData. Uses // Endpoint::SendBatch (uv_udp_try_send2 / sendmmsg) for synchronous // batched delivery when called from the deferred flush path. @@ -486,6 +535,20 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { void SetLastError(QuicError&& error); uint64_t max_data_left() const; + // Transport operations that protocol applications (e.g. HTTP/3) invoke on + // the session, encapsulating ngtcp2 transport details here. + + // Open a unidirectional stream, setting *id on success, or returning false + bool OpenUni(stream_id* id); + + void ExtendMaxStreams(Direction direction, uint64_t max); + + // Signal that we've consumed $len bytes on stream $id to update flow control + void Consume(stream_id id, size_t len); + + // Record an application-level error on the connection without closing it. + void SetApplicationError(error_code app_error_code); + PendingStream::PendingStreamQueue& pending_bidi_stream_queue() const; PendingStream::PendingStreamQueue& pending_uni_stream_queue() const; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index e838392361f946..9f412d08882f54 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1520,8 +1520,7 @@ void Stream::EntryRead(size_t amount) { // Extend the flow control window so the sender can transmit more. if (session().is_destroyed()) return; Session::SendPendingDataScope send_scope(&session()); - session().ExtendStreamOffset(id(), amount); - session().ExtendOffset(amount); + session().Consume(id(), amount); } void Stream::BeforePull() { diff --git a/src/quic/transportparams.cc b/src/quic/transportparams.cc index 183ba973ac1823..b49e527b038069 100644 --- a/src/quic/transportparams.cc +++ b/src/quic/transportparams.cc @@ -506,6 +506,16 @@ TransportParams::operator bool() const { return ptr_ != nullptr; } +uint64_t TransportParams::initial_max_streams_bidi() const { + DCHECK_NOT_NULL(ptr_); + return ptr_->initial_max_streams_bidi; +} + +uint64_t TransportParams::initial_max_streams_uni() const { + DCHECK_NOT_NULL(ptr_); + return ptr_->initial_max_streams_uni; +} + void TransportParams::Initialize(Environment* env, Local target) { NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_STREAM_DATA); NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_DATA); diff --git a/src/quic/transportparams.h b/src/quic/transportparams.h index 46724574611aff..bae20b9ac0d4f2 100644 --- a/src/quic/transportparams.h +++ b/src/quic/transportparams.h @@ -150,6 +150,9 @@ class TransportParams final { operator bool() const; + uint64_t initial_max_streams_bidi() const; + uint64_t initial_max_streams_uni() const; + // Returns a Store containing the encoded transport parameters. // If an error occurs during encoding, or if the parameters could // not be encoded, an empty Store will be returned.