Building Fail-Safes for Incomplete LLM Responses in Laravel Echo
Dewald Hugo · 2026-05-15 · via DEV Community

Broadcasting LLM token streams through Laravel Echo feels elegant right up until the moment it silently dies halfway through a response. No error. No terminal event. Just a client sitting there, waiting for tokens that will never arrive.

We hit this in production during a multi-user document generation feature. The Pusher connection degraded during a particularly long Anthropic response. The queue job had no idea the client had disconnected, and the user stared at a spinner for three minutes before refreshing. The partial response was gone. No retry surface. No recovery path. The incident report had three action items and none of them were obvious beforehand.

That experience shaped the Laravel LLM streaming fail-safe architecture this article covers. Every pattern below is oriented toward the specific failure modes that broadcasting-based LLM streams introduce, and in several cases those failure modes are different from what you encounter with SSE.

One framing note before we start. Laravel Echo is a client-side JavaScript library for subscribing to broadcast channels. It is not an SSE wrapper. The architecture here is: a queued job calls the LLM API, iterates the stream, and broadcasts each token as a private channel event. Echo subscribes on the client. This pattern earns its weight when you need multiple subscribers on the same stream (team collaboration, agent monitoring, admin oversight), or when you are already running Reverb or Pusher in your stack. The tradeoffs between Livewire, SSE, and WebSockets for AI streaming are worth understanding before committing to this path. If you only have one client per stream, SSE is simpler and has materially lower infrastructure overhead.

Why Incomplete Streams Are More Dangerous in Echo

SSE gives you a persistent, unidirectional HTTP connection with browser-native reconnect semantics. When the connection drops, the browser reconnects and sends Last-Event-ID. You have backpressure. You know whether the connection is alive.

Echo over WebSockets gives you none of that for free. The WebSocket channel stays open even when the underlying LLM stream on the server has failed. The client shows “connected” while the queue job is dead or retrying. Pusher and Reverb will accept broadcast events and deliver them on arrival, but they will not tell your client that no more events are coming.

That gap is exactly where incomplete responses live. Four categories to prepare for.

Token limit truncation. The LLM hits max_tokens and stops mid-sentence. Anthropic reports this as a stop_reason of max_tokens rather than end_turn, and the events in this article carry that value to the client as finish_reason. If your client only listens for a generic “done” signal without inspecting the finish reason, a truncated response renders as complete. Users won’t know. You won’t know until they complain.

Queue job failure. The job throws after token 47. Laravel retries it. The client has already rendered tokens 1 through 47 and will now receive them again from the retry run, duplicated, in the same Echo channel.

Silent connection drop. The WebSocket drops between the server and Pusher, or between Pusher and the client. Events broadcast during the gap are gone. The job may complete successfully on the server and broadcast a terminal event that the client never receives.

Orphaned streams. The queue worker is killed mid-flight by an OOM event or a forced deployment restart. The stream record in your database stays in streaming status indefinitely, because nothing transitions it out.

Each one needs a different fix. Build the layers in order.

The Architecture Baseline

The core pattern is a queued job that streams from the LLM API and broadcasts each token as a private channel event scoped to a unique stream_id. Every event carries a monotonically increasing sequence number. That sequence number is the foundation of every fail-safe that follows.
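To make concrete what the sequence number buys you on the client, here is a minimal sketch that classifies each incoming sequence against the highest one already seen. The helper names `classifySequence` and `tallySequences` are hypothetical and not part of the article's client. The sketch assumes a retried job restarts its numbering at 1, as the job in this article does.

```javascript
// Hypothetical helper: compare an incoming sequence number with the
// highest sequence already seen on this stream.
function classifySequence(lastSeq, sequence) {
    if (sequence <= lastSeq) return 'duplicate'; // e.g. replayed by a retried job
    if (sequence === lastSeq + 1) return 'in_order';
    return 'gap'; // at least one event never arrived
}

// Fold a list of received sequence numbers into counts per classification.
function tallySequences(sequences) {
    const tally = { in_order: 0, duplicate: 0, gap: 0 };
    let lastSeq = 0;
    for (const seq of sequences) {
        tally[classifySequence(lastSeq, seq)]++;
        lastSeq = Math.max(lastSeq, seq);
    }
    return tally;
}
```

Without a sequence number, none of these three cases can be told apart on the client; with it, each is a single comparison.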

Start with the broadcast event. The Laravel Broadcasting documentation covers private channel authorization in full, but the event structure below is what drives the client-side recovery logic.

<?php

namespace App\Events;

use Illuminate\Broadcasting\PrivateChannel;
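use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;

// ShouldBroadcastNow sends the event immediately instead of pushing it onto the
// queue, where token events could wait behind the streaming job itself.
class LlmTokenReceived implements ShouldBroadcastNow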
{
    public function __construct(
        public readonly string  $streamId,
        public readonly string  $token,
        public readonly int     $sequence,
        public readonly string  $status,       // 'streaming' | 'complete' | 'truncated' | 'error' | 'dead'
        public readonly ?string $finishReason = null,
    ) {}

    public function broadcastOn(): PrivateChannel
    {
        return new PrivateChannel("stream.{$this->streamId}");
    }

    public function broadcastAs(): string
    {
        return 'token.received';
    }

    public function broadcastWith(): array
    {
        return [
            'stream_id'     => $this->streamId,
            'token'         => $this->token,
            'sequence'      => $this->sequence,
            'status'        => $this->status,
            'finish_reason' => $this->finishReason,
        ];
    }
}


The status field carries the terminal state explicitly. complete means the LLM finished naturally (end_turn). truncated means it stopped for any other reason, which in practice is almost always the token limit. error and dead mean infrastructure or retry failure. The client does not guess. The server tells it.
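One way a client might consume that explicit status is a small lookup, sketched below. The names `describeStatus`, `TERMINAL_STATUSES`, and `RETRYABLE_STATUSES` are hypothetical. The retryable set is an assumption that mirrors the three statuses the retry controller later in this article accepts.

```javascript
// Statuses that end a stream, taken from the event's status field.
const TERMINAL_STATUSES = new Set(['complete', 'truncated', 'error', 'dead']);

// Assumption: these are the statuses for which a retry button makes sense.
const RETRYABLE_STATUSES = new Set(['truncated', 'error', 'dead']);

// Hypothetical helper: turn a status string into UI-relevant flags.
function describeStatus(status) {
    return {
        terminal: TERMINAL_STATUSES.has(status),
        retryable: RETRYABLE_STATUSES.has(status),
    };
}
```

An unknown status comes back as neither terminal nor retryable, so a new server-side status cannot silently end a stream on older clients.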

The database layer makes recovery possible at all:

<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
    public function up(): void
    {
        Schema::create('llm_streams', function (Blueprint $table) {
            $table->id();
            $table->string('stream_id')->unique();
            $table->foreignId('user_id')->constrained()->cascadeOnDelete();
            $table->text('prompt');
            $table->enum('status', ['pending', 'streaming', 'complete', 'truncated', 'error', 'dead'])
                  ->default('pending');
            $table->longText('partial_content')->nullable();
            $table->longText('final_content')->nullable();
            $table->unsignedInteger('last_sequence')->default(0);
            $table->string('finish_reason')->nullable();
            $table->text('error_message')->nullable();
            $table->timestamp('started_at')->nullable();
            $table->timestamp('last_checkpoint_at')->nullable();
            $table->timestamp('completed_at')->nullable();
            $table->timestamp('failed_at')->nullable();
            $table->timestamps();

            $table->index(['status', 'last_checkpoint_at']); // orphan detection
            $table->index(['user_id', 'status']);
        });
    }
};


Server-Side Fail-Safes

The queue job carries most of the server-side protection. Three responsibilities: stream tokens and broadcast them with sequence numbers, write periodic checkpoints so recovery is possible, and guarantee a terminal broadcast event regardless of how the job ends.

The Anthropic Messages Streaming documentation defines the event types used in the iterator below. The message_stop event is the authoritative signal that the stream has finished. The stop_reason itself arrives just before it, on the message_delta event, and is what you map to your own status enum.

<?php

namespace App\Jobs;

use App\Events\LlmTokenReceived;
use App\Models\LlmStream;
use Anthropic\Laravel\Facades\Anthropic;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class StreamLlmResponseJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $timeout = 120;
    public int $tries   = 3;
    public int $backoff = 10;

    public function __construct(
        private readonly string $streamId,
        private readonly string $prompt,
        private readonly int    $userId,
    ) {}

    public function handle(): void
    {
        $sequence  = 0;
        $buffer    = '';
        $terminated = false;

        LlmStream::where('stream_id', $this->streamId)
            ->update(['status' => 'streaming', 'started_at' => now()]);

        try {
            $stream = Anthropic::messages()->stream([
                'model'      => 'claude-sonnet-4-5',
                'max_tokens' => 4096,
                'messages'   => [
                    ['role' => 'user', 'content' => $this->prompt],
                ],
            ]);

            foreach ($stream as $event) {
                if ($event->type === 'content_block_delta' && isset($event->delta->text)) {
                    $token   = $event->delta->text;
                    $buffer .= $token;
                    $sequence++;

                    broadcast(new LlmTokenReceived(
                        streamId: $this->streamId,
                        token:    $token,
                        sequence: $sequence,
                        status:   'streaming',
                    ));

                    if ($sequence % 50 === 0) {
                        $this->checkpoint($buffer, $sequence);
                    }
                }

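                // Anthropic delivers stop_reason on message_delta; message_stop carries no payload.
                if ($event->type === 'message_delta' && isset($event->delta->stop_reason)) {
                    $finishReason = $event->delta->stop_reason;
                }

                if ($event->type === 'message_stop') {
                    $finishReason ??= 'unknown';
                    $status       = $finishReason === 'end_turn' ? 'complete' : 'truncated';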
                    $terminated   = true;
                    $sequence++;

                    broadcast(new LlmTokenReceived(
                        streamId:     $this->streamId,
                        token:        '',
                        sequence:     $sequence,
                        status:       $status,
                        finishReason: $finishReason,
                    ));

                    LlmStream::where('stream_id', $this->streamId)->update([
                        'status'        => $status,
                        'finish_reason' => $finishReason,
                        'final_content' => $buffer,
                        'last_sequence' => $sequence,
                        'completed_at'  => now(),
                    ]);
                }
            }
        } catch (Throwable $e) {
            if (! $terminated) {
                $sequence++;

                broadcast(new LlmTokenReceived(
                    streamId:     $this->streamId,
                    token:        '',
                    sequence:     $sequence,
                    status:       'error',
                    finishReason: 'exception',
                ));

                LlmStream::where('stream_id', $this->streamId)->update([
                    'status'          => 'error',
                    'error_message'   => $e->getMessage(),
                    'last_sequence'   => $sequence,
                    'partial_content' => $buffer,
                    'failed_at'       => now(),
                ]);
            }

            throw $e; // preserve queue retry behaviour
        }
    }

    private function checkpoint(string $buffer, int $sequence): void
    {
        LlmStream::where('stream_id', $this->streamId)->update([
            'partial_content'    => $buffer,
            'last_sequence'      => $sequence,
            'last_checkpoint_at' => now(),
        ]);
    }

    public function failed(Throwable $exception): void
    {
        LlmStream::where('stream_id', $this->streamId)->update([
            'status'        => 'dead',
            'error_message' => $exception->getMessage(),
            'failed_at'     => now(),
        ]);

        // Last-resort broadcast after all retries are exhausted.
        // PHP_INT_MAX sequence guarantees the client accepts this regardless
        // of how many gaps have accumulated.
        broadcast(new LlmTokenReceived(
            streamId:     $this->streamId,
            token:        '',
            sequence:     PHP_INT_MAX,
            status:       'dead',
            finishReason: 'retries_exhausted',
        ));
    }
}


Three decisions in this code are worth calling out explicitly.

The $terminated flag prevents double-broadcasting. If message_stop fires and then the foreach cleanup throws, you do not want a second terminal event with status: 'error' landing after you have already broadcast status: 'complete'. Clients will not handle that gracefully.

The throw $e at the end of the catch block is intentional. Removing it tells Laravel the job succeeded, killing retry behaviour. The partial broadcast for error is informational to the client. The throw is what keeps the queue system honest.

The failed() hook runs after all retry attempts are exhausted and is your last-resort client notification. This method is called by Laravel’s Queue Worker, not by your own code.

[Edge Case Alert] If you are running Laravel Reverb rather than Pusher, broadcasts inside failed() may not deliver if Reverb is also under load or restarting at the same time as your queue worker. failed() is best-effort in that scenario. The orphan detection command below is your backstop.

Client-Side Fail-Safes

Most teams spend three lines on the Echo subscription and zero lines on what happens when it goes quiet. The watchdog and sequence tracking below are the part that actually makes this production-safe.

class LlmStreamClient {
    constructor(streamId, options = {}) {
        this.streamId  = streamId;
        this.tokens    = [];
        this.lastSeq   = 0;
        this.gaps      = [];
        this.watchdog  = null;
        this.silenceMs = options.silenceMs  ?? 10000;
        this.onToken   = options.onToken    ?? (() => {});
        this.onComplete = options.onComplete ?? (() => {});
        this.onError   = options.onError    ?? (() => {});
        this.channel   = null;
    }

    subscribe() {
        this.channel = window.Echo
            .private(`stream.${this.streamId}`)
            .listen('.token.received', (e) => {
                this.resetWatchdog();
                this.handleEvent(e);
            });

        this.startWatchdog();
        return this;
    }

    handleEvent({ sequence, token, status, finish_reason: finishReason }) {
        if (sequence > this.lastSeq + 1) {
            this.gaps.push({ expected: this.lastSeq + 1, received: sequence });
        }
        this.lastSeq = Math.max(this.lastSeq, sequence);

        if (status === 'streaming') {
            this.tokens.push(token);
            this.onToken(token, this.tokens.join(''));
            return;
        }

        this.clearWatchdog();
        const partial = this.tokens.join('');

        if (status === 'complete') {
            this.onComplete({ content: partial, gaps: this.gaps, finishReason });
        } else {
            this.onError({ type: status, partial, gaps: this.gaps, finishReason });
        }

        this.teardown();
    }

    startWatchdog() {
        this.watchdog = setTimeout(() => {
            this.onError({
                type:    'client_timeout',
                partial: this.tokens.join(''),
                message: `No event received for ${this.silenceMs}ms`,
                gaps:    this.gaps,
            });
            this.teardown();
        }, this.silenceMs);
    }

    resetWatchdog() { this.clearWatchdog(); this.startWatchdog(); }

    clearWatchdog() {
        if (this.watchdog) { clearTimeout(this.watchdog); this.watchdog = null; }
    }

    teardown() {
        this.clearWatchdog();
        if (this.channel) {
            window.Echo.leave(`stream.${this.streamId}`);
            this.channel = null;
        }
    }
}


Usage with the full error surface handled:

const client = new LlmStreamClient('abc-123', {
    silenceMs: 10000,

    onToken: (token, full) => {
        document.getElementById('output').textContent = full;
    },

    onComplete: ({ content, gaps, finishReason }) => {
        if (gaps.length > 0) {
            console.warn('Stream gaps detected — some tokens may be missing.', gaps);
        }
        saveResponse(content);
    },

    onError: ({ type, partial, finishReason }) => {
        savePartial(partial); // always persist what arrived

        if (type === 'truncated') {
            showRetryButton('Response was cut off. Retry to continue.');
        } else if (type === 'client_timeout') {
            showRetryButton('Connection timed out. Retry when ready.');
        } else {
            showError('Something went wrong. Your partial response has been saved.');
        }
    },
});

client.subscribe();


The watchdog fires after silenceMs of silence. Ten seconds without a token means something is wrong: the job died, the Pusher connection degraded, or the LLM API paused for an unusually long time between tokens. All three warrant user feedback.

[Production Pitfall] Do not set silenceMs below 8000. Claude’s API can pause for several seconds between tokens during complex structured outputs before streaming resumes. We set ours to 5000 initially and got false timeouts on longer document generation tasks. We now treat 10000 as the production default. If you are generating very long documents, consider 15000.

The gaps array is diagnostic, not restorative. Pusher and Reverb do not offer event replay for private channels. Gap detection tells you whether a broadcast delivery problem contributed to an incomplete response, which helps you distinguish API truncation from infrastructure failure. Log it to your analytics or error tracker on every onComplete call.
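Before logging, it can help to reduce the gaps array to a couple of numbers. The sketch below is one way to do that; `summarizeGaps` is a hypothetical helper operating on the `{ expected, received }` entries the client records. It assumes the PHP_INT_MAX sequence from failed() reaches JavaScript as a number above Number.MAX_SAFE_INTEGER, and treats such entries as a sentinel rather than as real loss.

```javascript
// Hypothetical helper: condense the client's gaps array for logging.
function summarizeGaps(gaps) {
    let missingEvents = 0;
    let sentinelGaps = 0;

    for (const { expected, received } of gaps) {
        // Assumption: a sequence beyond the safe integer range is the
        // last-resort sentinel, not evidence of lost events.
        if (received > Number.MAX_SAFE_INTEGER) {
            sentinelGaps++;
            continue;
        }
        // Events numbered expected..received-1 never arrived.
        missingEvents += received - expected;
    }

    return {
        gapCount: gaps.length - sentinelGaps,
        missingEvents,
        sentinelGaps,
    };
}
```

A non-zero missingEvents alongside a complete status points at broadcast delivery rather than the LLM API.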

Detecting and Recovering Orphaned Streams

The orphan case does not surface at runtime. The queue worker is killed. The stream stays in streaming status. The client timed out via the watchdog and showed the retry UI. But nothing tells the database the stream is dead.

Checkpointing every 50 tokens gives you a heartbeat. An active stream always updates last_checkpoint_at within a predictable window. Silence beyond that window is evidence of death.

Register the cleanup command in routes/console.php:

<?php

use Illuminate\Support\Facades\Schedule;

Schedule::command('streams:reap-orphans')->everyFiveMinutes();


The command itself:

<?php

namespace App\Console\Commands;

use App\Events\LlmTokenReceived;
use App\Models\LlmStream;
use Illuminate\Console\Command;

class ReapOrphanedStreams extends Command
{
    protected $signature   = 'streams:reap-orphans';
    protected $description = 'Mark streams with no checkpoint activity as orphaned.';

    public function handle(): void
    {
        $orphans = LlmStream::query()
            ->where('status', 'streaming')
            ->where(function ($q) {
                $q->where('last_checkpoint_at', '<', now()->subMinutes(2))
                  ->orWhere(function ($q2) {
                      $q2->whereNull('last_checkpoint_at')
                         ->where('started_at', '<', now()->subMinutes(2));
                  });
            })
            ->get();

        foreach ($orphans as $stream) {
            $stream->update([
                'status'        => 'error',
                'error_message' => 'Orphaned: no checkpoint activity for 2+ minutes.',
                'failed_at'     => now(),
            ]);

            broadcast(new LlmTokenReceived(
                streamId:     $stream->stream_id,
                token:        '',
                sequence:     $stream->last_sequence + 1,
                status:       'error',
                finishReason: 'orphaned',
            ));

            $this->line("Reaped: {$stream->stream_id}");
        }

        $this->info("Reaped {$orphans->count()} orphaned stream(s).");
    }
}


Two minutes is conservative. On a fast model with short prompts, 50 tokens arrive in seconds. If you are streaming long documents, increase the reap window to match, or reduce the checkpoint interval from 50 to 20 tokens.
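The relationship between checkpoint interval, token rate, and reap window is simple arithmetic, sketched here as a rough sizing aid. `minReapWindowSeconds` and its `safetyFactor` default of 3 are assumptions for illustration, and the token rates in the comments are hypothetical, not measurements.

```javascript
// Hypothetical sizing helper: the shortest reap window that still leaves
// room for `safetyFactor` checkpoint intervals at a given token rate.
function minReapWindowSeconds(checkpointEvery, tokensPerSecond, safetyFactor = 3) {
    if (checkpointEvery <= 0 || tokensPerSecond <= 0 || safetyFactor <= 0) {
        throw new RangeError('All arguments must be positive numbers.');
    }
    const secondsPerCheckpoint = checkpointEvery / tokensPerSecond;
    return Math.ceil(secondsPerCheckpoint * safetyFactor);
}

// Checkpoint every 50 tokens at a hypothetical 2 tokens/second:
// 25 seconds between checkpoints, so a 75 second minimum window.
const slowModelWindow = minReapWindowSeconds(50, 2);

// Dropping the interval to 20 tokens at the same rate shrinks it to 30 seconds.
const tighterWindow = minReapWindowSeconds(20, 2);
```

If the computed value approaches the two-minute window used above, either lengthen the window or checkpoint more often.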

[Architect’s Note] The last_checkpoint_at column is a poor man’s distributed heartbeat, and that is not a criticism. This is the same liveness-detection pattern event-driven systems have used for decades. If you later move to Laravel Horizon with job event hooks, you can correlate orphan detection directly with actual queue worker failures and remove the scheduler command. For most setups, the scheduled Artisan command approach costs nothing and requires no additional infrastructure.

Retry Strategy

When a stream ends in error, truncated, or dead, your user needs a retry button. Always generate a new stream_id for each retry. Reusing the original stream_id means the retry job broadcasts into the same channel the client already abandoned, and the partial content from attempt one gets mixed with attempt two in your database record. Neither is recoverable cleanly.

<?php

namespace App\Http\Controllers;

use App\Jobs\StreamLlmResponseJob;
use App\Models\LlmStream;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Str;

class StreamRetryController extends Controller
{
    public function __invoke(Request $request, string $streamId): JsonResponse
    {
        $original = LlmStream::query()
            ->where('stream_id', $streamId)
            ->where('user_id', $request->user()->id)
            ->whereIn('status', ['error', 'truncated', 'dead'])
            ->firstOrFail();

        $newStreamId = (string) Str::uuid();

        LlmStream::create([
            'stream_id' => $newStreamId,
            'user_id'   => $request->user()->id,
            'prompt'    => $original->prompt,
            'status'    => 'pending',
        ]);

        StreamLlmResponseJob::dispatch($newStreamId, $original->prompt, $request->user()->id);

        return response()->json(['stream_id' => $newStreamId]);
    }
}


Route registration:

<?php

use App\Http\Controllers\StreamRetryController;
use Illuminate\Support\Facades\Route;

Route::middleware('auth:sanctum')->group(function () {
    Route::post('/streams/{streamId}/retry', StreamRetryController::class);
});


The original stream record stays in the database with its status and partial_content intact for audit purposes. The retry creates a clean record. The client receives the new stream_id in the JSON response and calls client.subscribe() fresh.
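On the client, the retry call might look like the sketch below. `requestRetry` is a hypothetical helper. The `baseUrl`, `headers`, and `fetchImpl` options are assumptions so that the route prefix and the authentication header can be supplied by your own app.

```javascript
// Hypothetical helper: ask the server for a fresh stream and return its id.
async function requestRetry(streamId, options = {}) {
    const { baseUrl = '', headers = {}, fetchImpl = fetch } = options;

    const url = `${baseUrl}/streams/${encodeURIComponent(streamId)}/retry`;
    const response = await fetchImpl(url, {
        method: 'POST',
        headers: { Accept: 'application/json', ...headers },
    });

    if (!response.ok) {
        // firstOrFail() on the server yields a 404 for unknown or non-retryable streams.
        throw new Error(`Retry request failed with HTTP ${response.status}`);
    }

    const { stream_id: newStreamId } = await response.json();
    return newStreamId;
}
```

The returned id then goes into a new LlmStreamClient instance, for example `new LlmStreamClient(newStreamId, sameOptions).subscribe()`, rather than being reused on the instance that was already torn down.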

If your streaming endpoints are not locked down with proper authentication, the retry endpoint becomes an unauthenticated job dispatch surface. The auth:sanctum guard fits here because the retry call typically originates from JavaScript, not a form submission, and the guard accepts both API tokens and Sanctum’s cookie-based SPA authentication.

The Service Layer Is Not This Job’s Problem

The job above handles the transport layer. It does not handle token budget management, provider fallback, cost attribution, or prompt construction. Those belong one layer up.

If you are building this into a larger system, the production AI architecture for Laravel covers the contracts and driver abstraction layer that should wrap around the streaming job described here. The job should be thin: stream, checkpoint, broadcast. Everything else belongs in the service it calls.

That separation also matters when your streamed output is structured. Schema validation against LLM hallucinations belongs at the service layer. A truncated JSON response is harder to recover from than truncated prose. Catching a max_tokens truncation in a partial JSON structure before it reaches your client is far cheaper than handling the parse error downstream.
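The idea of refusing to parse a truncated structured response can be sketched in a few lines. This is illustrative only: `parseStructuredOutput` is a hypothetical helper, and in the architecture described here the equivalent check would live in the PHP service layer. It is shown in JavaScript to match the client code.

```javascript
// Hypothetical helper: only attempt JSON.parse when the stream finished naturally.
function parseStructuredOutput(content, status) {
    if (status !== 'complete') {
        // Truncated or failed output is reported as such, never parsed.
        return { ok: false, reason: status, value: null };
    }

    try {
        return { ok: true, reason: null, value: JSON.parse(content) };
    } catch (err) {
        // Complete according to the LLM, but still not valid JSON.
        return { ok: false, reason: 'invalid_json', value: null };
    }
}
```

Checking the status first separates "the model ran out of tokens" from "the model produced malformed JSON", which call for different recovery paths.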

Monitoring What Actually Matters

Before going live, instrument three metrics from your llm_streams table:

Stream completion rate. status = 'complete' divided by total streams initiated, measured hourly. Below 95% in production warrants investigation. Below 90% is an incident.

Orphan rate. Streams reaped by the Artisan command as a percentage of total streams. A spike here usually correlates with deployment restarts or queue worker OOM events. Check your Horizon metrics or system logs alongside it.

Gap frequency. Log the gaps array from client-side onComplete callbacks via a lightweight analytics endpoint. Persistent gaps from specific geographic regions point to Pusher or Reverb delivery problems, not LLM API issues. The distinction matters: one is your infrastructure, the other is your vendor’s.
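The first two metrics reduce to a few divisions once you have counts per status. The sketch below assumes you have already aggregated those counts from llm_streams for the hour in question. `streamHealth` is a hypothetical helper, and `reapedCount` must be supplied separately because reaped streams are stored with status error.

```javascript
// Hypothetical helper: derive completion rate, orphan rate, and an alert level
// from per-status counts, using the 95% and 90% thresholds described above.
function streamHealth(countsByStatus, reapedCount = 0) {
    const total = Object.values(countsByStatus).reduce((sum, n) => sum + n, 0);
    if (total === 0) {
        return { total: 0, completionRate: null, orphanRate: null, level: 'no_data' };
    }

    const completionRate = (countsByStatus.complete ?? 0) / total;
    const orphanRate = reapedCount / total;

    let level = 'ok';
    if (completionRate < 0.90) {
        level = 'incident';
    } else if (completionRate < 0.95) {
        level = 'investigate';
    }

    return { total, completionRate, orphanRate, level };
}
```

Returning no_data for an empty hour avoids a division by zero and keeps a quiet period from being read as a 0% completion rate.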

Connecting these to Laravel’s AI middleware for token tracking gives you cost visibility alongside reliability metrics. That combination is what you need to have an honest conversation about whether Echo-based streaming is worth its operational complexity for your specific use case.