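Preface
Envoy is a high-performance network proxy designed for service meshes. It runs alongside the application and abstracts the network by providing common features in a platform-agnostic way. When all service traffic in an infrastructure flows through the Envoy mesh, consistent observability makes it easy to spot problem areas and tune overall performance.
Envoy is also one of the core components of Istio. It runs next to the service as a sidecar, intercepts and forwards the service's traffic, and offers powerful features such as routing and traffic control. In this series we go beyond the official Istio and Envoy documentation and work from the source code, covering advanced examples of Envoy startup, traffic interception, and the HTTP request processing flow, along with an in-depth analysis of the Envoy architecture.
This is the third article on the source code of the Envoy request flow. It covers the second half of the outbound direction: receiving the request, sending the request, receiving the response, and returning the response. Note: the issues and PRs discussed here reflect the state of the project as of December 2021.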
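Outbound direction
Receiving the request
- The client starts writing request data to the socket.
- Once the event loop fires a read event, `transport_socket_->doRead` reads in a loop and appends the data to `read_buffer_` until the socket returns EAGAIN.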
void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_);
const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
dispatch_buffered_data_ = false;
ASSERT(!connecting_);
// We get here while read disabled in two ways.
// 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
// due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
// enabled.
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
// Do not clear transport_wants_read_ when returning early; the early return skips the transport
// socket doRead call.
if (latched_dispatch_buffered_data && filterChainWantsData()) {
onRead(read_buffer_->length());
}
return;
}
// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(*read_buffer_);
uint64_t new_buffer_size = read_buffer_->length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
// If this connection doesn't have half-close semantics, translate end_stream into
// a connection close.
if ((!enable_half_close_ && result.end_stream_read_)) {
result.end_stream_read_ = false;
result.action_ = PostIoAction::Close;
}
read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
// Skip onRead if no bytes were processed unless we explicitly want to force onRead for
// buffered data. For instance, skip onRead if the connection was closed without producing
// more data.
onRead(new_buffer_size);
}
// The read callback may have already closed the connection.
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "remote close", *this);
closeSocket(ConnectionEvent::RemoteClose);
}
}
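The "read until EAGAIN" loop described above can be sketched in isolation. This is a minimal illustration, not Envoy's transport socket: `FakeSocket` and `drainReadable` are hypothetical names, and an empty queue stands in for the would-block condition a real non-blocking socket reports with EAGAIN.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a non-blocking socket: each queued chunk is what
// one read call would return; an empty queue models EAGAIN.
class FakeSocket {
public:
  void enqueue(std::string chunk) { chunks_.push_back(std::move(chunk)); }

  // Returns the next chunk, or std::nullopt when the read would block.
  std::optional<std::string> read() {
    if (chunks_.empty()) {
      return std::nullopt;
    }
    std::string chunk = std::move(chunks_.front());
    chunks_.pop_front();
    return chunk;
  }

private:
  std::deque<std::string> chunks_;
};

// Appends everything currently readable to read_buffer and returns the number
// of bytes added, similar in spirit to bytes_processed_ in the excerpt above.
std::size_t drainReadable(FakeSocket& socket, std::string& read_buffer) {
  std::size_t bytes_processed = 0;
  while (auto chunk = socket.read()) {
    read_buffer += *chunk;
    bytes_processed += chunk->size();
  }
  return bytes_processed;
}
```

A caller would invoke `drainReadable` once per read event and hand the accumulated buffer to the next stage only if the returned byte count is non-zero, which is the same condition `onReadReady` uses before calling `onRead`.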
- The buffer is passed to `Envoy::Http::ConnectionManagerImpl::onData` for HTTP request processing.
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
if (!codec_) {
// Http3 codec should have been instantiated by now.
createCodec(data);
}
bool redispatch;
do {
redispatch = false;
const Status status = codec_->dispatch(data);
if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
} else if (isCodecProtocolError(status)) {
stats_.named_.downstream_cx_protocol_error_.inc();
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
}
ASSERT(status.ok());
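- When `codec_type` is AUTO (this covers HTTP/1 and HTTP/2; HTTP/3 is not yet supported in this mode and is planned), Envoy checks whether the request begins with `PRI * HTTP/2` to decide whether it is HTTP/2.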
Http::ServerConnectionPtr
HttpConnectionManagerConfig::createCodec(Network::Connection& connection,
const Buffer::Instance& data,
Http::ServerConnectionCallbacks& callbacks) {
switch (codec_type_) {
case CodecType::HTTP1: {
return std::make_unique<Http::Http1::ServerConnectionImpl>(
connection, Http::Http1::CodecStats::atomicGet(http1_codec_stats_, context_.scope()),
callbacks, http1_settings_, maxRequestHeadersKb(), maxRequestHeadersCount(),
headersWithUnderscoresAction());
}
case CodecType::HTTP2: {
return std::make_unique<Http::Http2::ServerConnectionImpl>(
connection, callbacks,
Http::Http2::CodecStats::atomicGet(http2_codec_stats_, context_.scope()),
context_.api().randomGenerator(), http2_options_, maxRequestHeadersKb(),
maxRequestHeadersCount(), headersWithUnderscoresAction());
}
case CodecType::HTTP3:
#ifdef ENVOY_ENABLE_QUIC
return std::make_unique<Quic::QuicHttpServerConnectionImpl>(
dynamic_cast<Quic::EnvoyQuicServerSession&>(connection), callbacks,
Http::Http3::CodecStats::atomicGet(http3_codec_stats_, context_.scope()), http3_options_,
maxRequestHeadersKb(), headersWithUnderscoresAction());
#else
// Should be blocked by configuration checking at an earlier point.
NOT_REACHED_GCOVR_EXCL_LINE;
#endif
case CodecType::AUTO:
return Http::ConnectionManagerUtility::autoCreateCodec(
connection, data, callbacks, context_.scope(), context_.api().randomGenerator(),
http1_codec_stats_, http2_codec_stats_, http1_settings_, http2_options_,
maxRequestHeadersKb(), maxRequestHeadersCount(), headersWithUnderscoresAction());
}
NOT_REACHED_GCOVR_EXCL_LINE;
}
std::string ConnectionManagerUtility::determineNextProtocol(Network::Connection& connection,
const Buffer::Instance& data) {
if (!connection.nextProtocol().empty()) {
return connection.nextProtocol();
}
// See if the data we have so far shows the HTTP/2 prefix. We ignore the case where someone sends
// us the first few bytes of the HTTP/2 prefix since in all public cases we use SSL/ALPN. For
// internal cases this should practically never happen.
if (data.startsWith(Http2::CLIENT_MAGIC_PREFIX)) {
return Utility::AlpnNames::get().Http2;
}
return "";
}
const std::string CLIENT_MAGIC_PREFIX = "PRI * HTTP/2";
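The detection logic in `determineNextProtocol` reduces to a prefix check. The following is a minimal sketch under stated assumptions: `sniffNextProtocol` and `kClientMagicPrefix` are hypothetical names, and the returned `"h2"` string is assumed to be the ALPN identifier for HTTP/2.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Same literal as CLIENT_MAGIC_PREFIX in the excerpt above.
constexpr std::string_view kClientMagicPrefix = "PRI * HTTP/2";

// Hypothetical helper: returns the ALPN result if one was negotiated, "h2" if
// the first bytes carry the HTTP/2 client preface, and an empty string
// otherwise (the caller would then treat the connection as HTTP/1).
std::string sniffNextProtocol(std::string_view negotiated_alpn, std::string_view first_bytes) {
  if (!negotiated_alpn.empty()) {
    return std::string(negotiated_alpn);
  }
  // substr clamps the length, so short inputs simply fail the comparison.
  if (first_bytes.substr(0, kClientMagicPrefix.size()) == kClientMagicPrefix) {
    return "h2";
  }
  return "";
}
```

As the comment in the original code notes, a partially received preface is not handled; the sketch keeps that behaviour and returns an empty string for inputs shorter than the prefix.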
- HTTP parsing is driven by `http_parser` callbacks; `ConnectionImpl::settings_` is statically initialized with the callbacks for each parsing stage.
http_parser_settings ConnectionImpl::settings_{
[](http_parser* parser) -> int {
static_cast<ConnectionImpl*>(parser->data)->onMessageBeginBase();
return 0;
},
[](http_parser* parser, const char* at, size_t length) -> int {
static_cast<ConnectionImpl*>(parser->data)->onUrl(at, length);
return 0;
},
nullptr, // on_status
[](http_parser* parser, const char* at, size_t length) -> int {
static_cast<ConnectionImpl*>(parser->data)->onHeaderField(at, length);
return 0;
},
[](http_parser* parser, const char* at, size_t length) -> int {
static_cast<ConnectionImpl*>(parser->data)->onHeaderValue(at, length);
return 0;
},
[](http_parser* parser) -> int {
return static_cast<ConnectionImpl*>(parser->data)->onHeadersCompleteBase();
},
[](http_parser* parser, const char* at, size_t length) -> int {
static_cast<ConnectionImpl*>(parser->data)->onBody(at, length);
return 0;
},
[](http_parser* parser) -> int {
static_cast<ConnectionImpl*>(parser->data)->onMessageCompleteBase();
return 0;
},
nullptr, // on_chunk_header
nullptr // on_chunk_complete
};
The Envoy community has discussed replacing the protocol parser `http_parser` with `llhttp`.
if (pos != absl::string_view::npos) {
// Include \r or \n
new_data = new_data.substr(0, pos + 1);
ssize_t rc = http_parser_execute(&parser_, &settings_, new_data.data(), new_data.length());
ENVOY_LOG(trace, "http inspector: http_parser parsed {} chars, error code: {}", rc,
HTTP_PARSER_ERRNO(&parser_));
// Errors in parsing HTTP.
if (HTTP_PARSER_ERRNO(&parser_) != HPE_OK && HTTP_PARSER_ERRNO(&parser_) != HPE_PAUSED) {
return ParseState::Error;
}
if (parser_.http_major == 1 && parser_.http_minor == 1) {
protocol_ = Http::Headers::get().ProtocolStrings.Http11String;
} else {
// Set other HTTP protocols to HTTP/1.0
protocol_ = Http::Headers::get().ProtocolStrings.Http10String;
}
return ParseState::Done;
} else {
ssize_t rc = http_parser_execute(&parser_, &settings_, new_data.data(), new_data.length());
ENVOY_LOG(trace, "http inspector: http_parser parsed {} chars, error code: {}", rc,
HTTP_PARSER_ERRNO(&parser_));
// Errors in parsing HTTP.
if (HTTP_PARSER_ERRNO(&parser_) != HPE_OK && HTTP_PARSER_ERRNO(&parser_) != HPE_PAUSED) {
return ParseState::Error;
} else {
return ParseState::Continue;
}
return {http_parser_execute(&parser_, &settings_, slice, len), HTTP_PARSER_ERRNO(&parser_)};
onMessageBeginBase
current_header_map_ = std::make_unique<HeaderMapImpl>();
header_parsing_state_ = HeaderParsingState::Field;
Status ConnectionImpl::onMessageBegin() {
ENVOY_CONN_LOG(trace, "message begin", connection_);
// Make sure that if HTTP/1.0 and HTTP/1.1 requests share a connection Envoy correctly sets
// protocol for each request. Envoy defaults to 1.1 but sets the protocol to 1.0 where applicable
// in onHeadersCompleteBase
protocol_ = Protocol::Http11;
processing_trailers_ = false;
header_parsing_state_ = HeaderParsingState::Field;
allocHeaders(statefulFormatterFromSettings(codec_settings_));
return onMessageBeginBase();
}
Status ServerConnectionImpl::onMessageBeginBase() {
if (!resetStreamCalled()) {
ASSERT(!active_request_.has_value());
active_request_.emplace(*this);
auto& active_request = active_request_.value();
if (resetStreamCalled()) {
return codecClientError("cannot create new streams after calling reset");
}
active_request.request_decoder_ = &callbacks_.newStream(active_request.response_encoder_);
// Check for pipelined request flood as we prepare to accept a new request.
// Parse errors that happen prior to onMessageBegin result in stream termination, it is not
// possible to overflow output buffers with early parse errors.
RETURN_IF_ERROR(doFloodProtectionChecks());
}
return okStatus();
}
- An `ActiveStream` is created to hold the downstream information and the corresponding route information.
- For HTTPS, the SNI saved during the TLS handshake is written to `ActiveStream.requested_server_name_`.
void setRequestedServerName(absl::string_view requested_server_name) override {
requested_server_name_ = std::string(requested_server_name);
}
void Filter::onServername(absl::string_view name) {
if (!name.empty()) {
config_->stats().sni_found_.inc();
cb_->socket().setRequestedServerName(name);
ENVOY_LOG(debug, "tls:onServerName(), requestedServerName: {}", name);
} else {
config_->stats().sni_not_found_.inc();
}
clienthello_success_ = true;
}
- `onHeaderField` and `onHeaderValue` are called repeatedly to add headers to `current_header_map_`.
- After the last request header has been parsed, `onHeadersComplete` runs and adds some request fields (method, path, host) to the headers.
const Http::HeaderValues& header_values = Http::Headers::get();
active_request.response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request.response_encoder_.setIsResponseToConnectRequest(
parser_->methodName() == header_values.MethodValues.Connect);
RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request.request_url_.empty());
headers->setMethod(parser_->methodName());
headers->setScheme("http");
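To make the header callbacks concrete, here is a small sketch of how field and value fragments could be collected into a header map. It is modelled loosely on the Field/Value parsing state visible in the excerpts (`header_parsing_state_`); `HeaderAccumulator` and its members are hypothetical and are not Envoy's classes. The sketch assumes the parser may deliver one header name or value in several fragments.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical accumulator; not Envoy's actual header map implementation.
class HeaderAccumulator {
public:
  void onHeaderField(std::string_view fragment) {
    if (state_ == State::Value) {
      // A new field name starts, so the previous header is complete.
      flush();
    }
    field_.append(fragment);
    state_ = State::Field;
  }

  void onHeaderValue(std::string_view fragment) {
    value_.append(fragment);
    state_ = State::Value;
  }

  // Called once after the last header; stores the final pending header.
  void onHeadersComplete() {
    if (state_ == State::Value) {
      flush();
    }
    state_ = State::Done;
  }

  const std::multimap<std::string, std::string>& headers() const { return headers_; }

private:
  enum class State { Field, Value, Done };

  void flush() {
    headers_.emplace(std::move(field_), std::move(value_));
    field_.clear();
    value_.clear();
  }

  State state_ = State::Field;
  std::string field_;
  std::string value_;
  std::multimap<std::string, std::string> headers_;
};
```

A `multimap` is used here only so that repeated header names are preserved; the real header map has its own storage and lookup rules.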
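- After `onHeadersComplete`, the callbacks `onMessageComplete`, `onMessageCompleteBase`, and `ServerConnectionImpl::onMessageComplete` are invoked in turn.
- Request decoding runs within Envoy's context and executes Envoy's core proxy logic: iterating over the HTTP filter chain and selecting a route.
- This stage also checks whether the request should be rejected because of overload.
- The cluster is looked up in the ThreadLocalClusterManager by the cluster name on the route and cached in `cached_cluster_info_`.
- The filter chain on the route is built from configuration. Concrete filter implementations are registered through `registerFactory` and constructed by name in `createFilterChain`, for example the istio-proxy stats filter.
- If the HTTP connection manager has tracing configured: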
if (connection_manager_.config_.tracingConfig()) {
traceRequest();
}
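- If the request headers carry trace information, a child span is created and its sampled flag follows the parent span.
- If the headers carry no trace information, a root span is created and the sampled flag is set.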
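The child-versus-root decision can be sketched as a single function. This is an illustration of the two rules above only: `TraceContext`, `Span`, and `startSpan` are hypothetical types, and the new trace ID and root sampling decision are assumed to come from the tracer's own ID generator and sampling policy.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical trace context as it might be extracted from request headers.
struct TraceContext {
  std::string trace_id;
  std::string span_id;
  bool sampled;
};

struct Span {
  std::string trace_id;
  std::optional<std::string> parent_span_id; // empty for a root span
  bool sampled;
};

// incoming: context found in the request headers, if any.
// new_trace_id and sample_root: assumed inputs from the tracer's ID generator
// and sampling policy; they are only used when no context was propagated.
Span startSpan(const std::optional<TraceContext>& incoming, const std::string& new_trace_id,
               bool sample_root) {
  if (incoming.has_value()) {
    // Child span: join the caller's trace and inherit its sampling decision.
    return Span{incoming->trace_id, incoming->span_id, incoming->sampled};
  }
  // Root span: start a new trace and make the sampling decision here.
  return Span{new_trace_id, std::nullopt, sample_root};
}
```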
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
Router::RouteConstSharedPtr route;
if (request_headers_ != nullptr) {
if (connection_manager_.config_.isRoutable() &&
connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
// NOTE: re-select scope as well in case the scope key header has been changed by a filter.
snapScopedRouteConfig();
}
if (snapped_route_config_ != nullptr) {
route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
stream_id_);
}
}
setRoute(route);
}
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
request_headers_ = std::move(headers);
filter_manager_.requestHeadersInitialized();
if (request_header_timer_ != nullptr) {
request_header_timer_->disableTimer();
request_header_timer_.reset();
}
Upstream::HostDescriptionConstSharedPtr upstream_host =
connection_manager_.read_callbacks_->upstreamHost();
if (upstream_host != nullptr) {
Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats =
upstream_host->cluster().requestResponseSizeStats();
if (req_resp_stats.has_value()) {
req_resp_stats->get().upstream_rq_headers_size_.recordValue(request_headers_->byteSize());
}
}
// Both saw_connection_close_ and is_head_request_ affect local replies: set
// them as early as possible.
const Protocol protocol = connection_manager_.codec_->protocol();
state_.saw_connection_close_ = HeaderUtility::shouldCloseConnection(protocol, *request_headers_);
// We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
if (connection_manager_.config_.isRoutable()) {
if (connection_manager_.config_.routeConfigProvider() != nullptr) {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->config();
} else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
snapped_scoped_routes_config_ =
connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
snapScopedRouteConfig();
}
} else {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->config();
}
ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
*request_headers_);
// We end the decode here only if the request is header only. If we convert the request to a
// header only, the stream will be marked as done once a subsequent decodeData/decodeTrailers is
// called with end_stream=true.
filter_manager_.maybeEndDecode(end_stream);
// Drop new requests when overloaded as soon as we have decoded the headers.
if (connection_manager_.random_generator_.bernoulli(
connection_manager_.overload_stop_accepting_requests_ref_.value())) {
// In this one special case, do not create the filter chain. If there is a risk of memory
// overload it is more important to avoid unnecessary allocation than to create the filters.
filter_manager_.skipFilterChainCreation();
connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
sendLocalReply(Grpc::Common::isGrpcRequestHeaders(*request_headers_),
Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().Overload);
return;
}
if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
request_headers_->Expect()->value() == Headers::get().ExpectValues._100Continue.c_str()) {
// Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
// and sends the 100-Continue directly to the encoder.
chargeStats(continueHeader());
response_encoder_->encode100ContinueHeaders(continueHeader());
// Remove the Expect header so it won't be handled again upstream.
request_headers_->removeExpect();
}
connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
connection_manager_.stats_.prefixStatName(),
connection_manager_.stats_.scope_);
// Make sure we are getting a codec version we support.
if (protocol == Protocol::Http10) {
// Assume this is HTTP/1.0. This is fine for HTTP/0.9 but this code will also affect any
// requests with non-standard version numbers (0.9, 1.3), basically anything which is not
// HTTP/1.1.
//
// The protocol may have shifted in the HTTP/1.0 case so reset it.
filter_manager_.streamInfo().protocol(protocol);
if (!connection_manager_.config_.http1Settings().accept_http_10_) {
// Send "Upgrade Required" if HTTP/1.0 support is not explicitly configured on.
sendLocalReply(false, Code::UpgradeRequired, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().LowVersion);
return;
}
if (!request_headers_->Host() &&
!connection_manager_.config_.http1Settings().default_host_for_http_10_.empty()) {
// Add a default host if configured to do so.
request_headers_->setHost(
connection_manager_.config_.http1Settings().default_host_for_http_10_);
}
}
if (!request_headers_->Host()) {
// Require host header. For HTTP/1.1 Host has already been translated to :authority.
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "",
nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().MissingHost);
return;
}
// Verify header sanity checks which should have been performed by the codec.
ASSERT(HeaderUtility::requestHeadersValid(*request_headers_).has_value() == false);
// Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
// :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
// applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
// is enabled on the HCM.
if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
request_headers_->getPathValue().empty()) {
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr,
absl::nullopt, StreamInfo::ResponseCodeDetails::get().MissingPath);
return;
}
// Currently we only support relative paths at the application layer.
if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr,
absl::nullopt, StreamInfo::ResponseCodeDetails::get().AbsolutePath);
return;
}
// Path sanitization should happen before any path access other than the above sanity check.
const auto action =
ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
// gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
// because gRPC clients do not support redirect.
if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
(action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
Grpc::Common::hasGrpcContentType(*request_headers_))) {
connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "",
nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
return;
} else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
sendLocalReply(
false, Code::TemporaryRedirect, "",
[new_path = request_headers_->Path()->value().getStringView()](
Http::ResponseHeaderMap& response_headers) -> void {
response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
return;
}
ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
ConnectionManagerUtility::maybeNormalizeHost(*request_headers_, connection_manager_.config_,
localPort());
if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
// Modify the downstream remote address depending on configuration and headers.
filter_manager_.setDownstreamRemoteAddress(ConnectionManagerUtility::mutateRequestHeaders(
*request_headers_, connection_manager_.read_callbacks_->connection(),
connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_));
}
ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
ASSERT(!cached_route_);
refreshCachedRoute();
if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
filter_manager_.streamInfo().setTraceReason(
ConnectionManagerUtility::mutateTracingRequestHeader(
*request_headers_, connection_manager_.runtime_, connection_manager_.config_,
cached_route_.value().get()));
}
filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
// TODO if there are no filters when starting a filter iteration, the connection manager
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
// contains a smuggled HTTP request.
state_.saw_connection_close_ = true;
connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "",
nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
return;
}
// Allow non websocket requests to go through websocket enabled routes.
}
if (hasCachedRoute()) {
const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
if (route_entry != nullptr && route_entry->idleTimeout()) {
// TODO(mattklein123): Technically if the cached route changes, we should also see if the
// route idle timeout has changed and update the value.
idle_timeout_ms_ = route_entry->idleTimeout().value();
response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
if (idle_timeout_ms_.count()) {
// If we have a route-level idle timeout but no global stream idle timeout, create a timer.
if (stream_idle_timer_ == nullptr) {
stream_idle_timer_ =
connection_manager_.read_callbacks_->connection().dispatcher().createScaledTimer(
Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
[this]() -> void { onIdleTimeout(); });
}
} else if (stream_idle_timer_ != nullptr) {
// If we had a global stream idle timeout but the route-level idle timeout is set to zero
// (to override), we disable the idle timer.
stream_idle_timer_->disableTimer();
stream_idle_timer_ = nullptr;
}
}
}
// Check if tracing is enabled at all.
if (connection_manager_.config_.tracingConfig()) {
traceRequest();
}
filter_manager_.decodeHeaders(*request_headers_, end_stream);
// Reset it here for both global and overridden cases.
resetIdleTimer();
}
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream) {
// Headers filter iteration should always start with the next filter if available.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();
for (; entry != decoder_filters_.end(); entry++) {
(*entry)->maybeEvaluateMatchTreeWithNewData(
[&](auto& matching_data) { matching_data.onRequestHeaders(headers); });
if ((*entry)->skipFilter()) {
continue;
}
- The filters configured on the HTTP connection manager (`envoy.cors`, `envoy.fault`, `envoy.router`) run `decodeHeaders` one by one.
The rest of this section focuses on `envoy.router`.
envoy.router
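- When `RouteMatcher` is constructed, it iterates over the domains under `virtual_hosts` and, based on the wildcard position and the domain length, sorts them into four containers. The wildcard containers have the shape `map<domain_len, std::unordered_map<domain, virtualHost>, std::greater<int64_t>>`.
- `default_virtual_host_`: the domain is a single wildcard `*` (only one is allowed)
- `wildcard_virtual_host_suffixes_`: the wildcard is at the start of the domain
- `wildcard_virtual_host_prefixes_`: the wildcard is at the end of the domain
- `virtual_hosts_`: the domain contains no wildcard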
RouteMatcher::RouteMatcher(const envoy::config::route::v3::RouteConfiguration& route_config,
const ConfigImpl& global_route_config,
Server::Configuration::ServerFactoryContext& factory_context,
ProtobufMessage::ValidationVisitor& validator, bool validate_clusters)
: vhost_scope_(factory_context.scope().scopeFromStatName(
factory_context.routerContext().virtualClusterStatNames().vhost_)) {
absl::optional<Upstream::ClusterManager::ClusterInfoMaps> validation_clusters;
if (validate_clusters) {
validation_clusters = factory_context.clusterManager().clusters();
}
for (const auto& virtual_host_config : route_config.virtual_hosts()) {
VirtualHostSharedPtr virtual_host(new VirtualHostImpl(virtual_host_config, global_route_config,
factory_context, *vhost_scope_, validator,
validation_clusters));
for (const std::string& domain_name : virtual_host_config.domains()) {
const std::string domain = Http::LowerCaseString(domain_name).get();
bool duplicate_found = false;
if ("*" == domain) {
if (default_virtual_host_) {
throw EnvoyException(fmt::format("Only a single wildcard domain is permitted in route {}",
route_config.name()));
}
default_virtual_host_ = virtual_host;
} else if (!domain.empty() && '*' == domain[0]) {
duplicate_found = !wildcard_virtual_host_suffixes_[domain.size() - 1]
.emplace(domain.substr(1), virtual_host)
.second;
} else if (!domain.empty() && '*' == domain[domain.size() - 1]) {
duplicate_found = !wildcard_virtual_host_prefixes_[domain.size() - 1]
.emplace(domain.substr(0, domain.size() - 1), virtual_host)
.second;
} else {
duplicate_found = !virtual_hosts_.emplace(domain, virtual_host).second;
}
if (duplicate_found) {
throw EnvoyException(fmt::format("Only unique values for domains are permitted. Duplicate "
"entry of domain {} in route {}",
domain, route_config.name()));
}
}
}
}
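- Lookup proceeds in the order `virtual_hosts_` => `wildcard_virtual_host_suffixes_` => `wildcard_virtual_host_prefixes_` => `default_virtual_host_`.
Within each wildcard map, entries are visited in map iteration order (descending domain length), and the first virtual host that matches once the wildcard is stripped is used. If nothing matches, 404 is returned directly.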
const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMap& headers) const {
// Fast path the case where we only have a default virtual host.
if (virtual_hosts_.empty() && wildcard_virtual_host_suffixes_.empty() &&
wildcard_virtual_host_prefixes_.empty()) {
return default_virtual_host_.get();
}
// There may be no authority in early reply paths in the HTTP connection manager.
if (headers.Host() == nullptr) {
return nullptr;
}
// TODO (@rshriram) Match Origin header in WebSocket
// request with VHost, using wildcard match
// Lower-case the value of the host header, as hostnames are case insensitive.
const std::string host = absl::AsciiStrToLower(headers.getHostValue());
const auto& iter = virtual_hosts_.find(host);
if (iter != virtual_hosts_.end()) {
return iter->second.get();
}
if (!wildcard_virtual_host_suffixes_.empty()) {
const VirtualHostImpl* vhost = findWildcardVirtualHost(
host, wildcard_virtual_host_suffixes_,
[](const std::string& h, int l) -> std::string { return h.substr(h.size() - l); });
if (vhost != nullptr) {
return vhost;
}
}
if (!wildcard_virtual_host_prefixes_.empty()) {
const VirtualHostImpl* vhost = findWildcardVirtualHost(
host, wildcard_virtual_host_prefixes_,
[](const std::string& h, int l) -> std::string { return h.substr(0, l); });
if (vhost != nullptr) {
return vhost;
}
}
return default_virtual_host_.get();
}
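The four-table layout and the lookup order can be reproduced in a compact sketch. `VirtualHostTable` is a hypothetical miniature in which a virtual host is represented only by its name, and the host is assumed to be lower-cased by the caller. The length guard in `findWildcard` is an assumption of this sketch (the body of `findWildcardVirtualHost` is not shown in the excerpt); it ensures the wildcard stands for at least one character, so `*.foo.com` does not match `.foo.com`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <unordered_map>

class VirtualHostTable {
public:
  // Returns false if the domain duplicates an existing entry.
  bool add(const std::string& domain, const std::string& vhost) {
    if (domain == "*") {
      if (default_.has_value()) return false;
      default_ = vhost;
      return true;
    }
    if (!domain.empty() && domain.front() == '*') {
      return suffixes_[domain.size() - 1].emplace(domain.substr(1), vhost).second;
    }
    if (!domain.empty() && domain.back() == '*') {
      return prefixes_[domain.size() - 1]
          .emplace(domain.substr(0, domain.size() - 1), vhost)
          .second;
    }
    return exact_.emplace(domain, vhost).second;
  }

  // Lookup order: exact, suffix wildcards, prefix wildcards, default.
  std::optional<std::string> find(const std::string& host) const {
    if (auto it = exact_.find(host); it != exact_.end()) return it->second;
    if (auto v = findWildcard(host, suffixes_, true)) return v;
    if (auto v = findWildcard(host, prefixes_, false)) return v;
    return default_;
  }

private:
  using ByLength = std::map<std::size_t, std::unordered_map<std::string, std::string>,
                            std::greater<std::size_t>>;

  static std::optional<std::string> findWildcard(const std::string& host, const ByLength& table,
                                                 bool match_suffix) {
    for (const auto& [len, entries] : table) { // longest pattern first
      if (len >= host.size()) continue;        // wildcard must cover at least one character
      const std::string key =
          match_suffix ? host.substr(host.size() - len) : host.substr(0, len);
      if (auto it = entries.find(key); it != entries.end()) return it->second;
    }
    return std::nullopt;
  }

  std::unordered_map<std::string, std::string> exact_;
  ByLength suffixes_;
  ByLength prefixes_;
  std::optional<std::string> default_;
};
```

Because the wildcard maps are keyed by pattern length in descending order, a host such as `v1.api.example.com` prefers `*.api.example.com` over `*.example.com`. An empty result from `find` corresponds to the 404 case.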
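- Look up the route and cluster on the matched virtual host
- Once a virtual host has been matched by domain, the cluster is looked up on that virtual host; if nothing matches, 404 is returned directly.
- Depending on configuration, a match uses one of three route types: `prefix`, `regex`, or `path`.
- If `weighted_clusters` is present, requests are distributed according to `stream_id` and the cluster weights. Since `stream_id` is generated randomly and independently for each request, the weighted distribution can be treated as random.
- If no route matches the request, 404 `no cluster match for URL` is returned.
- If a `directResponseEntry` is configured, it is returned directly.
- If the cluster name on the route cannot be found in the cluster manager, the configured `clusterNotFoundResponseCode` is returned.
- If the cluster is currently in `maintenanceMode` (related to active health checking):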
// See if we are supposed to immediately kill some percentage of this cluster's traffic.
if (cluster_->maintenanceMode()) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
callbacks_->sendLocalReply(
Http::Code::ServiceUnavailable, "maintenance mode",
[modify_headers, this](Http::ResponseHeaderMap& headers) {
if (!config_.suppress_envoy_headers_) {
headers.addReference(Http::Headers::get().EnvoyOverloaded,
Http::Headers::get().EnvoyOverloadedValues.True);
}
// Note: append_cluster_info does not respect suppress_envoy_headers.
modify_headers(headers);
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
cluster_->stats().upstream_rq_maintenance_mode_.inc();
return Http::FilterHeadersStatus::StopIteration;
}
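The `weighted_clusters` distribution mentioned in the route-matching steps can be sketched as mapping a per-request random value onto cumulative weight ranges. This is a sketch under assumptions: `WeightedCluster` and `pickWeightedCluster` are hypothetical names, and `random_value` stands in for the per-request `stream_id`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

struct WeightedCluster {
  std::string name;
  std::uint64_t weight;
};

// Maps random_value onto the cumulative weight ranges and returns the name of
// the cluster that owns the selected slot.
// Precondition: clusters is non-empty and the weights sum to a positive value.
const std::string& pickWeightedCluster(const std::vector<WeightedCluster>& clusters,
                                       std::uint64_t random_value) {
  std::uint64_t total = 0;
  for (const auto& c : clusters) total += c.weight;
  assert(total > 0);

  const std::uint64_t selected = random_value % total;
  std::uint64_t end = 0;
  for (const auto& c : clusters) {
    end += c.weight;
    if (selected < end) return c.name;
  }
  return clusters.back().name; // not reached when total > 0
}
```

With weights 80 and 20, values 0 to 79 (modulo 100) select the first cluster and 80 to 99 the second, so a uniformly random input yields roughly an 80/20 split.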
- Call `createConnPool` to obtain the upstream connection pool
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
return Http::FilterHeadersStatus::StopIteration;
}
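- The cluster's `features` configuration, including `USE_DOWNSTREAM_PROTOCOL`, determines which protocol is used to send the request upstream: HTTP/1.1 or HTTP/2 (the code below also handles HTTP/3)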
std::vector<Http::Protocol>
ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
if (downstream_protocol.has_value() &&
features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
return {downstream_protocol.value()};
} else if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
ASSERT(!(features_ & Upstream::ClusterInfo::Features::HTTP3));
return {Http::Protocol::Http2, Http::Protocol::Http11};
} else {
if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
return {Http::Protocol::Http3};
}
return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
: Http::Protocol::Http11};
}
}
- The cluster is looked up by name in the ThreadLocalClusterManager
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek) {
HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context));
if (!host) {
if (!peek) {
ENVOY_LOG(debug, "no healthy host for HTTP connection pool");
cluster_info_->stats().upstream_cx_none_healthy_.inc();
}
return nullptr;
}
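- A host is picked by the load-balancing algorithm. For some algorithms the load-balancing state is kept independently on each worker; with round robin, for example, the order is strict only among requests handled by the same worker.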
HostConstSharedPtr LoadBalancerBase::chooseHost(LoadBalancerContext* context) {
HostConstSharedPtr host;
const size_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
for (size_t i = 0; i < max_attempts; ++i) {
host = chooseHostOnce(context);
// If host selection failed or the host is accepted by the filter, return.
// Otherwise, try again.
// Note: in the future we might want to allow retrying when chooseHostOnce returns nullptr.
if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
return host;
}
}
// If we didn't find anything, return the last host.
return host;
}
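The point about per-worker independence can be illustrated with a toy round-robin picker. This is a simplified sketch, not Envoy's load balancer: `RoundRobinPicker` is a hypothetical class, hosts are plain strings, and health, priority, and weighting are ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-worker round-robin picker. Each worker thread would own
// its own instance, so the rotation is strict only within one worker.
class RoundRobinPicker {
public:
  explicit RoundRobinPicker(std::vector<std::string> hosts) : hosts_(std::move(hosts)) {}

  // Returns nullptr when there is no host to choose from.
  const std::string* chooseHost() {
    if (hosts_.empty()) return nullptr;
    const std::string* host = &hosts_[next_];
    next_ = (next_ + 1) % hosts_.size();
    return host;
  }

private:
  std::vector<std::string> hosts_;
  std::size_t next_ = 0;
};
```

Two pickers built from the same host list each start from the first host, so the sequence observed across workers is interleaved rather than globally ordered.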
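- The connection pool is obtained for the chosen host and protocol (pools are managed by the ThreadLocalClusterManager and are not shared between workers)
- If no pool is available, 503 is returned directly and filter chain iteration stops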
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
return Http::FilterHeadersStatus::StopIteration;
}
- The timeout for this request is determined from configuration (timeout, perTryTimeout)
timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
config_.respect_expected_rq_timeout_);
TimeoutData timeout;
if (!route.usingNewTimeouts()) {
if (grpc_request && route.maxGrpcTimeout()) {
const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
std::chrono::milliseconds grpc_timeout =
header_timeout ? header_timeout.value() : std::chrono::milliseconds(0);
if (route.grpcTimeoutOffset()) {
// We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
// setting it to 0 means infinity and a negative timeout makes no sense.
const auto offset = *route.grpcTimeoutOffset();
if (offset < grpc_timeout) {
grpc_timeout -= offset;
}
}
// Cap gRPC timeout to the configured maximum considering that 0 means infinity.
if (max_grpc_timeout != std::chrono::milliseconds(0) &&
(grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
grpc_timeout = max_grpc_timeout;
}
timeout.global_timeout_ = grpc_timeout;
} else {
timeout.global_timeout_ = route.timeout();
}
}
timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout();
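The gRPC branch of the timeout calculation above is easiest to follow with concrete numbers. The function below restates that arithmetic as a standalone sketch; `effectiveGrpcTimeout` is a hypothetical name, and the three parameters stand in for the header timeout, `grpcTimeoutOffset()`, and `maxGrpcTimeout()` from the excerpt. A value of 0 means "no timeout".

```cpp
#include <cassert>
#include <chrono>
#include <optional>

using std::chrono::milliseconds;

// header_timeout: timeout taken from the request headers, if present.
// offset: optional route-level offset subtracted from the header timeout.
// max_timeout: route-level cap; 0 means no limit, as in the excerpt.
milliseconds effectiveGrpcTimeout(std::optional<milliseconds> header_timeout,
                                  std::optional<milliseconds> offset,
                                  milliseconds max_timeout) {
  milliseconds timeout = header_timeout.value_or(milliseconds(0));
  // Apply the offset only if the result stays positive (0 would mean infinity).
  if (offset.has_value() && *offset < timeout) {
    timeout -= *offset;
  }
  // Cap to the maximum, treating 0 as infinity on both sides.
  if (max_timeout != milliseconds(0) &&
      (timeout == milliseconds(0) || timeout > max_timeout)) {
    timeout = max_timeout;
  }
  return timeout;
}
```

For example, a header timeout of 1000 ms with a 100 ms offset and no cap yields 900 ms, while a missing header timeout with a 5000 ms cap yields 5000 ms, because "infinite" is clamped to the maximum.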
- The trace generated earlier is written into the request headers
- Final modifications from the route configuration are applied to the request: `headers_to_remove`, `headers_to_add`, `host_rewrite`, `rewritePathHeader`
route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
!config_.suppress_envoy_headers_);
- Construct the retry and shadowing objects
retry_state_ = createRetryState(
route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, config_.runtime_,
config_.random_, callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());
// Determine which shadow policies to use. It's possible that we don't do any shadowing due to
// runtime keys.
for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
const auto& policy_ref = *shadow_policy;
if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
active_shadow_policies_.push_back(std::cref(policy_ref));
}
}
Sending the request
Sending the request is also part of the envoy.router logic.
- Check whether the current conn pool has an idle client
if (!ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "using existing connection", client);
attachStreamToClient(client, context);
// Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
tryCreateNewConnections();
If an idle connection exists
- Build the request buffer destined for upstream from the downstream request and settings such as tracing
- Move the whole buffer into write_buffer_ in one step, which immediately triggers a Write Event
- ConnectionImpl::onWriteReady is then invoked
- The contents of write_buffer_ are written to the socket and sent out
If no idle connection exists
if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
ConnectionPool::Cancellable* pending = newPendingStream(context);
ENVOY_LOG(debug, "trying to create new connection");
ENVOY_LOG(trace, fmt::format("{}", *this));
auto old_capacity = connecting_stream_capacity_;
// This must come after newPendingStream() because this function uses the
// length of pending_streams_ to determine if a new connection is needed.
const ConnectionResult result = tryCreateNewConnections();
// If there is not enough connecting capacity, the only reason to not
// increase capacity is if the connection limits are exceeded.
ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
connecting_stream_capacity_ > old_capacity ||
result == ConnectionResult::NoConnectionRateLimited,
fmt::format("Failed to create expected connection: {}", *this));
return pending;
} else {
ENVOY_LOG(debug, "max pending streams overflow");
onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
context);
host_->cluster().stats().upstream_rq_pending_overflow_.inc();
return nullptr;
}
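The two branches (reuse a ready client, or queue a pending stream and try to open a connection) can be condensed into a toy model. ToyPool and its fields are hypothetical and deliberately ignore details of the real pool such as preconnecting, per-worker minimum connections and stream capacity per connection.

```cpp
#include <cassert>
#include <cstddef>

enum class StreamResult { Attached, Pending, Overflow };

// Toy connection pool; every field name here is an assumption for illustration.
struct ToyPool {
  std::size_t ready_clients;       // idle connections that can take a stream now
  std::size_t pending_streams;     // streams waiting for a connection
  std::size_t max_pending_streams; // limit on queued streams
  std::size_t connections;         // established plus connecting
  std::size_t max_connections;     // limit on connections

  StreamResult newStream() {
    // Fast path: attach the stream to an existing idle connection.
    if (ready_clients > 0) {
      --ready_clients;
      return StreamResult::Attached;
    }
    // No room left in the pending queue: fail fast instead of queueing.
    if (pending_streams >= max_pending_streams) {
      return StreamResult::Overflow;
    }
    // Queue first, then try to grow the pool, mirroring the order in the
    // excerpt above, where connection creation depends on the queue length.
    ++pending_streams;
    if (connections < max_connections) {
      ++connections;
    }
    return StreamResult::Pending;
  }
};
```

In this model a pool with one idle client, a pending limit of 1 and a connection limit of 2 attaches the first stream, queues the second while opening a second connection, and rejects the third as overflow.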
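- Decide whether a new connection may be created based on max_pending_requests and max_connections (these limits are shared across workers). Each worker thread still establishes at least one connection to the upstream, so very strict limits may need to take the number of worker threads into account
- Set the socket options of the new connection according to the configuration, create the upstream connection with dispatcher.createClientConnection, and bind it to the eventloop
- Create a PendingRequest and add it to the head of pending_requests_
- When the connection is established successfully, ConnectionImpl::onFileEvent is triggered
- In the onConnected callback
  - Stop connect_timer_
  - Reuse the logic of the idle-connection case to send the request
- Call maybeDoShadowing from onRequestComplete to mirror the traffic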
ASSERT(!request->headers().getHostValue().empty());
// Switch authority to add a shadow postfix. This allows upstream logging to make more sense.
auto parts = StringUtil::splitToken(request->headers().getHostValue(), ":");
ASSERT(!parts.empty() && parts.size() <= 2);
request->headers().setHost(parts.size() == 2
? absl::StrJoin(parts, "-shadow:")
: absl::StrCat(request->headers().getHostValue(), "-shadow"));
// This is basically fire and forget. We don't handle cancelling.
thread_local_cluster->httpAsyncClient().send(std::move(request), *this, options);
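- Shadow traffic does not return errors
- Shadow traffic is sent through the async client, so it does not block downstream; its timeout is also global_timeout_
- Shadowing rewrites the host/authority in the request headers by appending a -shadow suffix
- Start the response timeout timer based on global_timeout_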
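The host rewrite performed for shadow requests can be reproduced with plain std::string operations. The helper below is a sketch under the same assumption the excerpt asserts, namely that the host contains at most one colon; shadowHost is a hypothetical name.

```cpp
#include <cassert>
#include <string>

// Appends "-shadow" to the host name while keeping an optional ":port" suffix.
// Assumes at most one ':' in the input, so bracketed IPv6 literals are out of scope.
std::string shadowHost(const std::string& host) {
  const std::string::size_type colon = host.find(':');
  if (colon == std::string::npos) {
    return host + "-shadow";
  }
  // Insert the suffix between the name and the port.
  return host.substr(0, colon) + "-shadow" + host.substr(colon);
}
```

With this helper, example.com becomes example.com-shadow and example.com:8080 becomes example.com-shadow:8080, which makes mirrored requests easy to tell apart in upstream logs.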
Receiving the response
- The eventloop triggers the read ready event of onFileEvent on ClientConnectionImpl.ConnectionImpl
- After http_parser execute runs, onHeadersComplete fires and execution reaches UpstreamRequest::decodeHeaders
- upstream_request_->upstream_host_->outlierDetector().putHttpResponseCode records the status code and updates the outlier detection state
external_origin_sr_monitor_.incTotalReqCounter();
if (Http::CodeUtility::is5xx(response_code)) {
std::shared_ptr<DetectorImpl> detector = detector_.lock();
if (!detector) {
// It's possible for the cluster/detector to go away while we still have a host in use.
return;
}
if (Http::CodeUtility::isGatewayError(response_code)) {
if (++consecutive_gateway_failure_ ==
detector->runtime().snapshot().getInteger(
ConsecutiveGatewayFailureRuntime, detector->config().consecutiveGatewayFailure())) {
detector->onConsecutiveGatewayFailure(host_.lock());
}
} else {
consecutive_gateway_failure_ = 0;
}
if (++consecutive_5xx_ == detector->runtime().snapshot().getInteger(
Consecutive5xxRuntime, detector->config().consecutive5xx())) {
detector->onConsecutive5xx(host_.lock());
}
} else {
external_origin_sr_monitor_.incSuccessReqCounter();
consecutive_5xx_ = 0;
consecutive_gateway_failure_ = 0;
}
if (grpc_status.has_value()) {
upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status);
} else {
upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code);
}
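The consecutive-5xx bookkeeping above reduces to a counter that resets on any non-5xx response and fires once when it reaches a threshold. The class below is a simplified sketch with hypothetical names; it leaves out the separate gateway-error counter, the runtime override and the success-rate monitor.

```cpp
#include <cassert>

// Tracks consecutive 5xx responses for a single host.
class ConsecutiveFailureCounter {
public:
  explicit ConsecutiveFailureCounter(unsigned threshold) : threshold_(threshold) {}

  // Records one response code. Returns true only on the response that makes
  // the streak reach the threshold exactly, matching the `==` comparison in
  // the excerpt, so a longer streak does not fire again.
  bool onResponse(int status) {
    if (status >= 500 && status <= 599) {
      return ++consecutive_5xx_ == threshold_;
    }
    consecutive_5xx_ = 0; // any non-5xx response breaks the streak
    return false;
  }

private:
  unsigned threshold_;
  unsigned consecutive_5xx_ = 0;
};
```

Because the comparison is an equality check, a host that keeps failing triggers the callback once per streak rather than on every further failure.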
- Decide whether to retry based on the response, the configuration and retries_remaining_
- Decide whether to redirect to a new host based on the internal_redirect_action configuration and the response
InternalRedirectPolicyImpl RouteEntryImplBase::buildInternalRedirectPolicy(
const envoy::config::route::v3::RouteAction& route_config,
ProtobufMessage::ValidationVisitor& validator, absl::string_view current_route_name) const {
if (route_config.has_internal_redirect_policy()) {
return InternalRedirectPolicyImpl(route_config.internal_redirect_policy(), validator,
current_route_name);
}
envoy::config::route::v3::InternalRedirectPolicy policy_config;
switch (route_config.internal_redirect_action()) {
case envoy::config::route::v3::RouteAction::HANDLE_INTERNAL_REDIRECT:
break;
case envoy::config::route::v3::RouteAction::PASS_THROUGH_INTERNAL_REDIRECT:
FALLTHRU;
default:
return InternalRedirectPolicyImpl();
}
if (route_config.has_max_internal_redirects()) {
*policy_config.mutable_max_internal_redirects() = route_config.max_internal_redirects();
}
return InternalRedirectPolicyImpl(policy_config, validator, current_route_name);
}
if (num_internal_redirect.value() >= policy.maxInternalRedirects()) {
config_.stats_.passthrough_internal_redirect_too_many_redirects_.inc();
return false;
}
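Returning the response
- Stop request_timer and reset idle_timer
- Send the response to downstream using the same logic as sending the request to upstream
Takeaways from reading the source
- Envoy makes heavy use of inheritance, templates and composition; when a subclass is initialized, pay attention to what the parent class constructors do
- Use the request logs as a guide: follow the order of the log lines and walk through the corresponding code to get the overall flow
- Make good use of debugging tools such as packet capture, gdb and enabling additional metrics. In my experience, about 90% of problems can be solved with logs, packet captures and reading the relevant part of the source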