惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

F
Full Disclosure
V
Vulnerabilities – Threatpost
Attack and Defense Labs
Attack and Defense Labs
N
News and Events Feed by Topic
SecWiki News
SecWiki News
S
Security @ Cisco Blogs
Schneier on Security
Schneier on Security
B
Blog
TaoSecurity Blog
TaoSecurity Blog
The Last Watchdog
The Last Watchdog
H
Hacker News: Front Page
Hacker News - Newest:
Hacker News - Newest: "LLM"
博客园_首页
D
Docker
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
Y
Y Combinator Blog
W
WeLiveSecurity
N
News and Events Feed by Topic
F
Fortinet All Blogs
PCI Perspectives
PCI Perspectives
WordPress大学
WordPress大学
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
www.infosecurity-magazine.com
www.infosecurity-magazine.com
Recent Announcements
Recent Announcements
Forbes - Security
Forbes - Security
T
Tailwind CSS Blog
Hacker News: Ask HN
Hacker News: Ask HN
爱范儿
爱范儿
腾讯CDC
Last Week in AI
Last Week in AI
月光博客
月光博客
C
Cybersecurity and Infrastructure Security Agency CISA
P
Proofpoint News Feed
Help Net Security
Help Net Security
V
V2EX
C
Cyber Attacks, Cyber Crime and Cyber Security
C
CXSECURITY Database RSS Feed - CXSecurity.com
H
Heimdal Security Blog
L
LINUX DO - 最新话题
GbyAI
GbyAI
The Hacker News
The Hacker News
罗磊的独立博客
S
SegmentFault 最新的问题
H
Hackread – Cybersecurity News, Data Breaches, AI and More
博客园 - 【当耐特】
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
V2EX - 技术
V2EX - 技术
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
O
OpenAI News
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻

I'm OWenT

