proxy内部的运行逻辑是什么

81次阅读
没有评论

共计 9729 个字符,预计需要花费 25 分钟才能阅读完成。

这篇文章主要讲解了“proxy 内部的运行逻辑是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“proxy 内部的运行逻辑是什么”吧!

linkerd2 介绍

Linkerd 由控制平面和数据平面组成:

控制平面是在所属的 Kubernetes 命名空间(linkerd 默认情况下)中运行的一组服务,这些服务可以完成汇聚遥测数据,提供面向用户的 API,并向数据平面代理提供控制数据等,它们共同驱动数据平面。

数据平面用 Rust 编写的轻量级代理,该代理安装在服务的每个 pod 中,并成为数据平面的一部分,它接收 Pod 的所有接入流量,并通过 initContainer 配置 iptables 正确转发流量的拦截所有传出流量,因为它是附加工具,并且拦截服务的所有传入和传出流量,所以不需要更改代码,甚至可以将其添加到正在运行的服务中。

借用官方的图:

proxy 由 rust 开发完成,其内部的异步运行时采用了 Tokio 框架,服务组件用到了 tower。

本文主要关注 proxy 与 destination 组件交互相关的整体逻辑,分析 proxy 内部的运行逻辑。

流程分析初始化

proxy 启动后:

app::init 初始化配置

app::Main::new 创建主逻辑 main,

main.run_until 内新加一任务 ProxyParts::build_proxy_task。

在 ProxyParts::build_proxy_task 中会进行一系列的初始化工作,此处只关注 dst_svc,其创建代码为:

 dst_svc = svc::stack(connect::svc(keepalive))
 .push(tls::client::layer(local_identity.clone()))
 .push_timeout(config.control_connect_timeout)
 .push(control::client::layer())
 .push(control::resolve::layer(dns_resolver.clone()))
 .push(reconnect::layer({ let backoff = config.control_backoff.clone();
 move |_| Ok(backoff.stream())
 }))
 .push(http_metrics::layer:: _, classify::Response ( ctl_http_metrics.clone(),
 ))
 .push(proxy::grpc::req_body_as_payload::layer().per_make())
 .push(control::add_origin::layer())
 .push_buffer_pending(
 config.destination_buffer_capacity,
 config.control_dispatch_timeout,
 )
 .into_inner()
 .make(config.destination_addr.clone())

dst_svc 一共有 2 处引用,一是 crate::resolve::Resolver 的创建会涉及;另一个就是 ProfilesClient 的创建。

Resolver

api_resolve::Resolve::new(dst_svc.clone()) 创建 resolver 对象

调用 outbound::resolve 创建 map_endpoint::Resolve 类型对象,并当做参数 resolve 传入 outbound::spawn 函数开启出口线程

在 outbound::spawn 中,resolve 被用于创建负载均衡控制层,并用于后续路由控制:

let balancer_layer = svc::layers()
 .push_spawn_ready()
 .push(discover::Layer::new(
 DISCOVER_UPDATE_BUFFER_CAPACITY,
 resolve,
 ))
 .push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));

在 discover::Layer::layer 中:

let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint, from_resolve);
Buffer::new(self.capacity, make_discover)

Profiles

在 ProfilesClient::new 中调用 api::client::Destination::new(dst_svc) 创建 grpc 的 client 端并存于成员变量 service

接着 profiles_client 对象会被用于 inbound 和 outbound 的创建(省略无关代码):

 let dst_stack = svc::stack(...)...
 .push(profiles::router::layer(
 profile_suffixes,
 profiles_client,
 dst_route_stack,
 ))
 ...

其中 profiles::router::layer 会创建一个 Layer 对象,并将 profiles_client 赋予 get_routes 成员。然后在 service 方法中,会调到 Layer::layer 方法,里面会创建一个 MakeSvc 对象,其 get_routes 成员的值即为 profiles_client。

运行

新的连接过来时,从 listen 拿到连接对象后,会交给 linkerd_proxy::transport::tls::accept::AcceptTls 的 call,然后是 linkerd2_proxy::proxy::server::Server 的 call,并最终分别调用 linkerd2_proxy_http::balance::MakeSvc::call 和 linkerd2_proxy_http::profiles::router::MakeSvc::call 方法。

balance

在 linkerd2_proxy_http::balance::MakeSvc::call 中:

调用 inner.call(target),此处的 inner 即是前面 Buffer::new 的结果。

生成一个新的 linkerd2_proxy_http::balance::MakeSvc 对象,当做 Future 返回

