Distributor

Overview

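The distributor service handles samples coming in from Prometheus. It is stateless and can be scaled up and down as needed.

It is the first stop on the write path for series samples. Once the distributor receives samples from Prometheus, each sample is validated for correctness and checked against the configured tenant limits, falling back to the default limits when no override exists for that tenant. Valid samples are then split into batches and sent to multiple ingesters in parallel.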

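The distributor validates that:

  1. Metric label names are formally correct.
  2. The number of labels per metric does not exceed the configured maximum.
  3. Label names and values do not exceed the configured maximum length.
  4. The timestamp is not older or newer than the configured min/max time range.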

HA replica deduplication

See the HATrackerConfig configuration item.

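The distributor includes a high-availability (HA) tracker. When it is enabled, the distributor deduplicates samples coming from redundant Prometheus servers.

This lets you run multiple HA replicas of the same Prometheus server, all writing the same series to Cortex, and have the Cortex distributor deduplicate those series.

The HA tracker deduplicates incoming samples based on a cluster label and a replica label. The cluster label uniquely identifies a cluster of redundant Prometheus servers for a given tenant, while the replica label uniquely identifies a replica within that Prometheus cluster. Incoming samples are considered duplicates (and are therefore dropped) if they come from any replica that is not the current primary within its cluster.

The HA tracker requires a key-value (KV) store to coordinate which replica is currently elected. The distributor only accepts samples from the current leader. Samples that carry only one of the replica and cluster labels, or neither, are accepted by default and are never deduplicated.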

Supported KV stores:

  1. consul
  2. etcd

The examples here use etcd throughout.

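Consistent hashing

The distributor uses consistent hashing, together with a configurable replication factor, to determine which ingester instances should receive a given series.

Two hashing strategies are supported:

  1. Metric name and tenant ID
  2. Metric name, labels, and tenant ID

The first strategy is the default; set -distributor.shard-by-all-labels=true to use the second.

The trade-off with the second strategy is that writes are balanced more evenly across ingesters, but every query has to talk to all ingesters, because a single metric can be spread across several ingesters depending on its label sets.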
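
The difference between the two strategies comes down to what goes into the hash. The sketch below uses 32-bit FNV-1a from the Go standard library as an assumed hash function; shardByMetricName and shardByAllLabels are hypothetical helper names, not Cortex functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// shardByMetricName hashes only the tenant ID and the metric name, so every
// series of one metric maps to the same key (strategy 1).
func shardByMetricName(tenant, metric string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(tenant))
	h.Write([]byte{0}) // separator between fields
	h.Write([]byte(metric))
	return h.Sum32()
}

// shardByAllLabels hashes the tenant ID plus every label name and value
// (strategy 2). Label names are sorted so the result does not depend on
// map iteration order. The metric name is the "__name__" label.
func shardByAllLabels(tenant string, labels map[string]string) uint32 {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	h := fnv.New32a()
	h.Write([]byte(tenant))
	for _, name := range names {
		h.Write([]byte{0})
		h.Write([]byte(name))
		h.Write([]byte{0})
		h.Write([]byte(labels[name]))
	}
	return h.Sum32()
}

func main() {
	a := map[string]string{"__name__": "http_requests_total", "instance": "10.0.0.1"}
	b := map[string]string{"__name__": "http_requests_total", "instance": "10.0.0.2"}

	// Strategy 1: both series share one key because only the name is hashed.
	fmt.Println(shardByMetricName("tenant-1", a["__name__"]), shardByMetricName("tenant-1", b["__name__"]))
	// Strategy 2: each label set gets its own key, which spreads one metric
	// across the ring.
	fmt.Println(shardByAllLabels("tenant-1", a), shardByAllLabels("tenant-1", b))
}
```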

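  • The hash ring

A hash ring provides consistent-hash sharding and replication of series across the ingesters. Every ingester registers itself in the ring with a set of tokens it owns; each token is a random unsigned 32-bit number. Each incoming series is hashed in the distributor and then pushed to the ingester that owns the token range containing the series hash, plus the N-1 subsequent ingesters in the ring, where N is the replication factor.

The hash ring is stored in a KV store. Supported backends: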

  1. consul
  2. etcd
  3. Gossip memberlist

The examples here use etcd throughout.

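Quorum consistency

Quorum consistency is a consistency model for distributed systems in which read and write operations must satisfy a quorum requirement.

In this model, the quorum is the minimum number of nodes or replicas that must agree on a given operation.

Because all distributors share access to the same hash ring, write requests can be sent to any distributor, and you can put a stateless load balancer in front of them.

To ensure consistent query results, Cortex uses Dynamo-style quorum consistency on reads and writes. This means that the distributor waits for a positive response from at least one half plus one of the ingesters it sent the sample to before responding successfully to the Prometheus write request.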

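Load balancing

We recommend randomly load balancing write requests across the distributor instances. For example, if you run Cortex in a Kubernetes cluster, you can run the distributors as a Kubernetes Service.

With the distributors behind a Kubernetes Service, Kubernetes spreads write requests across the available distributor instances. This keeps load balanced between instances and improves scalability and fault tolerance.

When configuring the Kubernetes Service, you can choose a load-balancing policy, for example a random algorithm, so that each distributor instance handles an appropriate share of the write requests.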

Configuration example

target: distributor

#auth_enabled: false

distributor:
  pool:
    client_cleanup_period: 15s
    health_check_ingesters: true
  ha_tracker:
    enable_ha_tracker: false
    ha_tracker_update_timeout: 15s
    ha_tracker_update_timeout_jitter_max: 5s
    ha_tracker_failover_timeout: 30s
    kvstore:
      store: etcd
      prefix: ha-tracker/
      etcd:
        endpoints: []
        dial_timeout: 10s
        max_retries: 10
        tls_enabled: false
        tls_cert_path: ""
        tls_key_path: ""
        tls_ca_path: ""
        tls_server_name: ""
        tls_insecure_skip_verify: false
        username: ""
        password: ""
  max_recv_msg_size: 104857600
  remote_timeout: 2s
  extra_queue_delay: 0s
  sharding_strategy: default
  shard_by_all_labels: true
  extend_writes: true
  ring:
    kvstore:
      store: etcd
      prefix: collectors/
      etcd:
        endpoints: []
        dial_timeout: 10s
        max_retries: 10
        tls_enabled: false
        tls_cert_path: ""
        tls_key_path: ""
        tls_ca_path: ""
        tls_server_name: ""
        tls_insecure_skip_verify: false
        username: ""
        password: ""
    heartbeat_period: 5s
    heartbeat_timeout: 1m0s
    instance_id: limiqi-mac2016-01.local
    instance_interface_names:
    - eth0
    - en0
    instance_port: 0
    instance_addr: ""
  instance_limits:
    max_ingestion_rate: 0
    max_inflight_push_requests: 0

API

Remote Write

POST /api/v1/push

X-Prometheus-Remote-Write-Version: 0.1.0

<Snappy-compressed Protocol Buffers body>

This is the endpoint to configure as the remote write target in Prometheus.

The request body is serialized with Protocol Buffers and compressed with Snappy.

message WriteRequest {
  repeated TimeSeries timeseries = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocTimeseries"];
  enum SourceEnum {
    API = 0;
    RULE = 1;
  }
  SourceEnum Source = 2;
  repeated MetricMetadata metadata = 3 [(gogoproto.nullable) = true];

  bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
}

Distributor ring status

GET /distributor/ring

Tenants stats

GET /distributor/all_user_stats

HA tracker status

GET /distributor/ha_tracker

Data structures

github.com/cortexproject/cortex/pkg/distributor/distributor.go

Config

// Config contains the configuration required to
// create a Distributor
type Config struct {
    PoolConfig PoolConfig `yaml:"pool"`

    HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

    MaxRecvMsgSize  int           `yaml:"max_recv_msg_size"`
    RemoteTimeout   time.Duration `yaml:"remote_timeout"`
    ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

    ShardingStrategy string `yaml:"sharding_strategy"`
    ShardByAllLabels bool   `yaml:"shard_by_all_labels"`
    ExtendWrites     bool   `yaml:"extend_writes"`

    // Distributors ring
    DistributorRing RingConfig `yaml:"ring"`

    // for testing and for extending the ingester by adding calls to the client
    IngesterClientFactory ring_client.PoolFactory `yaml:"-"`

    // when true the distributor does not validate the label name, Cortex doesn't directly use
    // this (and should never use it) but this feature is used by other projects built on top of it
    SkipLabelNameValidation bool `yaml:"-"`

    // This config is dynamically injected because defined in the querier config.
    ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

    // Limits for distributor
    InstanceLimits InstanceLimits `yaml:"instance_limits"`
}

PoolConfig

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
    ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period"`
    HealthCheckIngesters bool          `yaml:"health_check_ingesters"`
    RemoteTimeout        time.Duration `yaml:"-"`
}

HATrackerConfig

// HATrackerConfig contains the configuration required to
// create a HA Tracker.
type HATrackerConfig struct {
    EnableHATracker bool `yaml:"enable_ha_tracker"`
    // We should only update the timestamp if the difference
    // between the stored timestamp and the time we received a sample at
    // is more than this duration.
    UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout"`
    UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
    // We should only failover to accepting samples from a replica
    // other than the replica written in the KVStore if the difference
    // between the stored timestamp and the time we received a sample is
    // more than this duration
    FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`

    KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}

kv.Config

github.com/cortexproject/cortex/pkg/ring/kv/client.go

// Config is config for a KVStore currently used by ring and HA tracker,
// where store can be consul or inmemory.
type Config struct {
    Store       string `yaml:"store"`
    Prefix      string `yaml:"prefix"`
    StoreConfig `yaml:",inline"`

    Mock Client `yaml:"-"`
}

kv.StoreConfig

// StoreConfig is a configuration used for building single store client, either
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
// single-client config separate from final client-config (with all the wrappers)
type StoreConfig struct {
    DynamoDB dynamodb.Config `yaml:"dynamodb"`
    Consul   consul.Config   `yaml:"consul"`
    Etcd     etcd.Config     `yaml:"etcd"`
    Multi    MultiConfig     `yaml:"multi"`

    // Function that returns memberlist.KV store to use. By using a function, we can delay
    // initialization of memberlist.KV until it is actually required.
    MemberlistKV func() (*memberlist.KV, error) `yaml:"-"`
}

etcd.Config

github.com/cortexproject/cortex/pkg/ring/kv/etcd/etcd.go

// Config for a new etcd.Client.
type Config struct {
    Endpoints   []string               `yaml:"endpoints"`
    DialTimeout time.Duration          `yaml:"dial_timeout"`
    MaxRetries  int                    `yaml:"max_retries"`
    EnableTLS   bool                   `yaml:"tls_enabled"`
    TLS         cortextls.ClientConfig `yaml:",inline"`

    UserName string `yaml:"username"`
    Password string `yaml:"password"`
}

RingConfig

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the distributors ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
    KVStore          kv.Config     `yaml:"kvstore"`
    HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
    HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

    // Instance details
    InstanceID             string   `yaml:"instance_id" doc:"hidden"`
    InstanceInterfaceNames []string `yaml:"instance_interface_names"`
    InstancePort           int      `yaml:"instance_port" doc:"hidden"`
    InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`

    // Injected internally
    ListenPort int `yaml:"-"`
}

InstanceLimits

type InstanceLimits struct {
    MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
    MaxInflightPushRequests int     `yaml:"max_inflight_push_requests"`
}

memberlist.KVConfig

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// KVConfig is a config for memberlist.KV
type KVConfig struct {
    // Memberlist options.
    NodeName            string        `yaml:"node_name"`
    RandomizeNodeName   bool          `yaml:"randomize_node_name"`
    StreamTimeout       time.Duration `yaml:"stream_timeout"`
    RetransmitMult      int           `yaml:"retransmit_factor"`
    PushPullInterval    time.Duration `yaml:"pull_push_interval"`
    GossipInterval      time.Duration `yaml:"gossip_interval"`
    GossipNodes         int           `yaml:"gossip_nodes"`
    GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"`
    DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"`
    EnableCompression   bool          `yaml:"compression_enabled"`

    // ip:port to advertise other cluster members. Used for NAT traversal
    AdvertiseAddr string `yaml:"advertise_addr"`
    AdvertisePort int    `yaml:"advertise_port"`

    // List of members to join
    JoinMembers      flagext.StringSlice `yaml:"join_members"`
    MinJoinBackoff   time.Duration       `yaml:"min_join_backoff"`
    MaxJoinBackoff   time.Duration       `yaml:"max_join_backoff"`
    MaxJoinRetries   int                 `yaml:"max_join_retries"`
    AbortIfJoinFails bool                `yaml:"abort_if_cluster_join_fails"`
    RejoinInterval   time.Duration       `yaml:"rejoin_interval"`

    // Remove LEFT ingesters from ring after this timeout.
    LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"`

    // Timeout used when leaving the memberlist cluster.
    LeaveTimeout time.Duration `yaml:"leave_timeout"`

    // How much space to use to keep received and sent messages in memory (for troubleshooting).
    MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"`

    TCPTransport TCPTransportConfig `yaml:",inline"`

    // Where to put custom metrics. Metrics are not registered, if this is nil.
    MetricsRegisterer prometheus.Registerer `yaml:"-"`
    MetricsNamespace  string                `yaml:"-"`

    // Codecs to register. Codecs need to be registered before joining other members.
    Codecs []codec.Codec `yaml:"-"`
}



Last modified 2023.07.06: refactor: update some (5fe4b38)