国产大模型(GLM 5.1、Kimi K2.6)真实场景效果和 Coding Plan 额度测试 新版本libatapp的连接管理——从etcd服务发现到拓扑驱动的自动重连 新版本libatbus的设计变更——从树形路由到拓扑驱动 Protobuf又一坑 - C++标准和ABI兼容性 AI真好用-给Blog主题统一加mermaid,chart.js,excalidraw,draw.io的多种引入方式支持 给内网部署Squid-通用HTTP下载缓存 UE使用CodeChecker和clang-tidy生成静态分析报告 找出UE的循环依赖 C++小协程栈和临时变量及作用域的栈溢出问题分析 游戏服务的可观测性能力建设(C++生态) 协程(libcopp)的Channel功能和CPU命中率优化 通用RPC代码生成器 实现strong_rc_ptr(比shared_ptr更快的引用计数智能指针) 手夯一个STL allocator和对象内存分析组件 std::condition_variable 的信号丢失问题 踩坑一处(GCC)STL std::async 实现BUG导致的crash问题 GCC 14的一个warning to error BUG 给xresloader(Excel导表工具)增强UE读表支持(包含蓝图,Blueprint) Opentelemetry社区在gRPC的几个链接问题(静态库和动态库混用,musl工具链,符号裁剪) Excel转表工具(xresloader)的新验证器(验证外部Excel和文本数据,唯一性和自定义规则) protobuf v22和gRPC v1.55版本升级的依赖变化和upb适配 关于protobuf近期版本(v20/v3.20+)和 gRPC v1.54版本在某些编译环境下的一些链接和编译问题 xresloader-Excel导表工具链的近期变更汇总 打通游戏服务端框架的C++20协程改造的最后一环 Opentelemetry-cpp的Logs模块标准更新(涉及近期版本:1.8-1.9的BREAK CHANGES) 给cmake-toolset和工具链(curl等)加HTTP/2和HTTP/3支持 又开新坑之 coredns 插件: nftables和filter 关于opentelemetry-cpp社区对于C++ Head Only组件单例和符号可见性的讨论小记 填个转表工具 xresloader 去年的坑(数组尾部裁剪) 集成 upb 和 lua binding 的踩坑小记 libcopp对C++20协程的接入和接口设计 再度优化GCC、LLVM、Clang、libc++、libc++abi等套件的构建脚本 游戏服务的分布式事务优化(二)- 事务管理 游戏服务的分布式事务优化(一)- Write Ahead Log(WAL) 模块 记录一些bazel适配用编译选项 测试现代化硬件C++浮点数性能和一致性 适配Boringssl和OpenSSL 3.0 近期cmake-toolset的一些适配问题 C++20 Text Formatting/fmtlib 适配问题小记 再次重构LLVM+Clang+libcxx+libc++abi+其他相关工具的构建流程 重构基于CMake的构建工具链 新版GCC和LLVM+Clang终于Release啦 折腾一下nftables下的双拨 [C++20] Module partitions和符号交叉引用(声明和实现分离) [Rust] 实现一个线程安全且迭代器可以保存的链表 基于protobuf的代码生成 几个使用protobuf中C++接口的Arena的坑 Amazon Aurora DB存储引擎论文阅读小记 近期对libatapp的一些优化调整(增加服务发现和连接管理,支持yaml等) xresloader转表工具链增加了一些新功能(map,oneof支持,输出矩阵,基于模板引擎的加载代码生成等) 在游戏服务器中使用分布式事务 libcopp接入C++20 Coroutine和一些过渡期的设计 libatbus 的大幅优化 nftables初体验 容器配置开发环境小计 PALM Tree - 适合多核并发架构的B+树 - 论文阅读小记 跨平台协程库 - libcopp 简介 C++20 Coroutine 性能测试 (附带和libcopp/libco/libgo/goroutine/linux ucontext对比) 尝鲜Github Action 一些xresloader(转表工具)的改进 protobuf、flatbuffer、msgpack 针对小数据包的简单对比 协程框架(libcopp) 小幅优化 Excel转表工具(xresloader) 增加protobuf插件功能和集成 UnrealEngine 支持 Anna(支持任意扩展和超高性能的KV数据库系统)阅读笔记 C++20 Coroutine libcopp merge boost.context 1.69.0 Google去中心化分布式系统论文三件套(Percolator、Spanner、F1)读后感 Rust玩具-企业微信机器人通用服务 使用ELK辅助监控开发测试环境服务质量和问题定位 2018年的新通用伪随机数算法(xoshiro / xoroshiro)的C++(head only)实现 Webpack+vue+boostrap+ejs构建Web版GM工具 Rust的第二次接触-写个小服务器程序 理解和适配AEAD加密套件 atsf4g-co的进化:协程框架v2、对象路由系统和一些其他细节优化 协程框架(libcopp)v2优化、自适应栈池和同类库的Benchmark对比 可执行文件压缩 初识Rust 使用restructedtext编写xresloader文档 atframework的etcd模块化重构 C++的backtrace ECDH椭圆双曲线(比DH快10倍的密钥交换)算法简介和封装 protobuf-net的动态Message实现 pbc的proto3接入 atgateway内置协议流程优化-加密、算法协商和ECDH 整理一波软件源镜像同步工具+DevOps工具 Blog切换到Hugo libcopp v2的第一波优化完成 libcopp(v2) vs goroutine性能测试 libcopp的线程安全、栈池和merge boost.context 1.64.0 GCC 7和LLVM+Clang+libc++abi 4.0的构建脚本 libatbus的几个藏得很深的bug 用cmake交叉编译到iOS和Android 开源项目得一些小维护 atapp的c binding和c#适配 对象路由系统设计 2016年总结 近期的一个协程流程BUG 重写了llvm+clang+libc++和libc++abi的构建脚本 atsf4g完整游戏工程示例|I'm OWenT atframework基本框架已经完成|I'm OWenT
指标上报的多线程优化和多拉取源点优化
owent · 2025-05-21 · via I'm OWenT

blog-website

前言

我给我们的服务器框架深度集成了一些可观测性的能力。使用 opentelemetry-cpp 作为接入层。 在指标方面,我们允许业务层自由地定制化指标上报和拉取,并以此实现策略控制。上报的时候有Pull模式接口(异步接口),也有Push模式接口(同步接口)。 为了减少 opentelemetry-cpp 内部的视图合并开销,性能最佳,我们尽量使用异步接口。 但是这种情况下由于 opentelemetry-cpp 内部存在后台Processor线程、Exporter线程等,指标的采集往往需要跨线程操作。 这就要求我们上报代码逻辑需要保证线程安全。

而要求所有逻辑代码保证线程安全,一方面对于深层次有复杂关系的数据,代码复杂度比较高,很容易出错;另一方面如果过多无脑加锁也会一种开销和资源浪费。 所以我尝试抽象了一组接口来屏蔽这个细节,让业务层可以无脑接入。

性能和易用性问题

