Ingester

简要概述

摄取器服务负责将传入的样本数据写入长期存储后端(如对象存储或分布式文件系统等适合长期存储的后端),并在读取路径上返回内存中的系列样本供查询使用。

而在读取路径上,摄取器服务负责响应查询请求并返回内存中的系列样本。这样,对于频繁的查询操作,可以避免从长期存储后端读取数据,而是直接从内存中提供结果,提高查询性能和响应时间。通过将写入和读取功能分开,摄取器服务能够有效地管理数据的写入和查询操作,并提供快速的查询响应速度。

传入的样本数据并不会立即写入存储,而是保留在内存中,并定期刷新到存储中(默认情况下,每隔2小时)。因此,在执行读取路径上的查询时,查询器可能需要从摄取器和长期存储中获取样本。

由于系列样本在内存中被保留一段时间,某些查询可能需要获取最新的样本数据。在这种情况下,查询器将从摄取器中获取最新的内存样本,以便提供准确的查询结果。

然而,对于一些较旧的或不在内存中的样本,查询器将需要从长期存储中获取数据。这样可以确保查询器能够访问更全面的数据集,尤其是对于历史数据或较久远的样本。

通过在读取路径上同时从摄取器和长期存储获取样本,Cortex能够在查询过程中提供完整且准确的数据,同时利用内存中的样本提供更快的查询响应时间。这种结合内存和长期存储的策略可以平衡性能和数据可用性的需求。

生命周期

摄取器包含一个生命周期管理器(lifecycler),用于管理摄取器的生命周期并将摄取器状态存储在哈希环中。

每个摄取器可以处于以下状态之一:

  1. PENDING(待处理):摄取器刚启动。在此状态下,摄取器既不接收写入请求也不接收读取请求。
  2. JOINING(加入中):摄取器正在启动并加入哈希环。在此状态下,摄取器既不接收写入请求也不接收读取请求。摄取器将使用从磁盘加载的令牌(如果配置了-ingester.tokens-file-path)加入环,或者生成一组新的随机令牌。最后,摄取器可以选择观察环以检测令牌冲突,一旦解决任何冲突,摄取器将转移到ACTIVE状态。
  3. ACTIVE(活动状态):摄取器正在运行。在此状态下,摄取器可以同时接收写入请求和读取请求。
  4. LEAVING(离开中):摄取器正在关闭并离开哈希环。在此状态下,摄取器不接收写入请求,但仍可接收读取请求。
  5. UNHEALTHY(不健康状态):摄取器无法向哈希环的KV存储发送心跳信号。在此状态下,分发器在构建传入系列的复制集时会跳过该摄取器,并且摄取器不接收写入或读取请求。

摄取器是半有状态的。

数据丢失

如果进程崩溃或意外退出,所有尚未刷新到长期存储的内存中样本数据将会丢失。

有两种主要方式可以缓解这种故障模式:

  1. 复制(Replication)

使用复制可以在摄取器中保存每个时间序列的多个副本(通常为3个)

如果Cortex集群失去一个摄取器,被丢失的摄取器所持有的内存中系列样本也会至少被复制到另一个摄取器。在单个摄取器故障的情况下,不会丢失任何时间序列样本;但在多个摄取器故障的情况下,如果所有持有特定时间序列副本的摄取器都受到故障影响,那么该时间序列可能会丢失。

  1. 预写日志(Write-ahead log,WAL)

使用预写日志将所有传入的系列样本写入持久性磁盘,直到它们被刷新到长期存储,在摄取器发生故障的情况下,后续的进程重启将重放预写日志并恢复内存中的系列样本。

与仅使用复制不同,由于持久性磁盘数据不会丢失,在多个摄取器故障的情况下,每个摄取器都将在后续重启时从预写日志中恢复内存中的系列样本。仍然建议使用复制,以确保在单个摄取器故障的情况下,读取路径不会出现临时故障。

