惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

C
Comments on: Blog
S
Schneier on Security
Microsoft Azure Blog
Microsoft Azure Blog
T
Tor Project blog
V
Visual Studio Blog
C
CXSECURITY Database RSS Feed - CXSecurity.com
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
Spread Privacy
Spread Privacy
月光博客
月光博客
罗磊的独立博客
Cisco Talos Blog
Cisco Talos Blog
P
Privacy International News Feed
T
Tenable Blog
阮一峰的网络日志
阮一峰的网络日志
AWS News Blog
AWS News Blog
T
ThreatConnect
博客园 - 三生石上(FineUI控件)
Recorded Future
Recorded Future
Hugging Face - Blog
Hugging Face - Blog
T
Tailwind CSS Blog
博客园 - 叶小钗
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
A
Arctic Wolf
L
LINUX DO - 最新话题
美团技术团队
大猫的无限游戏
大猫的无限游戏
I
Intezer
博客园 - 司徒正美
酷 壳 – CoolShell
酷 壳 – CoolShell
量子位
小众软件
小众软件
T
Threatpost
V
V2EX
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
宝玉的分享
宝玉的分享
The Register - Security
The Register - Security
Project Zero
Project Zero
J
Java Code Geeks
Cyberwarzone
Cyberwarzone
IT之家
IT之家
MyScale Blog
MyScale Blog
T
Threat Research - Cisco Blogs
T
The Blog of Author Tim Ferriss
腾讯CDC
S
SegmentFault 最新的问题
F
Fox-IT International blog
S
Security Archives - TechRepublic
Last Week in AI
Last Week in AI
G
GRAHAM CLULEY
M
MIT News - Artificial intelligence

郑文峰的博客

使用dify对接飞书多维表格 使用n8n对接飞书多维表格 服务启动时出现 OOM Bug 通缉令 一次服务升级时pg表DDL执行超时失败 Go语言高效IO缓冲技术详解 Go语言延迟初始化(Lazy Initialization)最佳实践 Go语言字符串拼接性能对比与优化指南 Go语言结构体内存对齐完全指南 Go语言空结构体:零内存消耗的高效编程 Go语言堆栈分配与逃逸分析深度解析 Go语言原子操作完全指南 Go语言内存预分配完全指南 Go语言不可变数据共享:无锁并发编程实践 Go语言零拷贝技术完全指南 Go语言遍历性能深度解析:从原理到优化实践 Go语言Interface Boxing原理与性能优化指南 Go协程池深度解析:原理、实现与最佳实践 使用etcd分布式锁导致的协程泄露与死锁问题 基于pre-commit的Python代码规范落地实践 初识 MCP Server pulsar阻塞导致logstash无法接入日志 django-prometheus使用及源码分析 kube-proxy源码分析 kubernetes service如何通过iptables转发 django-apschedule定时任务异常停止 理解calico容器网络通信方案原理 理解flannel的三种容器网络方案原理 理解Linux IPIP隧道 理解VXLAN网络 理解Linux TunTap设备 快速了解iptables kafka中listener和advertised.listeners的作用 django rest_framework 分页 django后端服务、logstash和flink接入VictoriaMetrics指标监控 python中import原理 docker容器单机网络 手动实现docker容器bridge网络模型 mysql之MVCC原理 mysql之日志 使用java开发logstash的filter插件 使用python实现单例模式的三种方式 redis之缓存 redis之分片集群 redis之哨兵机制 redis之主从库同步 redis之持久化 redis之五种基本数据类型 go中如何处理error pod中将代码与运行环境分离 友链 ddt源码分析 python装饰器的使用方法 读书笔记:如何阅读一本书 使用ddt实现unittest的参数化测试 分布式锁 使用kubeadm安装k8s 优化gin表单的错误提示信息 gin中validator模块的源码分析 go简单使用grpc python简单使用grpc k8s之PV、PVC和StorageClass k8s之StatefulSet k8s之DaemonSet k8s之Job和CronJob k8s之ConfigMap和Secret k8s之Service k8s之Pod k8s之Deployment 容器的本质 docker容器 python迭代器与生成器 python元编程 python垃圾回收机制 python上下文管理器 django rest_framework使用jwt django rest_framework异常处理 django rest_framework 自定义文档 django压缩文件下载 django rest_framework使用pytest单元测试 django restframework choice 自定义输出数据 django Filtering 使用 django viewset 和 Router 配合使用时报的错 django model的序列化 django中使用AbStractUser django.core.exceptions.ImproperlyConfigured Application labels aren't unique, duplicates users django 中 media配置 django 外键引用自身和on_delete参数 django 警告 while time zone support is active Flask使用flask_socketio实现websocket flask结合mongo tornado 文件上传 tornado 使用jwt完成用户异步认证 tornado 用户密码 bcrypt加密 tornado 结合wtforms使用表单操作 tornado finish和write区别 tornado 使用peewee-async 完成异步orm数据库操作 pyspark streaming简介 和 消费 kafka示例 使用hue创建ozzie的pyspark action workflow count的性能优化
tcp缓存引起的日志丢失
2023-11-09 · via 郑文峰的博客

