Compactor
3 分钟阅读
简要概述
组件 Compactor 用于将上传至对象存储中的多个数据块合并为一个更大的优化块,帮助减少存储成本并提高查询速度。通过去除重复数据和减小索引大小,能够有效地减少存储需求。同时,通过压缩数据块,查询过程中需要查询的数据块数量也减少,从而提高了查询的效率和速度。
此外,该组件还负责更新每个租户的桶索引。桶索引是 Queriers 服务、Store-gateway 和 rulers 等组件用于发现存储中新数据块的重要工具。通过保持桶索引的更新,同时确保其他组件能够及时发现新的数据块,以便进行查询、存储和规则处理等操作。
注意:使用该组件时,对 alertmanager、ruler 与 block 不能使用相同的 s3 bucket 名称,因为需要从 block 中下载数据解析,如果无法解析程序会异常退出。
如何工作
块压缩有两个主要好处:
- 垂直压缩:相同时间范围内由所有 Ingester 组件上传的块被压缩;
- 水平压缩:将具有较小时间范围的块水平压缩为一个较大的块。
垂直压缩
将同一时间范围(默认为2小时)内由 Ingester 组件上传的各个块分别按照租户划分合并为单个块,并且通过去重复样本将原本写入 N 个块(由于复制而产生)。这一步将每个租户在单个2小时时间范围内的块数从 #(Ingester 的数量) 减少到1。
水平压缩
水平压缩在垂直压缩之后触发,将相邻的两个2小时时间范围内的多个块压缩为一个更大的块。尽管此压缩后块块块的总大小不会改变,但它仍然可以显著减少存储网关在内存中保留的索引和索引头的大小。
分片支持
启用分片后,多个实例可以协作,通过租户对块进行分片和分配工作负载。在任何给定时间,一个租户的所有块由单个块压缩器实例处理,但不同租户的压缩可以同时在不同的块压缩器实例上运行。
每当块压缩器池增加或减少时(例如扩容或缩容),租户将在可用的块压缩器实例之间进行重新分片,无需手动干预。
块压缩器的分片功能基于哈希环。在启动时,块压缩器生成随机令牌并将自身注册到环中。运行时,它定期扫描存储桶(每隔 -compactor.compaction-interval)以发现存储中的租户列表,并压缩与其自身在环中分配的令牌范围匹配的每个租户的块。
此功能可以通过设置 -compactor.sharding-enabled=true 启用,并通过 -compactor.ring. 标志(或它们各自的 YAML 配置选项)配置后端哈希环。
数据块删除
当 Compactor 组件成功将几个源数据块压缩成一个大块时,源数据块将从对象存储中删除,这个删除不是立即进行的,按照以下两个步骤:
- 将数据块标记为删除(软删除);
- 当数据块标记删除时间超过
compactor.deletion-delay
参数设置值时,才真正从对象存储中移除(硬删除)。
块压缩器负责标记块进行删除,并在删除延迟到期后进行硬删除。软删除是基于存储桶中块位置内的一个名为 deletion-mark.json 的小文件进行的,查询器和存储网关都会查找该文件。
软删除机制用于给查询器和存储网关足够的时间在旧的源块被删除之前发现新的压缩块。如果块压缩器立即对源块进行硬删除,那么涉及压缩块的一些查询可能会失败,直到查询器和存储网关重新扫描存储桶并找到被删除的源块和新的压缩块。
压缩磁盘评估
当开始压缩时 Compactor 组件需要从对象存储中下载源数据至本地,同时临时在本地生成单一的大块文件,磁盘空间依赖最大的一个租户与参数 compactor.block-ranges
决定。
语法:
min_disk_space_required = compactor.compaction-concurrency * max_compaction_range_blocks_size * 2
大约 1千万 的活跃序列需要 150GB 的存储空间,那么假设现在最大租户活跃序列 3千万 同时仅允许一个线程 compactor.compaction-concurrency=1
进行操作,同时 compactor.block-ranges=24h
则所需空间为:
450GB = 1 * 24 * 2
配置示例
compactor:
block_ranges:
- 2h0m0s
- 12h0m0s
- 24h0m0s
block_sync_concurrency: 20
meta_sync_concurrency: 20
consistency_delay: 0s
data_dir: /data/cortex/compactor
compaction_interval: 1h0m0s
compaction_retries: 3
compaction_concurrency: 1
cleanup_interval: 15m0s
cleanup_concurrency: 20
deletion_delay: 12h0m0s
tenant_cleanup_delay: 6h0m0s
skip_blocks_with_out_of_order_chunks_enabled: false
block_files_concurrency: 10
blocks_fetch_concurrency: 3
block_deletion_marks_migration_enabled: false
sharding_enabled: true
sharding_strategy: default
sharding_ring:
kvstore:
store: "etcd"
prefix: "/cortex/collectors/"
etcd:
endpoints: ["192.168.31.201:2379","192.168.31.202:2379","192.168.31.203:2379"]
dial_timeout: 10s
max_retries: 10
tls_enabled: true
tls_cert_path: "/opt/cortex/pki/server.crt"
tls_key_path: "/opt/cortex/pki/server.key"
tls_ca_path: "/opt/cortex/pki/ca.crt"
tls_insecure_skip_verify: true
heartbeat_period: 5s
heartbeat_timeout: 1m0s
instance_id: node1
instance_port: 9005
instance_addr: "192.168.31.201"
数据结构
Config
// Config holds the Compactor config.
type Config struct {
BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
DataDir string `yaml:"data_dir"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
CompactionRetries int `yaml:"compaction_retries"`
CompactionConcurrency int `yaml:"compaction_concurrency"`
CleanupInterval time.Duration `yaml:"cleanup_interval"`
CleanupConcurrency int `yaml:"cleanup_concurrency"`
DeletionDelay time.Duration `yaml:"deletion_delay"`
TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"`
SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
BlockFilesConcurrency int `yaml:"block_files_concurrency"`
BlocksFetchConcurrency int `yaml:"blocks_fetch_concurrency"`
// Whether the migration of block deletion marks to the global markers location is enabled.
BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`
// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
// it in tests.
retryMinBackoff time.Duration `yaml:"-"`
retryMaxBackoff time.Duration `yaml:"-"`
// Allow downstream projects to customise the blocks compactor.
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
// Block visit marker file config
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
}
cortex_tsdb.DurationList
github.com/cortexproject/cortex/pkg/storage/tsdb
// DurationList is the block ranges for a tsdb
type DurationList []time.Duration
RingConfig
// RingConfig masks the ring lifecycler config which contains
// many options not really required by the compactors ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
// Wait ring stability.
WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
TokensFilePath string `yaml:"tokens_file_path"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`
// Injected internally
ListenPort int `yaml:"-"`
WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`
ObservePeriod time.Duration `yaml:"-"`
}
kv.Config
github.com/cortexproject/cortex/pkg/ring/kv