因此,在Cortex中,复制和预写日志(WAL)是两种主要的故障处理机制。复制提供冗余备份以减少数据丢失的风险,而预写日志提供了数据持久性和恢复能力。这两种方式可以结合使用,以提供更高的数据可靠性和系统的容错性。

写入解放

摄取器的写入解放(Write de-amplification)是一种机制,用于减少写入操作对长期存储的压力和负载,从而降低系统的成本和复杂性。

摄取器在内存中存储最近接收的样本,以执行写入解放(write de-amplification)。如果摄取器立即将接收到的样本写入长期存储,系统将面临存储压力过大,难以实现扩展性的问题。因此,摄取器会在内存中对样本进行批处理和压缩,并定期将它们刷新到长期存储中。

写入解放是Cortex系统低总拥有成本(Total Cost of Ownership,TCO)的主要原因之一。通过在内存中对样本进行批处理和压缩,摄取器减少了写入长期存储的次数和数据量。这种批处理和压缩的操作减少了对存储系统的负载,降低了硬件资源需求和运维成本。

通过写入解放,Cortex系统能够更有效地利用存储资源,提高了系统的性能和扩展性。通过批处理和压缩,摄取器在将数据刷新到长期存储之前,减少了存储访问的频率和数据量。这不仅减轻了存储系统的负担,还降低了存储成本。写入解放机制是Cortex系统设计中的关键策略,有助于实现更经济高效的数据处理和存储管理,从而降低了系统的总拥有成本。

配置示例

ingester_config

官方文档

ingester:
  lifecycler:
    ring:
      kvstore:
        store: etcd
        prefix: collectors/
        etcd:
          endpoints: []
          dial_timeout: 10s
          max_retries: 10
          tls_enabled: false
          tls_cert_path: ""
          tls_key_path: ""
          tls_ca_path: ""
          tls_server_name: ""
          tls_insecure_skip_verify: false
          username: ""
          password: ""
      heartbeat_timeout: 1m0s
      replication_factor: 1
      zone_awareness_enabled: false
      excluded_zones: ""
    num_tokens: 512
    heartbeat_period: 5s
    observe_period: 0s
    join_after: 0s
    min_ready_duration: 0s
    interface_names:
    - eth0
    - en0
    final_sleep: 0s
    tokens_file_path: ""
    availability_zone: ""
    unregister_on_shutdown: true
    readiness_check_ring_health: true
    address: ""
    port: 0
    id: limiqi-mac2016-01.local
  metadata_retain_period: 10m0s
  rate_update_period: 15s
  active_series_metrics_enabled: true
  active_series_metrics_update_period: 1m0s
  active_series_metrics_idle_timeout: 10m0s
  instance_limits:
    max_ingestion_rate: 0
    max_tenants: 0
    max_series: 0
    max_inflight_push_requests: 0
  ignore_series_limit_for_metric_names: ""

ingester_client_config

官方文档

ingester_client:
  grpc_client_config:
    max_recv_msg_size: 104857600
    max_send_msg_size: 104857600
    grpc_compression: gzip
    rate_limit: 0
    rate_limit_burst: 0
    backoff_on_ratelimits: false
    backoff_config:
      min_period: 100ms
      max_period: 10s
      max_retries: 10
    tls_enabled: false
    tls_cert_path: ""
    tls_key_path: ""
    tls_ca_path: ""
    tls_server_name: ""
    tls_insecure_skip_verify: false

数据结构

github.com/cortexproject/cortex/pkg/ingester/ingester.go

Config