先看 inner.call。它内部经过层层调用,依次触发 Buffer、MakeEndpoint、FromResolve 等结构的 call 方法,最终会触发最开始创建的 resolve.resolve(target),其内部调用 api_resolve::Resolve::call。

在 api_resolve::Resolve::call 中:

 fn call(mut self, target: T) -  Self::Future { let path = target.to_string();
 trace!(resolve {:?} , path);
 self.service
 // GRPC 请求,获取 k8s 的 endpoint
 .get(grpc::Request::new(api::GetDestination {
 path,
 scheme: self.scheme.clone(),
 context_token: self.context_token.clone(),
 }))
 .map(|rsp| { debug!(metadata = ?rsp.metadata());
 //  拿到结果 stream
 Resolution { inner: rsp.into_inner(),
 }
 })
 }

将返回的 Resolution 再次放入 MakeSvc 中,然后看其 poll:

 fn poll(mut self) -  Poll Self::Item, Self::Error  {
 //  这个 poll 会依次调用:
 // linkerd2_proxy_api_resolve::resolve::Resolution::poll
 // linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
 // linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
 //  最终获得 Poll Change SocketAddr, Endpoint  
 let discover = try_ready!(self.inner.poll());
 let instrument = PendingUntilFirstData::default();
 let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument);
 let balance = Balance::new(loaded, self.rng.clone());
 Ok(Async::Ready(balance))
 }

最终返回 service Balance。

当具体请求过来后,先会判断 Balance::poll_ready:

 fn poll_ready(mut self) -  Poll (), Self::Error  {
 //  获取 Update Endpoint 
 //  将 Remove 的从 self.ready_services 中删掉
 //  将 Insert 的构造 UnreadyService 结构加到 self.unready_services
 self.poll_discover()?;
 //  对 UnreadyService,调用其 poll,内部会调用到 svc 的 poll_ready 判断 endpoint 是否可用
 //  可用时,将其加入 self.ready_services
 self.poll_unready();
 
 loop { if let Some(index) = self.next_ready_index {
 //  找到对应的 endpoint,可用则返回
 if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) { return Ok(Async::Ready(()));
 }
 }
 //  选择负载比较低的 endpoint
 self.next_ready_index = self.p2c_next_ready_index();
 if self.next_ready_index.is_none() {
 // 
 return Ok(Async::NotReady);
 }
 }
 }