最早我们使用 opentelemetry-cpp 比较粗暴,直接调用同步接口。在上报量稍微大点的时候,因为频繁触发视图的属性比较和Merge计算,导致某些场景的CPU开销能占到 10%。 所以后来进行了一系列优化,第一步骤就是通过一些预统计,减少属性集比较和视图合并。然后采用异步接口上报。

大致的采集流程如下:

flowchart LR
  subgraph PullExporter["Pull模式驱动器"]
    direction LR
        PPull("Prometheus Pull Exporter
            (MetricReader)
            【fa:fa-globe 外部请求触发】")
        PEMR("PeriodicExportingMetricReader
            【fa:fa-clock 定时器触发】")
  end
  subgraph PrometheusExporter["Prometheus和其他"]
    direction LR
        PPush("fa:fa-file-export Prometheus Push Exporter")
        PFile("fa:fa-file-export Prometheus File Exporter")
        OtherPushExporter("fa:fa-file-export 其他Push模式Exporter...")
  end
  subgraph OTLPExporter["OTLP"]
    direction LR
        OTLPGRPC("fa:fa-file-export OTLP gRPC Exporter")
        OTLPHTTP("fa:fa-file-export OTLP HTTP Exporter")
        OTLPFile("fa:fa-file-export OTLP File Exporter")
  end
  subgraph PushExporter["Push模式输出"]
    direction TB
        PrometheusExporter
        OTLPExporter
  end
  subgraph Meters["多个指标"]
    direction LR
        Meter1("Meter 1")
        Meter2("Meter 2")
        Meter3("Meter 3")
        Meter4("Meter ...")
  end
  classDef callback stroke:#f00
  subgraph MeterCallbacks["指标回调(每个指标)"]
    direction TB
        Callback["Callback"]:::callback
        ObservableRegistry["ObservableRegistry"]       
  end
  subgraph SyncMeterAPI["同步指标接口(每个指标)"]
    direction LR
        Counter["fa:fa-paper-plane Counter"]
        Gauge["fa:fa-paper-plane Gauge"]
        Histogram["fa:fa-paper-plane Histogram"]
  end
  subgraph MetricStorageLayer["存储层(每个视图)"]
    direction TB
        SyncMetricStorage["fa:fa-layer-group 同步存储层"]
        AsyncMetricStorage["fa:fa-layer-group 异步存储层"]
  end
  subgraph MeterCollect["采集层"]
        MC("MetricCollector")
        MetricProducer("MetricProducer")
        Meters
        MeterCallbacks
        SyncMeterAPI
        MetricStorageLayer
  end
    PullExporter -- 驱动触发采集 --> MetricProducer
    MetricProducer --> MC
    MC --> Meters
    ObservableRegistry --> Callback
    Meters --> MeterCallbacks
    SyncMeterAPI --> MetricStorageLayer
    MeterCallbacks --> MetricStorageLayer
    MetricStorageLayer -- 通知指标提取 --> Meters
    Meters -- Collect获取结果后 --> PushExporter
    SyncMetricStorage@{ shape: lin-cyl}
    AsyncMetricStorage@{ shape: lin-cyl}

但是异步接口调用有一定复杂度,一个最简单的注册指标的流程如下:

using otel_observer_result_int64 = opentelemetry::metrics::ObserverResultT<int64_t>;
using otel_observer_result_double = opentelemetry::metrics::ObserverResultT<double>;

auto meter = provider->GetMeter("meter name");
auto instrument = meter->CreateInt64ObservableGauge("instrument name", "instrument description", "instrument unit");

instrument->AddCallback([](opentelemetry::metrics::ObserverResult result, void* /*private_data*/) {
    if (opentelemetry::nostd::holds_alternative<opentelemetry::nostd::shared_ptr<otel_observer_result_int64>>(result)) {
        auto type_result = opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<otel_observer_result_int64>>(result);
        if (type_result) {
            type_result->Observe(static_cast<int64_t>(get_result_value()), {} /* attributes */);
        }
    } else if (opentelemetry::nostd::holds_alternative<opentelemetry::nostd::shared_ptr<otel_observer_result_double>>(result)) {
        auto type_result = opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<otel_observer_result_double>>(result);
        type_result->Observe(static_cast<double>(result_value), {} /* attributes */);
    }
} /*, void* private_data*/);

这个接口有几个需要注意的地方:

  • 上报一个指标,有meter和instrument的概念。目前v1版本是不支持删除callback、instrument和meter的。目前即便v2版本也删不干净。所以如果reload,最好是重新创建provider,那么这里这个meter、instrument和callback都要重新创建和注册。

由于最后一个Provider引用释放的时候,otel-cpp会自动调用一次Flush吧所有已经导出的数据强制刷出。这会导致线程Block,所以为了不影响业务主线程。这里还要处理一次Reload的时候另起线程来执行Flush。

  • 指标类型和调用Observe的传入类型要匹配,如果涉及浮点和整数转换的话,还要考虑 epsilon
  • 回调函数的签名是 using ObservableCallbackPtr = void (*)(ObserverResult, void *); 只能透传一个 void* ,如果要包装更复杂的数据透传需要自己封装。
  • 回调执行会跨线程,所以数据上报要保证线程安全。
  • 多源拉取时可能会反复触发回调,所以不能简单地根据回调时间差来计算增量部分,否则可能导致误差。

我们通过抽象了一层来一次性解决这些问题。每次业务层采集从缓存里读数据,并且底层直接处理掉类型转换。比如:

for (auto& record : metrics_item->collected_records) {
    if (!record) {
    continue;
    }
    ++export_record_count;

    if (opentelemetry::nostd::holds_alternative<
            opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<int64_t>>>(result)) {
    auto real_observer = opentelemetry::nostd::get<
        opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<int64_t>>>(result);
    if (real_observer) {
        real_observer->Observe(get_opentelemetry_utility_metrics_record_value_as_int64(record->value),
                                opentelemetry_utility::get_attributes(record->attributes));
    }
    } else if (opentelemetry::nostd::holds_alternative<
                    opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(result)) {
    auto real_observer = opentelemetry::nostd::get<
        opentelemetry::nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(result);
    if (real_observer) {
        real_observer->Observe(get_opentelemetry_utility_metrics_record_value_as_double(record->value),
                                opentelemetry_utility::get_attributes(record->attributes));
    }
    }
}

这里 metrics_item->collected_records 什么时候刷新呢?每次触发采集的时候,我们会检查采集版本号,当超出采集周期的时候会自增需要采集的版本号。

if (now - metrics_item->collected_timepoint > metrics_item->collect_interval) {
  metrics_item->export_version.store(metrics_item->collect_version.load(std::memory_order_acquire),
                                     std::memory_order_release);
  callback_data_set.second.collecting_version.fetch_add(1, std::memory_order_release);
  metrics_item->collected_timepoint = now;
  std::lock_guard<std::recursive_mutex> collected_lock_guard{metrics_item->collected_lock};
  metrics_item->collected_records.clear();
} else if (now < metrics_item->collected_timepoint) {
  metrics_item->collected_timepoint = now;
}

然后外部Tick的时候,发现这个collecting_version大于导出数据版本的时候,才去触发用户层采集接口。 在多源抓取的时候,可能在一个采集周期内多次触发采集回调。这时候就能直接命中缓存,直接返回。

这里有个小细节是因为指标数量可能比较多,如果一帧里全量采集可能会导致卡顿。所以有必要在一次采集消耗过高时分针增量采集。 我们现在是以一个指标为单位,比如有2000个指标,采集200个就话费了比较长时间了,就让出时间片,下一个Tick从第201个开始。 这里也要限制每个Tick的采集量,尽可能公平调度,防止引起其他功能模块饥饿。

由于希望业务层不要关心线程安全的复杂性,所以我们的OTEL层采集在专门的采集线程上,但是业务层的采集总是在主线程执行。中间用无锁队列串起来,大致结构如下:

2503-01.png

自动重注册

在Reload的时候,我们需要支持重新加载所有的配置资源、processor、exporter等组件。但是整个OTEL SDK里是没有移除接口的。整个处理组件和IO组件一旦搭配好就处于immutable状态。 那么为了实现Reload,最简单的方式就是从最上游开始,也就是从Provider层开始整个重建。这里涉及两个问题。

问题一:在创建完新Provider后,销毁前一个Provider时,会触发对正在导出的数据的Flush操作。这有可能是个延迟操作,会阻塞调用方。显然我们不能接受足额阻塞我们业务线程。 所以这里的解决方法就是创建一个专门的销毁线程。

std::shared_ptr<global_service_data_type> current_service_cache = get_global_service_data();
if (!current_service_cache) {
    return;
}

// 我们的系统存在多分组,所以要处理一下默认分组和命名分组
std::shared_ptr<group_type> previous_group = group;
if (group->group_name.empty()) {
    current_service_cache->default_group.swap(previous_group);
} else {
    current_service_cache->named_groups[group->group_name].swap(previous_group);
}

// Shutdown in another thread to avoid blocking
do {
    if (!previous_group || previous_group == group) {
        break;
    }
    // 这里引用provider是为了续上provider的生命周期传给自线程内执行Flush,否则最后一个引用计数结束会立即执行Flush阻塞。
    if (previous_group->logs_handle.provider == group->logs_handle.provider) {
        previous_group->logs_handle.reset();
    }
    if (previous_group->metrics_handle.provider == group->metrics_handle.provider) {
        previous_group->metrics_handle.reset();
    }
    if (previous_group->tracer_handle.provider == group->tracer_handle.provider) {
        previous_group->tracer_handle.reset();
    }
    std::thread cleanup_thread([previous_group, current_service_cache]() {
        // 子线程执行Flush和销毁操作,这里面是执行一些线程安全相关的设置和Flush
        _opentelemetry_cleanup_group(previous_group, current_service_cache);
    });
    cleanup_thread.detach();
} while (false);

static void _opentelemetry_cleanup_group(std::shared_ptr<::mx::telemetry::group_type> group,
                                         const std::shared_ptr<global_service_data_type> &current_service_cache) {
  std::list<std::shared_ptr<std::thread>> shutdown_threads;

  if (current_service_cache && group && group->initialized) {
    atfw::util::lock::read_lock_holder<atfw::util::lock::spin_rw_lock> lock_guard{
        current_service_cache->on_group_destroy_callback_lock};
    for (auto &callback_fn : current_service_cache->on_group_destroy_callbacks) {
      if (callback_fn) {
        callback_fn(group);
      }
    }

    group->initialized = false;
  }

  if (group) {
    // Provider must be destroy before logger
    {
      atfw::util::lock::write_lock_holder<atfw::util::lock::spin_rw_lock> lock_guard{group->logger_lock};
      if (group->logs_handle.provider) {
        if (group->logs_handle.shutdown_callback) {
          auto handle = group->logs_handle;
          // 每个组件分别再在子线程内销毁,减少远端配置错误或者远端延迟导致累计延迟
          shutdown_threads.push_back(
              atfw::memory::stl::make_shared<std::thread>([handle]() { handle.shutdown_callback(handle.provider); }));
        }
        group->logs_handle.reset();
      }
      group->default_logger = opentelemetry::nostd::shared_ptr<opentelemetry::logs::Logger>();
    }

    // ...
}

// 这里是其中一个Shutdown回调的代码
ret.shutdown_callback =
    [processors_ptr](const opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> &provider) {
    if (!provider) {
        return;
    }

    auto trace_provider = opentelemetry::trace::Provider::GetTracerProvider();
    if (trace_provider == provider) {
        opentelemetry::trace::Provider::SetTracerProvider(
            opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
                new opentelemetry::trace::NoopTracerProvider()));
    }

    // 处理优雅退出时无限等待的问题
    if (processors_ptr && !processors_ptr->empty()) {
        std::chrono::microseconds timeout =
            std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds{10});
        auto app = atapp::app::get_last_instance();
        if (nullptr != app) {
        timeout = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::seconds{app->get_origin_configure().timer().stop_timeout().seconds() / 2});
        timeout += std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::nanoseconds{app->get_origin_configure().timer().stop_timeout().nanos() / 2});
        }
        // 某些老版本Provider层调用Shutdown没有透传timeout到processor,会导致Flush时间过长。这里显示调用一次来解决
        for (auto &processor : *processors_ptr) {
            processor->Shutdown(timeout);
        }
        processors_ptr->clear();
    }

    static_cast<opentelemetry::sdk::trace::TracerProvider *>(provider.get())->Shutdown();
    };

