惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

酷 壳 – CoolShell
酷 壳 – CoolShell
H
Hacker News: Front Page
P
Palo Alto Networks Blog
T
ThreatConnect
Apple Machine Learning Research
Apple Machine Learning Research
博客园_首页
T
True Tiger Recordings
P
Privacy & Cybersecurity Law Blog
B
Blog
IT之家
IT之家
Last Week in AI
Last Week in AI
F
Full Disclosure
Hacker News: Ask HN
Hacker News: Ask HN
C
Comments on: Blog
Microsoft Azure Blog
Microsoft Azure Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Microsoft Security Blog
Microsoft Security Blog
博客园 - 【当耐特】
N
News and Events Feed by Topic
NISL@THU
NISL@THU
腾讯CDC
雷峰网
雷峰网
Security Latest
Security Latest
李成银的技术随笔
M
Microsoft Research Blog - Microsoft Research
L
LangChain Blog
L
Lohrmann on Cybersecurity
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Y
Y Combinator Blog
Recent Announcements
Recent Announcements
博客园 - Franky
N
News | PayPal Newsroom
V
V2EX
A
About on SuperTechFans
The Register - Security
The Register - Security
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google Online Security Blog
Google Online Security Blog
MyScale Blog
MyScale Blog
Cisco Talos Blog
Cisco Talos Blog
Vercel News
Vercel News
WordPress大学
WordPress大学
C
Cyber Attacks, Cyber Crime and Cyber Security
The Hacker News
The Hacker News
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
爱范儿
爱范儿
A
Arctic Wolf
L
LINUX DO - 最新话题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More

博客园 - 时空穿越者

java并发:深入解析 ThreadPoolExecutor.addWorker() 流水线技术解析:处理器重排序的硬件基础 java并发:synchronized 揭秘 java并发:再次认识一下Java中的锁 —— 类级别的锁是否存在? LangGraph:add_conditional_edges详解 Spring异步机制:@Async Spring BeanDefinition Spring Resource Spring之ApplicationContext Spring之BeanFactory:解析getBean()方法 Spring之IoC容器 Spring的整体架构 Spring Data JPA:解析CriteriaQuery Spring Data JPA:解析CriteriaBuilder Spring Data JPA:解析JpaSpecificationExecutor & Specification Spring Data JPA:解析SimpleJpaRepository java并发:线程池之Executors(ScheduledExecutorService篇) - 时空穿越者 java并发:线程池之饱和策略 java并发:线程池之ThreadPoolExecutor
java并发:管道流(Piped Streams)的应用场景
时空穿越者 · 2026-02-08 · via 博客园 - 时空穿越者

管道流(Piped Streams)是Java中用于线程间通信的特殊I/O流,它们通过创建"生产-消费"通道实现数据传输。

核心组件

image

连接方式

方式1:通过构造函数连接(推荐)

// 创建时直接连接
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos); // 建立连接

// 或者反向连接
PipedInputStream pis2 = new PipedInputStream();
PipedOutputStream pos2 = new PipedOutputStream(pis2);

方式2:通过 connect() 方法连接

PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();

// 显式建立连接(必须在读写前调用)
reader.connect(writer); 
// 或 writer.connect(reader);

// 现在可以安全使用
writer.write("Data");
char[] buf = new char[4];
reader.read(buf); // 读取 "Data"

连接机制底层原理

管道流的连接在JVM层创建了一个循环缓冲区

graph LR
    Producer[生产者线程] -->|写入数据| Buffer[循环缓冲区]
    Buffer -->|读取数据| Consumer[消费者线程]

典型应用场景

(1)线程间数据流传递

当两个线程需要顺序处理数据流时,管道流可替代共享变量:

public class PipeConnectionDemo {
    public static void main(String[] args) throws IOException {
        // 创建未连接的流
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        
        // 显式建立连接(关键步骤)
        writer.connect(reader); 
        
        Thread producer = new Thread(() -> {
            try {
                writer.write("Hello Pipes!");
                writer.close(); // 关闭流通知结束
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        Thread consumer = new Thread(() -> {
            try {
                int charRead;
                while ((charRead = reader.read()) != -1) {
                    System.out.print((char) charRead);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        producer.start();
        consumer.start();
    }
}

特点:

避免显式同步,通过流阻塞机制自然协调生产消费节奏

(2)自定义过滤器链

public class FilterChain {
    public static void main(String[] args) throws IOException {
        PipedWriter source = new PipedWriter();
        PipedReader sink = new PipedReader();
        
        // 创建处理链:A → B → C
        Filter filterA = new Filter(source, new PipedWriter());
        Filter filterB = new Filter(filterA.getOutputReader(), new PipedWriter());
        Filter filterC = new Filter(filterB.getOutputReader(), sink);
        
        // 启动过滤器线程
        filterA.start();
        filterB.start();
        filterC.start();
        
        // 写入原始数据
        source.write("Original data");
        source.close();
        
        // 从sink读取最终结果
        System.out.println(sink.read());
    }
}

class Filter extends Thread {
    private final PipedReader in;
    private final PipedWriter out;
    
    public Filter(PipedReader in, PipedWriter out) {
        this.in = in;
        this.out = out;
    }
    
    @Override
    public void run() {
        try {
            int c;
            while ((c = in.read()) != -1) {
                out.write(transform((char) c)); // 自定义转换逻辑
            }
            out.close();
        } catch (IOException e) { e.printStackTrace(); }
    }
    
    private char transform(char c) { ... } // 过滤逻辑
}

多管道连接模式

一对多广播

// 主生产者
PipedOutputStream mainOutput = new PipedOutputStream();

// 创建多个消费者管道
PipedInputStream consumer1 = new PipedInputStream(mainOutput);
PipedInputStream consumer2 = new PipedInputStream(mainOutput);

// 生产者写入的数据会被所有消费者读取
mainOutput.write("Broadcast".getBytes());

备注:实际应用中需考虑线程安全,建议配合BroadcastStream自定义实现

注意事项

必须显式连接:通过构造函数或connect()方法建立管道

缓冲区大小:默认1024字节,可通过PipedInputStream(int)构造函数调整

资源管理:使用try-with-resources确保关闭

try (PipedOutputStream pos = new PipedOutputStream();
     PipedInputStream pis = new PipedInputStream(pos)) {
    // 自动关闭双向流
}

总结

Java管道流的显式连接机制(构造函数或connect())是线程间流式通信的基础。

在多线程场景中,务必在启动线程前完成连接,并通过资源管理确保可靠关闭。