// Config for an Ingester.
type Config struct {
    LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler"`

    // Config for metadata purging.
    MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period"`

    RateUpdatePeriod            time.Duration `yaml:"rate_update_period"`
    UserTSDBConfigsUpdatePeriod time.Duration `yaml:"user_tsdb_configs_update_period"`

    ActiveSeriesMetricsEnabled      bool          `yaml:"active_series_metrics_enabled"`
    ActiveSeriesMetricsUpdatePeriod time.Duration `yaml:"active_series_metrics_update_period"`
    ActiveSeriesMetricsIdleTimeout  time.Duration `yaml:"active_series_metrics_idle_timeout"`

    // Use blocks storage.
    BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`

    // Injected at runtime and read from the distributor config, required
    // to accurately apply global limits.
    DistributorShardingStrategy string `yaml:"-"`
    DistributorShardByAllLabels bool   `yaml:"-"`

    // Injected at runtime and read from querier config.
    QueryStoreForLabels  bool          `yaml:"-"`
    QueryIngestersWithin time.Duration `yaml:"-"`

    DefaultLimits    InstanceLimits         `yaml:"instance_limits"`
    InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

    IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names"`

    // For testing, you can override the address and ID of this ingester.
    ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error)
}

ring.LifecyclerConfig

github.com/cortexproject/cortex/pkg/ring/ring.go

// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
    RingConfig Config `yaml:"ring"`

    // Config for the ingester lifecycle control
    NumTokens                int           `yaml:"num_tokens"`
    HeartbeatPeriod          time.Duration `yaml:"heartbeat_period"`
    ObservePeriod            time.Duration `yaml:"observe_period"`
    JoinAfter                time.Duration `yaml:"join_after"`
    MinReadyDuration         time.Duration `yaml:"min_ready_duration"`
    InfNames                 []string      `yaml:"interface_names"`
    FinalSleep               time.Duration `yaml:"final_sleep"`
    TokensFilePath           string        `yaml:"tokens_file_path"`
    Zone                     string        `yaml:"availability_zone"`
    UnregisterOnShutdown     bool          `yaml:"unregister_on_shutdown"`
    ReadinessCheckRingHealth bool          `yaml:"readiness_check_ring_health"`

    // For testing, you can override the address and ID of this ingester
    Addr string `yaml:"address" doc:"hidden"`
    Port int    `doc:"hidden"`
    ID   string `doc:"hidden"`

    // Injected internally
    ListenPort int `yaml:"-"`
}
// Config for a Ring
type Config struct {
    KVStore              kv.Config              `yaml:"kvstore"`
    HeartbeatTimeout     time.Duration          `yaml:"heartbeat_timeout"`
    ReplicationFactor    int                    `yaml:"replication_factor"`
    ZoneAwarenessEnabled bool                   `yaml:"zone_awareness_enabled"`
    ExcludedZones        flagext.StringSliceCSV `yaml:"excluded_zones"`

    // Whether the shuffle-sharding subring cache is disabled. This option is set
    // internally and never exposed to the user.
    SubringCacheDisabled bool `yaml:"-"`
}

kv.Config

TODO;

flagext.StringSliceCSV

// StringSliceCSV is a slice of strings that is parsed from a comma-separated string
// It implements flag.Value and yaml Marshalers
type StringSliceCSV []string

InstanceLimits

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
    MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
    MaxInMemoryTenants      int64   `yaml:"max_tenants"`
    MaxInMemorySeries       int64   `yaml:"max_series"`
    MaxInflightPushRequests int64   `yaml:"max_inflight_push_requests"`
}

client.Config

// Config is the configuration struct for the ingester client
type Config struct {
    GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

grpcclient.Config

// Config for a gRPC client.
type Config struct {
    MaxRecvMsgSize  int     `yaml:"max_recv_msg_size"`
    MaxSendMsgSize  int     `yaml:"max_send_msg_size"`
    GRPCCompression string  `yaml:"grpc_compression"`
    RateLimit       float64 `yaml:"rate_limit"`
    RateLimitBurst  int     `yaml:"rate_limit_burst"`

    BackoffOnRatelimits bool           `yaml:"backoff_on_ratelimits"`
    BackoffConfig       backoff.Config `yaml:"backoff_config"`

    TLSEnabled bool             `yaml:"tls_enabled"`
    TLS        tls.ClientConfig `yaml:",inline"`
}



最后修改 2023.06.19: docs: 完善 cortex 文档 (eaf7c84)