问题二:在重新创建后所有的指标监听都需要重建。我们有不希望把这个重复注册的复杂性暴露给用户层,所以底层做了另外一层缓存。在重建Provider之后也自动重新注册。

因为我们之前采集层面已经做了数据层缓存分离了,所以我们这里在那个缓存层之上顺便把注册也缓存了就行了。

SERVER_FRAME_API bool opentelemetry_utility::add_global_metics_observable_int64(
    metrics_observable_type type, opentelemetry::nostd::string_view meter_name, meter_instrument_key metrics_key,
    std::function<void(metrics_observer&)> fn) {
  if (!fn) {
    return false;
  }

  // opentelemetry only use metrics name as key of metric storage
  std::string key = atfw::util::log::format("{}:{}", gsl::string_view{meter_name.data(), meter_name.size()},
                                            gsl::string_view{metrics_key.name.data(), metrics_key.name.size()});

  std::pair<std::recursive_mutex&, opentelemetry_utility_global_metrics_set&> data_set = get_global_metrics_set();
  {
    std::lock_guard<std::recursive_mutex> lock_guard{data_set.first};
    auto iter = data_set.second.int64_observable_by_key.find(key);
    if (data_set.second.int64_observable_by_key.end() != iter && iter->second) {
      iter->second->callback = fn;
      return true;
    }
  }

  auto handle = atfw::memory::stl::make_shared<opentelemetry_utility::metrics_observer>();
  if (!handle) {
    return false;
  }
  handle->key = key;
  handle->type = type;
  handle->meter_name = static_cast<std::string>(meter_name);
  handle->meter_instrument_name = static_cast<std::string>(metrics_key.name);
  handle->meter_instrument_description = static_cast<std::string>(metrics_key.description);
  handle->meter_instrument_unit = static_cast<std::string>(metrics_key.unit);
  handle->collect_interval = protobuf_to_chrono_duration<>(
      logic_config::me()->get_logic().telemetry().opentelemetry().metrics().reader().export_interval());
  if (handle->collect_interval < std::chrono::seconds{2}) {
    handle->collect_interval = std::chrono::seconds{15};
  } else {
    handle->collect_interval -= std::chrono::seconds{1};
  }
  handle->callback = std::move(fn);
  handle->origin_callback = nullptr;
  if (false == data_set.second.initialized.load(std::memory_order_acquire) ||
      internal_add_global_metrics_observable_int64(*handle)) {
    std::lock_guard<std::recursive_mutex> lock_guard{data_set.first};
    data_set.second.int64_observable_by_key[key] = handle;
    data_set.second.int64_observable_by_pointer[reinterpret_cast<void*>(handle.get())] = handle;
    ++data_set.second.collecting_version;
    return true;
  } else {
    return false;
  }
}