# 背景

logstash从数据源拉取日志,然后通过tcp插件发送到proxy进程中。在业务侧发现日志量明显少了,所以有了这一次的问题排查。

# 问题排查定位

首先从logstash侧开始检查。我们先看logstash的日志,没有明显的报错信息。

然后再查看logstash管道的状态。可以很明显的看到,在output管道中,in远远大于out,也就是logstash拉取的日志已经到了output管道,但是无法输出出去,并且duration_in_millis时间很长,这个代表着发出去的速率很慢,这是什么原因呢?

curl -XGET 'localhost:9600/_node/stats/pipelines/azure_event_hubs?pretty'

{
    ...
"outputs" : [ {
        "id" : "99b12e190d297be5d6113d04cf10089a3dccbaef7eed0cc41515e8e5af5f4595",
        "name" : "tcp",
        "events" : {
        "in" : 341,
        "out" : 69,
        "duration_in_millis" : 519709
        }
    } 
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

要么是发送方的原因,要么是接收方的原因。我先从发送方进行排查,我在output管道中,除了tcp插件外,还添加了stdout插件,也就是日志来了除了会通过tcp发送外,还会打印在标准输出中。

output {

    tcp {
        ...
    }

    stdout {}

}

1
2
3
4
5
6
7
8
9
10

然后等待一段时间,然后再查看该管道的信息,stdout插件的in和out完全相等,但tcp插件in和out还是相差甚大,也就是output管道应该没问题。

我再假设proxy端有问题。日志是可以从logstash端发送到proxy端的,只是很慢,并且还有其他数据源也在往proxy端发送日志,也没有这个问题,所以我突然想到,该数据源的日志很大,会不会是这个原因导致的呢?

我从上面标准输出中抓了一条日志出来,134k大小,然后我手动的用nc命令将日志发送到proxy,因为日志很大,我是将日志写入到文件,然后再用管道的方式发送的

cat test.txt | nc 

1

通过查看proxy的日志发现,其根本没有收到该条日志。那么问题原因找到了,就是因为日志太大,导致日志发生了丢失。

# 代码排查

proxy服务的是golang写的,通过查看代码,这里使用了bufio.NewScanner来循环读取连接中的数据。

	scanner := bufio.NewScanner(conn)

	for scanner.Scan() {
		// 处理数据
		msg := scanner.Text()
        ...

1
2
3
4
5
6

查看NewScanner方法可以看到有一个maxTokenSize参数,然后用的默认值MaxScanTokenSize

func NewScanner(r io.Reader) *Scanner {
	return &Scanner{
		r:            r,
		split:        ScanLines,
		maxTokenSize: MaxScanTokenSize,
	}
}

1
2
3
4
5
6
7

再跳转,有一个初始化缓存大小startBufSize为4k和最大的缓存大小MaxScanTokenSize为64k。但是我们的日志大小为134k,已经大于最大大小了,所以无法接收到该日志,也就是因为这个原因导致了日志发生了丢失。

const (
	MaxScanTokenSize = 64 * 1024

	startBufSize = 4096
)

1
2
3
4
5

我们再看下Scan方法,有一段代码如下,如果拿到的数据的大小大于maxTokenSize,则会使用s.setErr(ErrTooLong)记录错误,然后返回false


func (s *Scanner) Scan() bool {

    ..
    const maxInt = int(^uint(0) >> 1)
    if len(s.buf) >= s.maxTokenSize || len(s.buf) > maxInt/2 {
        s.setErr(ErrTooLong)
        return false
    }
    newSize := len(s.buf) * 2
    if newSize == 0 {
        newSize = startBufSize
    }
    if newSize > s.maxTokenSize {
        newSize = s.maxTokenSize
    }
    newBuf := make([]byte, newSize)
    copy(newBuf, s.buf[s.start:s.end])
    ...

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

但是我们在业务代码中,并没有判断该错误,也就是如果Scan方法虽然返回了false,循环结束了,但是并没有任何错误信息。也就是无法发现该问题。

# 解决方法

  1. 将TCP的最大缓存大小修改为配置文件可配置的,这样如果日志很大,可以修改配置增大缓存上限。库中有提供Buffer方法来设置该上限。

  2. 在Scan发生错误时,打印错误日志,代码如下:


scanner := bufio.NewScanner(conn)

for scanner.Scan() {
    // 处理数据
    msg := scanner.Text()
    ...

if err := scanner.Err(); err != nil {
    log.Errorf("扫描输入时发生错误:%s", err)
}

1
2
3
4
5
6
7
8
9
10
11

# 总结

  1. 要提高自己的排查的手段,熟悉组件提供的排查机制,让你事半功倍。
  2. 每一个提供的参数都至关重要,所以我们都需要有一定的理解,可以减少BUG的发生