1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

returned for loop

This commit is contained in:
dan 2023-03-02 13:52:52 -08:00
parent 6d488f15d4
commit 4354704145

View file

@ -701,157 +701,158 @@ namespace llarp::quic
} }
} }
auto it = strs.begin();
while (!strs.empty() && stream_packets < max_stream_packets) while (!strs.empty() && stream_packets < max_stream_packets)
{ {
log::debug(logcat, "Max stream packets: {}\nCurrent stream packets: {}", max_stream_packets, stream_packets); for (auto it = strs.begin(); it != strs.end();)
auto& stream = **it;
auto bufs = stream.pending();
if (bufs.empty()) {
log::debug(logcat, "Stream buffer empty, have you considered moving on");
}
std::vector<ngtcp2_vec> vecs;
vecs.reserve(bufs.size());
std::transform(bufs.begin(), bufs.end(), std::back_inserter(vecs), [](const auto& buf) {
return ngtcp2_vec{const_cast<uint8_t*>(u8data(buf)), buf.size()};
});
#ifndef NDEBUG
{ {
std::string buf_sizes; log::debug(logcat, "Max stream packets: {}\nCurrent stream packets: {}", max_stream_packets, stream_packets);
for (auto& b : bufs)
{ auto& stream = **it;
if (!buf_sizes.empty()) auto bufs = stream.pending();
buf_sizes += '+';
buf_sizes += std::to_string(b.size()); if (bufs.empty()) {
log::debug(logcat, "Stream buffer empty, have you considered moving on");
} }
log::debug(
logcat, "Sending {} data for {}", buf_sizes.empty() ? "no" : buf_sizes, stream.id());
}
#endif
// debug std::vector<ngtcp2_vec> vecs;
fprintf( vecs.reserve(bufs.size());
stderr, "Calling add_stream_data for vector<ngtcp2_vec> of size %zu\n", vecs.size()); std::transform(bufs.begin(), bufs.end(), std::back_inserter(vecs), [](const auto& buf) {
return ngtcp2_vec{const_cast<uint8_t*>(u8data(buf)), buf.size()};
});
if (stream.is_closing && !stream.sent_fin) #ifndef NDEBUG
{
log::debug(logcat, "Sending FIN");
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
stream.sent_fin = true;
}
else if (stream.is_new)
{
stream.is_new = false;
}
auto nwrite = ngtcp2_conn_writev_stream(
conn.get(),
&path.path,
&send_pkt_info,
u8data(send_buffer),
send_buffer.size(),
&ndatalen,
flags,
stream.id().id,
reinterpret_cast<const ngtcp2_vec*>(vecs.data()),
vecs.size(),
(!ts) ? get_timestamp() : ts);
log::debug(logcat,
"add_stream_data for stream {} returned [{},{}]",
stream.id(),
nwrite,
ndatalen);
if (nwrite < 0)
{
if (nwrite == -240) // NGTCP2_ERR_WRITE_MORE
{ {
log::debug(logcat, std::string buf_sizes;
"Consumed {} bytes from stream {} and have space left", for (auto& b : bufs)
ndatalen,
stream.id());
stream.wrote(ndatalen);
assert(ndatalen >= 0);
if (stream.unsent() > 0)
{ {
log::debug(logcat, "We have more to write on stream {}, proceeding", stream.id()); if (!buf_sizes.empty())
++stream_packets; buf_sizes += '+';
++it; buf_sizes += std::to_string(b.size());
}
log::debug(
logcat, "Sending {} data for {}", buf_sizes.empty() ? "no" : buf_sizes, stream.id());
}
#endif
// debug
fprintf(
stderr, "Calling add_stream_data for vector<ngtcp2_vec> of size %zu\n", vecs.size());
if (stream.is_closing && !stream.sent_fin)
{
log::debug(logcat, "Sending FIN");
flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
stream.sent_fin = true;
}
else if (stream.is_new)
{
stream.is_new = false;
}
auto nwrite = ngtcp2_conn_writev_stream(
conn.get(),
&path.path,
&send_pkt_info,
u8data(send_buffer),
send_buffer.size(),
&ndatalen,
flags,
stream.id().id,
reinterpret_cast<const ngtcp2_vec*>(vecs.data()),
vecs.size(),
(!ts) ? get_timestamp() : ts);
log::debug(logcat,
"add_stream_data for stream {} returned [{},{}]",
stream.id(),
nwrite,
ndatalen);
if (nwrite < 0)
{
if (nwrite == -240) // NGTCP2_ERR_WRITE_MORE
{
log::debug(logcat,
"Consumed {} bytes from stream {} and have space left",
ndatalen,
stream.id());
stream.wrote(ndatalen);
assert(ndatalen >= 0);
if (stream.unsent() > 0)
{
log::debug(logcat, "We have more to write on stream {}, proceeding", stream.id());
++stream_packets;
++it;
continue;
}
it = strs.erase(it);
break;
}
if (nwrite == -230) // NGTCP2_ERR_CLOSING
{
log::debug(logcat, "Cannot write to {}: stream is closing", stream.id());
it = strs.erase(it);
continue; continue;
} }
it = strs.erase(it); if (nwrite == -221) // NGTCP2_ERR_STREAM_SHUT_WR
{
log::debug(logcat, "Cannot add to stream {}: stream is shut, proceeding", stream.id());
assert(ndatalen == -1);
it = strs.erase(it);
continue;
}
if (nwrite == -210) // NGTCP2_ERR_STREAM_DATA_BLOCKED
{
log::debug(logcat, "Cannot add to stream {}: stream is blocked", stream.id());
it = strs.erase(it);
continue;
}
log::warning(logcat, "Error writing non-stream data: {}", ngtcp2_strerror(nwrite));
break; break;
} }
if (nwrite == -230) // NGTCP2_ERR_CLOSING if (ndatalen >= 0)
{ {
log::debug(logcat, "Cannot write to {}: stream is closing", stream.id()); log::debug(logcat, "consumed {} bytes from stream {}", ndatalen, stream.id());
it = strs.erase(it); stream.wrote(ndatalen);
++stream_packets;
continue; continue;
} }
if (nwrite == -221) // NGTCP2_ERR_STREAM_SHUT_WR
if (nwrite == 0) // we are probably done, but maybe congested
{ {
log::debug(logcat, "Cannot add to stream {}: stream is shut, proceeding", stream.id()); log::debug(logcat,
assert(ndatalen == -1); "Done stream writing to {} (either stream is congested or we have nothing else to "
it = strs.erase(it); "send right now)",
continue; stream.id());
}
if (nwrite == -210) // NGTCP2_ERR_STREAM_DATA_BLOCKED ngtcp2_conn_stat cstat;
{ ngtcp2_conn_get_conn_stat(conn.get(), &cstat);
log::debug(logcat, "Cannot add to stream {}: stream is blocked", stream.id()); log::debug(logcat, "Current unacked bytes in flight: {}, Congestion window: {}",
cstat.bytes_in_flight, cstat.cwnd);
log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
ngtcp2_conn_update_pkt_tx_time(conn.get(), ts);
it = strs.erase(it); it = strs.erase(it);
continue; continue;
} }
log::warning(logcat, "Error writing non-stream data: {}", ngtcp2_strerror(nwrite)); log::debug(logcat, "Sending stream data packet");
break; if (!send_packet(nwrite))
} return;
if (ndatalen >= 0)
{ log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
log::debug(logcat, "consumed {} bytes from stream {}", ndatalen, stream.id()); ngtcp2_conn_update_pkt_tx_time(conn.get(), ts); // so far always useful
stream.wrote(ndatalen);
++stream_packets; ++stream_packets;
continue; //std::advance(it, 1);
} //it = strs.erase(it);
if (nwrite == 0) // we are probably done, but maybe congested if (++stream_packets == max_stream_packets)
{ {
log::debug(logcat, log::debug(logcat, "Max stream packets ({}) reached", max_stream_packets);
"Done stream writing to {} (either stream is congested or we have nothing else to " log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
"send right now)", ngtcp2_conn_update_pkt_tx_time(conn.get(), ts);
stream.id()); return;
}
ngtcp2_conn_stat cstat;
ngtcp2_conn_get_conn_stat(conn.get(), &cstat);
log::debug(logcat, "Current unacked bytes in flight: {}, Congestion window: {}",
cstat.bytes_in_flight, cstat.cwnd);
log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
ngtcp2_conn_update_pkt_tx_time(conn.get(), ts);
it = strs.erase(it);
continue;
}
log::debug(logcat, "Sending stream data packet");
if (!send_packet(nwrite))
return;
log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
ngtcp2_conn_update_pkt_tx_time(conn.get(), ts); // so far always useful
++stream_packets;
//std::advance(it, 1);
//it = strs.erase(it);
if (++stream_packets == max_stream_packets)
{
log::debug(logcat, "Max stream packets ({}) reached", max_stream_packets);
log::debug(logcat, "Updating pkt tx time at {}" ,__LINE__);
ngtcp2_conn_update_pkt_tx_time(conn.get(), ts);
return;
} }
} }