这里面 internal_add_global_metrics_observable_int64 是实际执行注册observable。Reload的时候重新调用一次就行了。

上报数据转换

虽然我们的通信接口都是OTEL规范,但是有时候会上传到其他平台。比如目前指标领域的事实标准-Prometheus

Prometheus 其实数据结构是比较简单的,比如指标名只允许 [a-zA-Z_:][a-zA-Z0-9_:]* 这个规范。只有一层标签的概念,标签名必须满足 [a-zA-Z_][a-zA-Z0-9_]* ,并且只有一层。

参考: https://prometheus.io/docs/concepts/data_model/

OTEL就复杂多了,指标的结构如下:

// MetricsData
// └─── ResourceMetrics
//   ├── Resource
//   ├── SchemaURL
//   └── ScopeMetrics
//      ├── Scope
//      ├── SchemaURL
//      └── Metric
//         ├── Name
//         ├── Description
//         ├── Unit
//         └── data
//            ├── Gauge
//            ├── Sum
//            ├── Histogram
//            ├── ExponentialHistogram
//            └── Summary

里面的属性结构更加复杂:

//    Metric
//  +------------+
//  |name        |
//  |description |
//  |unit        |     +------------------------------------+
//  |data        |---> |Gauge, Sum, Histogram, Summary, ... |
//  +------------+     +------------------------------------+
//
//    Data [One of Gauge, Sum, Histogram, Summary, ...]
//  +-----------+
//  |...        |  // Metadata about the Data.
//  |points     |--+
//  +-----------+  |
//                 |      +---------------------------+
//                 |      |DataPoint 1                |
//                 v      |+------+------+   +------+ |
//              +-----+   ||label |label |...|label | |
//              |  1  |-->||value1|value2|...|valueN| |
//              +-----+   |+------+------+   +------+ |
//              |  .  |   |+-----+                    |
//              |  .  |   ||value|                    |
//              |  .  |   |+-----+                    |
//              |  .  |   +---------------------------+
//              |  .  |                   .
//              |  .  |                   .
//              |  .  |                   .
//              |  .  |   +---------------------------+
//              |  .  |   |DataPoint M                |
//              +-----+   |+------+------+   +------+ |
//              |  M  |-->||label |label |...|label | |
//              +-----+   ||value1|value2|...|valueN| |
//                        |+------+------+   +------+ |
//                        |+-----+                    |
//                        ||value|                    |
//                        |+-----+                    |
//                        +---------------------------+