就绪后,对请求 req 调用 call:

 fn call(mut self, request: Req) -  Self::Future {
 //  找到下一个可用的 svc,并将其从 ready_services 中删除
 let index = self.next_ready_index.take().expect( not ready 
 let (key, mut svc) = self
 .ready_services
 .swap_remove_index(index)
 .expect( invalid ready index 
 //  将请求转过去
 let fut = svc.call(request);
 //  加到 unready
 self.push_unready(key, svc);
 fut.map_err(Into::into)
 }

profiles

在 linkerd2_proxy_http::profiles::router::MakeSvc::call 中:

 // Initiate a stream to get route and dst_override updates for this
 // destination.
 let route_stream = match target.get_destination() { Some(ref dst) =  { if self.suffixes.iter().any(|s| s.contains(dst.name())) { debug!( fetching routes for {:?} , dst);
 self.get_routes.get_routes(dst)
 } else { debug!( skipping route discovery for dst={:?} , dst);
 None
 }
 }
 None =  {
 debug!( no destination for routes 
 None
 }
 };

经过若干判断后,会调用 ProfilesClient::get_routes 并将结果存于 route_stream。

进入 get_routes:

 fn get_routes(self, dst:  NameAddr) -  Option Self::Stream  {
 //  创建通道
 let (tx, rx) = mpsc::channel(1);
 // This oneshot allows the daemon to be notified when the Self::Stream
 // is dropped.
 let (hangup_tx, hangup_rx) = oneshot::channel();
 //  创建 Daemon 对象(Future 任务) let daemon = Daemon {
 tx,
 hangup: hangup_rx,
 dst: format!({} , dst),
 state: State::Disconnected,
 service: self.service.clone(),
 backoff: self.backoff,
 context_token: self.context_token.clone(),
 };
 //  调用 Daemon::poll
 let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
 //  将通道接收端传出
 spawn.ok().map(|_| Rx {
 rx,
 _hangup: hangup_tx,
 })
 }

接着看 Daemon::poll:

 fn poll(mut self) -  Poll Self::Item, Self::Error  {
 loop {
 //  遍历 state 成员状态
 self.state = match self.state {
 //  未连接时
 State::Disconnected =  { match self.service.poll_ready() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 Ok(Async::Ready(())) =  {}
 Err(err) =  {
 error!(  profile service unexpected error (dst = {}): {:?} ,
 self.dst, err,
 );
 return Ok(Async::Ready(()));
 }
 };
 //  构造 grpc 请求
 let req = api::GetDestination { scheme:  k8s .to_owned(),
 path: self.dst.clone(),
 context_token: self.context_token.clone(),
 };
 debug!(getting profile: {:?} , req);
 //  获取请求任务
 let rspf = self.service.get_profile(grpc::Request::new(req));
 State::Waiting(rspf)
 }
 //  正在请求时,从请求中获取回复
 State::Waiting(ref mut f) =  match f.poll() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 //  正常回复
 Ok(Async::Ready(rsp)) =  {
 trace!( response received 
 //  流式回复
 State::Streaming(rsp.into_inner())
 }
 Err(e) =  { warn!( error fetching profile for {}: {:?} , self.dst, e);
 State::Backoff(Delay::new(clock::now() + self.backoff))
 }
 },
 //  接收回复
 State::Streaming(ref mut s) =  {
 //  处理回复流
 //  注意此处,参数 1 是 get_profile 请求的回复流, //  参数 2 是之前创建的通道发送端
 match Self::proxy_stream(s,  mut self.tx,  mut self.hangup) { Async::NotReady =  return Ok(Async::NotReady),
 Async::Ready(StreamState::SendLost) =  return Ok(().into()),
 Async::Ready(StreamState::RecvDone) =  { State::Backoff(Delay::new(clock::now() + self.backoff))
 }
 }
 }
 //  异常,结束请求
 State::Backoff(ref mut f) =  match f.poll() { Ok(Async::NotReady) =  return Ok(Async::NotReady),
 Err(_) | Ok(Async::Ready(())) =  State::Disconnected,
 },
 };
 }
 }

接着 proxy_stream:

 fn proxy_stream(
 rx:  mut grpc::Streaming api::DestinationProfile, T::ResponseBody ,
 tx:  mut mpsc::Sender profiles::Routes ,
 hangup:  mut oneshot::Receiver Never ,
 ) -  Async StreamState  {
 loop {
 //  发送端是否就绪
 match tx.poll_ready() { Ok(Async::NotReady) =  return Async::NotReady,
 Ok(Async::Ready(())) =  {}
 Err(_) =  return StreamState::SendLost.into(),
 }
 //  从 grpc stream 中取得一条数据
 match rx.poll() { Ok(Async::NotReady) =  match hangup.poll() { Ok(Async::Ready(never)) =  match never {}, // unreachable!
 Ok(Async::NotReady) =  {
 // We are now scheduled to be notified if the hangup tx
 // is dropped.
 return Async::NotReady;
 }
 Err(_) =  {
 // Hangup tx has been dropped.
 debug!( profile stream cancelled 
 return StreamState::SendLost.into();
 }
 },
 Ok(Async::Ready(None)) =  return StreamState::RecvDone.into(),
 //  正确取得 profile 结构
 Ok(Async::Ready(Some(profile))) =  { debug!( profile received: {:?} , profile);
 //  解析数据
 let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
 let routes = profile
 .routes
 .into_iter()
 .filter_map(move |orig| convert_route(orig, retry_budget.as_ref()))
 .collect();
 let dst_overrides = profile
 .dst_overrides
 .into_iter()
 .filter_map(convert_dst_override)
 .collect();
 //  构造 profiles::Routes 结构并推到发送端
 match tx.start_send(profiles::Routes {
 routes,
 dst_overrides,
 }) { Ok(AsyncSink::Ready) =  {} // continue
 Ok(AsyncSink::NotReady(_)) =  {
 info!( dropping profile update due to a full buffer 
 // This must have been because another task stole
 // our tx slot? It seems pretty unlikely, but possible?
 return Async::NotReady;
 }
 Err(_) =  { return StreamState::SendLost.into();
 }
 }
 }
 Err(e) =  { warn!( profile stream failed: {:?} , e);
 return StreamState::RecvDone.into();
 }
 }
 }
 }

回到 MakeSvc::call 方法,前面创建的 route_stream 会被用于创建一个 linkerd2_proxy::proxy::http::profiles::router::Service 任务对象,并在其 poll_ready 方法中通过 poll_route_stream 从 route_steam 获取 profiles::Routes 并调用 update_routes 创建具体可用的路由规则 linkerd2_router::Router,至此,路由规则已建好,就等具体的请求过来然后在 call 中调用 linkerd2_router::call 进行对请求的路由判断。

图示 profile

感谢各位的阅读,以上就是“proxy 内部的运行逻辑是什么”的内容了,经过本文的学习后,相信大家对 proxy 内部的运行逻辑是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计9729字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)