Speed up document indexing in Elasticsearch via bulk indexing
2024-07-26 · via jdhao's digital space

Elasticsearch has an index API that indexes a single document into an index.

However, if you have a lot of documents and index them one by one with this API, indexing is quite slow.

To index a large number of documents, it is better to use the bulk helpers from the elasticsearch.helpers submodule of the Python client. It has three bulk methods:

  • helpers.bulk()
  • helpers.streaming_bulk()
  • helpers.parallel_bulk()
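Much of the speedup comes from batching: the helpers group actions into chunks (500 actions by default, also bounded by a byte limit) and send each chunk as one bulk request, instead of one HTTP round trip per document. Here is a minimal sketch of that chunking idea in plain Python. It is not the library's actual implementation, and `chunked` is a hypothetical helper name:

```python
from itertools import islice


def chunked(actions, chunk_size=500):
    """Yield lists of at most chunk_size items from any iterable."""
    iterator = iter(actions)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# 1000 hypothetical actions, shaped like the ones the bulk helpers accept
actions = (
    {"_index": "my_index", "_id": i, "_source": {"title": f"doc {i}"}}
    for i in range(1000)
)
chunks = list(chunked(actions, chunk_size=500))
print(len(chunks))  # number of bulk requests, instead of 1000 single ones
```

With 1000 documents and a chunk size of 500, only a couple of requests reach the cluster, which is where most of the time is saved.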

If you have a lot of documents, the official doc recommends using streaming_bulk():

When errors are being collected original document data is included in the error dictionary which can lead to an extra high memory usage. If you need to process a lot of data and want to ignore/collect errors please consider using the streaming_bulk() helper which will just return the errors and not store them in memory.
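One way to follow that advice is to pass raise_on_error=False to streaming_bulk() and tally failures as the (ok, item) pairs arrive, keeping only a few of them. This is a minimal sketch: `tally_results` is a hypothetical helper, and the sample results are made up to mimic the shape of what the helper yields, so the snippet runs without a cluster.

```python
def tally_results(results, max_reported=5):
    """Count successes and failures from an iterable of (ok, item) pairs.

    Only the first max_reported failures are kept, so memory stays bounded.
    """
    succeeded, failed, reported = 0, 0, []
    for ok, item in results:
        if ok:
            succeeded += 1
            continue
        failed += 1
        if len(reported) < max_reported:
            # item looks like {"index": {"_id": ..., "status": ..., "error": ...}}
            op_type, detail = next(iter(item.items()))
            reported.append((op_type, detail.get("_id"), detail.get("status")))
    return succeeded, failed, reported


# Made-up results for illustration only
sample = [
    (True, {"index": {"_id": "0", "status": 201}}),
    (False, {"index": {"_id": "1", "status": 400,
                       "error": {"type": "mapper_parsing_exception"}}}),
    (True, {"index": {"_id": "2", "status": 201}}),
]
print(tally_results(sample))
```

With a real client, the call would look like tally_results(streaming_bulk(client, actions, raise_on_error=False)).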

Usually the bulk API is far faster than indexing documents sequentially. Here is a short code snippet that benchmarks the different ways of indexing:

import time
from contextlib import contextmanager

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk, parallel_bulk


@contextmanager
def report_time(procedure_name):
    start = time.time()
    yield
    end = time.time()
    print(f"time spent for {procedure_name}: {end - start}")


def sequential_index(client, docs, index_name):
    for doc in docs:
        client.index(index=index_name, document=doc)


def bulk_index(client, docs, index_name, bulk_type):
    def gen_actions(docs):
        for i, doc in enumerate(docs):
            action = {"_id": i, "_index": index_name, "_source": doc}
            yield action

    # Both helpers are lazy generators: nothing is sent until we iterate.
    if bulk_type == "streaming":
        results = streaming_bulk(client, actions=gen_actions(docs), chunk_size=500)
    elif bulk_type == "parallel":
        results = parallel_bulk(client, actions=gen_actions(docs), chunk_size=500)
    else:
        raise ValueError(f"unknown bulk_type: {bulk_type}")

    for success, status in results:
        if not success:
            print(status)


def recreate_index(client, index_name):
    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)
    client.indices.create(index=index_name)


if __name__ == "__main__":
    n = 1000
    index_name = "my_index"
    # set up the client
    es_client = Elasticsearch(...)

    docs = []
    for i in range(n):
        doc = {"title": f"this is document {i}", "date": "2024-07-25"}
        docs.append(doc)

    recreate_index(es_client, index_name)
    with report_time("sequential indexing"):
        sequential_index(es_client, docs, index_name)

    recreate_index(es_client, index_name)
    with report_time("streaming bulk"):
        bulk_index(es_client, docs, index_name, "streaming")

    recreate_index(es_client, index_name)
    with report_time("parallel bulk"):
        bulk_index(es_client, docs, index_name, "parallel")

This is the test result on my machine (times in seconds):

time spent for sequential indexing: 123.88939809799194
time spent for streaming bulk: 0.6672859191894531
time spent for parallel bulk: 0.45793724060058594
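Since the benchmark indexes n = 1000 documents, the timings above can be turned into throughput and speedup figures. This is a small arithmetic sketch using only the numbers from this run:

```python
n = 1000  # number of documents in the benchmark
timings = {
    "sequential indexing": 123.88939809799194,
    "streaming bulk": 0.6672859191894531,
    "parallel bulk": 0.45793724060058594,
}

baseline = timings["sequential indexing"]
for name, seconds in timings.items():
    throughput = n / seconds      # documents indexed per second
    speedup = baseline / seconds  # how many times faster than sequential
    print(f"{name}: {throughput:.0f} docs/s, {speedup:.0f}x vs sequential")
```

These figures come from a single run with small documents, so they are only indicative.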