那为什么要这么复杂呢? 我们举例一个场景,比如我们自己接入了指标上报,我们依赖的gRPC或者其他第三方组件也接入了可观测性。 那么怎么保证我们的上报点不冲突?这可以通过不同的Scope来区分。 然后同样在一个进程里,上报的数据会包含比如host信息,进程名字、业务类型等等和进程相关的信息。但是我们有2000个指标,那么用原始的 Prometheus 我们不得不把这些信息重复上报 2000 份。 而在OTEL里,只要在Resource去上报一次就行了。同理还有指标级共享和每个Point不同的数据区,也是能减少不必要的重复上报。

实际上对于进程级资源共享,PrometheusOpenCensus 都有一个方案。定义了一个 target_info 类型的特殊指标来承载这些信息。 同一个连接,target_info 只要上报一次即可。后面上报的指标自动关联这些属性。但是这个并不是所有平台都兼容,将具体是否可用还要咨询使用的平台。 有兴趣可以阅读 https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1

这里涉及数据标签的问题。另外,规范里还定义了一些特殊的单位转换,比如:

// Time
{"d", "days"},
{"h", "hours"},
{"min", "minutes"},
{"s", "seconds"},
{"ms", "milliseconds"},
{"us", "microseconds"},
{"ns", "nanoseconds"},
// Bytes
{"By", "bytes"},
{"KiBy", "kibibytes"},
{"MiBy", "mebibytes"},
{"GiBy", "gibibytes"},
{"TiBy", "tibibytes"},
{"KBy", "kilobytes"},
{"MBy", "megabytes"},
{"GBy", "gigabytes"},
{"TBy", "terabytes"},
{"By", "bytes"},
{"KBy", "kilobytes"},
{"MBy", "megabytes"},
{"GBy", "gigabytes"},
{"TBy", "terabytes"},
// SI
{"m", "meters"},
{"V", "volts"},
{"A", "amperes"},
{"J", "joules"},
{"W", "watts"},
{"g", "grams"},
// Misc
{"Cel", "celsius"},
{"Hz", "hertz"},
{"1", ""},
{"%", "percent"}

比如我们定义的一个OTEL的指标为 {name="abc", description="XXX", unit="%"} ,最后输出的 Prometheus 指标名是 abc_percent 。 这些规则还在演进变化中,另外早期的OTEL-CPP SDK有个BUG,没有设置单位的属性多余输出了下划线。 比如 {name="abc", description="XXX", unit=""} 对应的 Prometheus 指标名应该是 abc, 但是早期版本会使用 abc_

那为了抹平拉取聚合这一侧用户使用的复杂性,我们抽象了 SanitizePrometheusName 接口,保持和OTEL-CPP里一样的转换规则。 而提取指标名则是搞了个“奇技淫巧”,先创建一个虚假的指标。执行一次OTEL里的指标转换,在提取生成的指标名。代码如下:

  opentelemetry::sdk::metrics::ResourceMetrics fake_resource_metrics;
  opentelemetry::sdk::metrics::MetricData fake_metrics_data;
  opentelemetry::sdk::metrics::PointDataAttributes fake_point_data;
  auto fake_scope = opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create("none");
  fake_metrics_data.aggregation_temporality = opentelemetry::sdk::metrics::AggregationTemporality::kCumulative;
  fake_metrics_data.instrument_descriptor.name_ = metrics_name_;
  fake_metrics_data.instrument_descriptor.description_ = metrics_description_;
  fake_metrics_data.instrument_descriptor.unit_ = metrics_unit_;
  fake_metrics_data.instrument_descriptor.value_type_ = opentelemetry::sdk::metrics::InstrumentValueType::kLong;

  // @see OtlpMetricUtils::GetAggregationType in otel-cpp
  switch (metrics_type_) {
    case PROJECT_NAMESPACE_ID::config::EN_HPA_POLICY_METRICS_TYPE_COUNTER: {
      fake_metrics_data.instrument_descriptor.type_ = opentelemetry::sdk::metrics::InstrumentType::kObservableCounter;
      fake_point_data.point_data = opentelemetry::sdk::metrics::SumPointData{};
      break;
    }
    default: {
      fake_metrics_data.instrument_descriptor.type_ = opentelemetry::sdk::metrics::InstrumentType::kObservableGauge;
      fake_point_data.point_data = opentelemetry::sdk::metrics::LastValuePointData{};
      break;
    }
  }
  fake_metrics_data.point_data_attr_.push_back(fake_point_data);
#if OPENTELEMETRY_VERSION_MAJOR * 1000 + OPENTELEMETRY_VERSION_MINOR >= 1012
  fake_resource_metrics.scope_metric_data_.push_back(
      {fake_scope.get(), std::vector<opentelemetry::sdk::metrics::MetricData>{fake_metrics_data}});
#else
  fake_resource_metrics.scope_metric_data_.push_back({fake_scope.get(), {fake_metrics_data}});
#endif
#if OPENTELEMETRY_VERSION_MAJOR * 1000 + OPENTELEMETRY_VERSION_MINOR < 1012
  auto prometheus_family =
      opentelemetry::exporter::metrics::PrometheusExporterUtils::TranslateToPrometheus(fake_resource_metrics);
#else
  auto prometheus_family =
      opentelemetry::exporter::metrics::PrometheusExporterUtils::TranslateToPrometheus(fake_resource_metrics, false);
#endif

最终我们提供给用户层的配置大概是这样。

- metrics_name: "ds_fps_gauge"
  metrics_unit: us
  metrics_description: "DS Runtime Fps Metrics"
  metrics_type: gauge
  aggregation: count
  aggregation_parameter:
  by:
    labels:
    - region_group
    - StandAlone
  simple_function:
  - last_over_time: 1m
  selectors:
  "PlayerInit": "true"
  "service_name": "DsMonitor"
  without_auto_selectors:
  - service_name
  - hpa_target_name
- metrics_name: "product_sales_order"
  metrics_unit: count
  metrics_description: "OrderSvr Product Sales Order Count"
  metrics_type: gauge
  aggregation: sum
  aggregation_parameter:
  by:
    labels:
    - area_code
    - market_id
    - product_id
  simple_function:
  - last_over_time: 1m
  without_auto_selectors:
  - service_name
  - hpa_target_name

Grafana的启发,我们也提供了一些基础的函数包装和聚合转换,并且自动注入一些聚合标签来做多环境隔离,避免用户直接配置PromQL,防止出错的可能性。

基础的部分采样方案

最后,我们提供给使用者一方的的指标接口就相当简单,比如交易行服务的IO毛刺监控:

// 一秒内IO数
rpc::telemetry::opentelemetry_utility::add_global_metics_observable_int64(
    rpc::telemetry::metrics_observable_type::kGauge, "trade_order", {"trade_order_second_max_io_count", "", ""},
    [](rpc::telemetry::opentelemetry_utility::metrics_observer& result) {
    if (global_order_manager::is_instance_destroyed()) {
        return;
    }
    metrics_global_data& metrics_data = global_order_manager::me()->metrics_data_;
    int64_t value = metrics_data.second_max_io_count.load(std::memory_order_acquire);
    metrics_data.second_max_io_count.store(metrics_data.second_current_max_io_count, std::memory_order_release);
    rpc::telemetry::opentelemetry_utility::global_metics_observe_record(result, value);
    });

2503-02.png

再展示个更复杂的动态多层级指标:

// 更新视图索引索引CPU消耗
rpc::telemetry::opentelemetry_utility::add_global_metics_observable_int64(
    rpc::telemetry::metrics_observable_type::kGauge, "trade_order", {"trade_order_slot_distribution", "", ""},
    [](rpc::telemetry::opentelemetry_utility::metrics_observer& result) {
    if (global_order_manager::is_instance_destroyed()) {
        return;
    }

    std::shared_ptr<::rpc::telemetry::group_type> lifetime;

    for (auto& area_data : global_order_manager::me()->metrics_data_.area_metric_data) {
        if (!area_data.second) {
        continue;
        }
        for (auto& order_type_data : area_data.second->order_type_metric) {
        if (!order_type_data.second) {
            continue;
        }

        for (auto& slot_dist : order_type_data.second->slot_distribution) {
            rpc::telemetry::trace_attribute_pair_type internal_attributes[] = {
                {"trade_area_code", area_data.first},
                {"trade_order_type", order_type_data.first},
                {"trade_order_slot_id", slot_dist}};

            rpc::telemetry::opentelemetry_utility::global_metics_observe_record_extend_attrubutes(
                result, 1, lifetime, internal_attributes);
        }
        }
    }
});

2503-03.png

最后

可观测性领域还有很多其他的优化和探索,后面有空慢慢分享吧。 也欢迎有兴趣的小伙伴们互相交流探讨。