Skip to content

Batch-Verarbeitung

Office Oxide ist schnell genug, dass bei den meisten Batch-Jobs das Disk-I/O zum Flaschenhals wird, nicht das Parsing. Ein typisches Word-Dokument wird in 0,8 ms extrahiert — ein einzelner Thread schafft also ~1.000 Dateien pro Sekunde.

Dieser Guide zeigt Patterns, die skalieren: serielle Schleifen für kleine Jobs, Thread-Pools für mittlere, async I/O wenn du aus S3 oder HTTP streamst.

Serielle Schleife — der richtige Default

Bis zu ein paar tausend Dateien auf lokaler Disk ist die schlichte serielle Schleife die einfachste und oft schnellste Wahl. Du sparst dir Worker-Spawn-Overhead und die Konkurrenz paralleler Disk-Reads.

Python

from pathlib import Path
from office_oxide import Document

# Office formats to convert; compared against the lower-cased file suffix.
SUPPORTED = {".docx", ".xlsx", ".pptx", ".doc", ".xls", ".ppt"}

# Walk the corpus recursively and write each document's plain text next to it.
for src in Path("corpus").rglob("*"):
    if src.suffix.lower() not in SUPPORTED:
        continue
    with Document.open(src) as doc:
        text = doc.plain_text()
    # Pin the encoding: the platform default is not UTF-8 everywhere and may
    # be unable to represent every character found in a document.
    src.with_suffix(".txt").write_text(text, encoding="utf-8")

Rust

use std::path::Path;
use office_oxide::Document;
use walkdir::WalkDir;

// Extensions (lower-case, without the dot) that are converted.
const SUPPORTED: [&str; 6] = ["docx", "xlsx", "pptx", "doc", "xls", "ppt"];

// Walk the corpus and write each supported document's plain text to a
// sibling .txt file. Any walk, open, or write error propagates via `?`.
for entry in WalkDir::new("corpus") {
    let entry = entry?;
    let path = entry.path();
    // Entries without an extension, or with a non-UTF-8 one, are skipped.
    let ext = match path.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext.to_ascii_lowercase(),
        None => continue,
    };
    if !SUPPORTED.contains(&ext.as_str()) {
        continue;
    }
    let doc = Document::open(path)?;
    std::fs::write(path.with_extension("txt"), doc.plain_text())?;
}

JavaScript

import { readdirSync, statSync, writeFileSync } from 'node:fs';
import { join, extname } from 'node:path';
import { Document } from 'office-oxide';

const exts = new Set(['.docx', '.xlsx', '.pptx', '.doc', '.xls', '.ppt']);

function* walk(dir) {
  for (const name of readdirSync(dir)) {
    const full = join(dir, name);
    if (statSync(full).isDirectory()) yield* walk(full);
    else yield full;
  }
}

for (const src of walk('corpus')) {
  if (!exts.has(extname(src).toLowerCase())) continue;
  using doc = Document.open(src);
  writeFileSync(src.replace(/\.\w+$/, '.txt'), doc.plainText());
}

WASM (Browser, hochgeladene Dateien)

import { WasmDocument } from 'office-oxide-wasm';

// <input type="file" multiple accept=".docx,.xlsx,.pptx,.doc,.xls,.ppt">
/**
 * Extracts the plain text of every uploaded file, one after another.
 *
 * @param fileList iterable of File objects (e.g. `input.files`)
 * @returns array of `{ name, text }` objects in input order
 */
async function extractAll(fileList) {
  const extracted = [];
  for (const upload of fileList) {
    const buffer = await upload.arrayBuffer();
    const bytes = new Uint8Array(buffer);
    // The format is whatever follows the last dot of the file name, lower-cased.
    const extension = upload.name.split('.').pop().toLowerCase();
    const doc = new WasmDocument(bytes, extension);
    try {
      const text = doc.plainText();
      extracted.push({ name: upload.name, text });
    } finally {
      // Release the handle explicitly, even when extraction throws.
      doc.free();
    }
  }
  return extracted;
}

Läuft vollständig clientseitig — Dateien verlassen den Browser nie, ideal für datenschutzsensible Batch-Aufgaben.

Go

package main

import (
    "os"
    "path/filepath"
    "strings"

    officeoxide "github.com/yfedoseev/office_oxide/go"
)

// exts is the set of file extensions (lower-case, with leading dot) that are
// converted.
var exts = map[string]bool{
    ".docx": true,
    ".xlsx": true,
    ".pptx": true,
    ".doc":  true,
    ".xls":  true,
    ".ppt":  true,
}

// main walks the "corpus" directory and writes the plain text of every
// supported Office file to a sibling .txt file.
//
// Files that cannot be opened or extracted are skipped. A failed write or a
// directory-traversal error aborts the walk and exits with status 1.
func main() {
    err := filepath.Walk("corpus", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        ext := filepath.Ext(path)
        if !exts[strings.ToLower(ext)] {
            return nil
        }
        doc, err := officeoxide.Open(path)
        if err != nil {
            return nil // skip unreadable files
        }
        // The callback runs once per file, so this defer fires before the
        // next file is visited.
        defer doc.Close()
        text, err := doc.PlainText()
        if err != nil {
            return nil // skip rather than write output for a failed extraction
        }
        return os.WriteFile(strings.TrimSuffix(path, ext)+".txt", []byte(text), 0644)
    })
    if err != nil {
        os.Stderr.WriteString("batch extract: " + err.Error() + "\n")
        os.Exit(1)
    }
}

C#

using OfficeOxide;

// Supported extensions, lower-case with leading dot.
var exts = new HashSet<string> { ".docx", ".xlsx", ".pptx", ".doc", ".xls", ".ppt" };

// Convert every supported file below "corpus" (recursively) to a sibling .txt file.
var candidates = Directory.EnumerateFiles("corpus", "*", SearchOption.AllDirectories);
foreach (var src in candidates)
{
    var extension = Path.GetExtension(src).ToLowerInvariant();
    if (exts.Contains(extension))
    {
        // The document is disposed as soon as its text has been written.
        using (var doc = Document.Open(src))
        {
            File.WriteAllText(Path.ChangeExtension(src, ".txt"), doc.PlainText());
        }
    }
}

Parallel — für große Korpora

Wenn du zehntausende Dateien und eine schnelle SSD hast, hilft Parallelisierung. Vorsicht: zu viele Worker sättigen die Disk und drücken den Durchsatz.

Python (ProcessPoolExecutor)

from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from office_oxide import Document

def process(path: Path) -> None:
    """Convert one document to Markdown, written next to the source as ``<name>.md``.

    Must stay a module-level function so the process pool can pickle it.
    """
    with Document.open(path) as doc:
        # Pin the encoding so the output does not depend on the platform default.
        path.with_suffix(".md").write_text(doc.to_markdown(), encoding="utf-8")


# Worker processes started with the "spawn" method re-import this module.
# Without the guard every worker would scan the corpus and start its own pool.
if __name__ == "__main__":
    paths = [p for p in Path("corpus").rglob("*")
             if p.suffix.lower() in {".docx", ".xlsx", ".pptx", ".doc", ".xls", ".ppt"}]

    with ProcessPoolExecutor(max_workers=8) as ex:
        # Consume the result iterator so that a worker exception is re-raised here.
        for _ in ex.map(process, paths):
            pass

Die Python-Bindung gibt das GIL beim nativen Parsing frei, also funktioniert auch ein ThreadPoolExecutor — aber Prozesse isolieren besser, wenn ein Dokument den Parser zum Absturz bringt (Panic).

Rust (rayon)

use rayon::prelude::*;
use office_oxide::Document;

// Convert all paths in parallel. Documents that fail to open are skipped and
// write errors are deliberately ignored (best effort).
paths.par_iter().for_each(|path| {
    let doc = match Document::open(path) {
        Ok(doc) => doc,
        Err(_) => return,
    };
    let target = path.with_extension("md");
    let _ = std::fs::write(target, doc.to_markdown());
});

Rayons Default-Thread-Anzahl entspricht der Anzahl deiner CPU-Kerne — fast immer die richtige Einstellung.

Go (Goroutine-Pool)

package main

import (
    "os"
    "path/filepath"
    "runtime"
    "strings"
    "sync"

    officeoxide "github.com/yfedoseev/office_oxide/go"
)

// main converts every .docx/.xlsx/.pptx file below "corpus" to Markdown,
// using one worker goroutine per CPU.
//
// Files that cannot be opened or converted are skipped. A directory-traversal
// error exits with status 1 instead of silently processing a partial list.
func main() {
    var paths []string
    err := filepath.Walk("corpus", func(p string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        switch strings.ToLower(filepath.Ext(p)) {
        case ".docx", ".xlsx", ".pptx":
            paths = append(paths, p)
        }
        return nil
    })
    if err != nil {
        os.Stderr.WriteString("walk corpus: " + err.Error() + "\n")
        os.Exit(1)
    }

    // convert handles a single file so that the deferred Close runs per file
    // rather than once when the worker goroutine exits.
    convert := func(path string) {
        doc, err := officeoxide.Open(path)
        if err != nil {
            return // skip unreadable files
        }
        defer doc.Close()
        md, err := doc.ToMarkdown()
        if err != nil {
            return // do not write a .md file for a failed conversion
        }
        // Best effort: a failed write only affects this one file.
        _ = os.WriteFile(strings.TrimSuffix(path, filepath.Ext(path))+".md", []byte(md), 0644)
    }

    jobs := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < runtime.NumCPU(); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for path := range jobs {
                convert(path)
            }
        }()
    }
    for _, p := range paths {
        jobs <- p
    }
    close(jobs) // lets the workers' range loops terminate
    wg.Wait()
}

C# (Parallel.ForEach)

using OfficeOxide;

// Supported extensions, lower-case with leading dot.
var exts = new HashSet<string> { ".docx", ".xlsx", ".pptx" };

// Collect the matching files up front.
var allFiles = Directory.EnumerateFiles("corpus", "*", SearchOption.AllDirectories);
var paths = allFiles
    .Where(p => exts.Contains(Path.GetExtension(p).ToLowerInvariant()))
    .ToList();

// Convert in parallel, with at most one concurrent conversion per processor.
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(paths, options, path =>
{
    try
    {
        using (var doc = Document.Open(path))
        {
            var markdown = doc.ToMarkdown();
            File.WriteAllText(Path.ChangeExtension(path, ".md"), markdown);
        }
    }
    catch (OfficeOxideException)
    {
        // Skip unreadable documents.
    }
});

JavaScript (Promise.all)

import { readdirSync, statSync } from 'node:fs';
import { join, extname } from 'node:path';
import { Document } from 'office-oxide';

// Extensions (lower-case, with leading dot) that are converted.
const exts = new Set(['.docx', '.xlsx', '.pptx', '.doc', '.xls', '.ppt']);

/**
 * Recursively yields the path of every non-directory entry below `dir`.
 */
function* walk(dir) {
  for (const entry of readdirSync(dir)) {
    const child = join(dir, entry);
    if (statSync(child).isDirectory()) {
      yield* walk(child);
    } else {
      yield child;
    }
  }
}

// Materialise the full work list before any conversion starts.
const paths = Array.from(walk('corpus')).filter((candidate) => {
  const extension = extname(candidate).toLowerCase();
  return exts.has(extension);
});

// Concurrency begrenzen
const CONCURRENCY = 8;
let i = 0;
async function worker() {
  while (i < paths.length) {
    const path = paths[i++];
    using doc = Document.open(path);
    // doc.toMarkdown() verarbeiten ...
  }
}
await Promise.all(Array.from({ length: CONCURRENCY }, worker));

Async — wenn Dateien von woanders kommen

Wenn Inputs aus HTTP, S3 oder einer Queue kommen, gewinnt Async I/O, weil das Netzwerk die Parse-Zeit dominiert. Aus Bytes öffnen, niemals aus dem Pfad.

Python (asyncio + aiohttp)

import asyncio, aiohttp
from office_oxide import Document

async def fetch_and_extract(session, url):
    """Download ``url`` and return the plain text of the document it points to.

    Args:
        session: an ``aiohttp.ClientSession`` (or compatible) used for the GET.
        url: document URL; the format is the lower-cased extension of the URL
            *path*, so query strings (e.g. on presigned URLs) are ignored.

    Returns:
        The extracted plain text.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error status, and
        whatever ``Document.from_bytes`` raises for unparsable data.
    """
    from pathlib import PurePosixPath
    from urllib.parse import urlsplit

    async with session.get(url) as r:
        # Fail early instead of handing an HTML error page to the parser.
        r.raise_for_status()
        data = await r.read()
    fmt = PurePosixPath(urlsplit(url).path).suffix.lstrip(".").lower()

    def _extract():
        with Document.from_bytes(data, fmt) as doc:
            return doc.plain_text()

    # Parsing is CPU-bound; run it in a worker thread so the event loop keeps
    # servicing the other downloads.
    return await asyncio.to_thread(_extract)

async def main(urls, limit=16):
    """Fetch and extract all ``urls`` concurrently.

    Args:
        urls: iterable of document URLs.
        limit: maximum number of URLs processed at the same time.

    Returns:
        The extracted texts, in the same order as ``urls``.
    """
    sem = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:

        async def bounded(url):
            # Cap the number of in-flight downloads; without a cap a long URL
            # list opens every connection at once.
            async with sem:
                return await fetch_and_extract(session, url)

        return await asyncio.gather(*(bounded(u) for u in urls))

Rust (tokio)

use office_oxide::{Document, DocumentFormat};
use std::io::Cursor;

// Download the whole body first.
let response = reqwest::get(url).await?;
let bytes = response.bytes().await?;
let fmt = DocumentFormat::Docx;
// Move parsing onto a blocking task: extraction is CPU-bound.
let handle = tokio::task::spawn_blocking(move || -> office_oxide::Result<String> {
    let reader = Cursor::new(bytes.to_vec());
    Document::from_reader(reader, fmt).map(|doc| doc.plain_text())
});
// First `?` unwraps the join result, second `?` the extraction result.
let text = handle.await??;

JavaScript (fetch + Concurrency-Limit)

import { Document } from 'office-oxide';

async function fetchAndExtract(url) {
  const res = await fetch(url);
  const buf = Buffer.from(await res.arrayBuffer());
  const fmt = url.split('.').pop().toLowerCase();
  using doc = Document.fromBytes(buf, fmt);
  return doc.plainText();
}

const CONCURRENCY = 16;
// Pair every URL with its position so results can be stored in input order.
const queue = Array.from(urls, (url, index) => ({ url, index }));
// results[k] belongs to the k-th URL, regardless of which download finishes first.
const results = new Array(queue.length);
await Promise.all(Array.from({ length: CONCURRENCY }, async () => {
  while (queue.length) {
    const { url, index } = queue.shift();
    results[index] = await fetchAndExtract(url);
  }
}));

Go (HTTP-Fan-Out)

package main

import (
    "io"
    "net/http"
    "strings"
    "sync"

    officeoxide "github.com/yfedoseev/office_oxide/go"
)

// fetchAndExtract downloads url and returns the plain text of the document.
//
// The document format is the lower-cased extension of the URL with any query
// string or fragment removed, so a presigned URL such as ".../a.DOCX?sig=x.y"
// resolves to "docx".
func fetchAndExtract(url string) (string, error) {
    // NOTE(review): http.Get uses http.DefaultClient, which has no timeout.
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    name := url
    if i := strings.IndexAny(name, "?#"); i >= 0 {
        name = name[:i]
    }
    // Named "format" rather than "fmt" so the standard package name is not shadowed.
    format := strings.ToLower(name[strings.LastIndex(name, ".")+1:])
    doc, err := officeoxide.OpenFromBytes(data, format)
    if err != nil {
        return "", err
    }
    defer doc.Close()
    return doc.PlainText()
}

// main fetches all URLs concurrently with at most 16 fetches in flight.
// Fetch errors are ignored in this sketch.
func main() {
    urls := []string{ /* ... */ }

    const maxInFlight = 16
    slots := make(chan struct{}, maxInFlight) // concurrency cap

    var pending sync.WaitGroup
    pending.Add(len(urls))
    for idx := range urls {
        // Blocks while maxInFlight fetches are already running.
        slots <- struct{}{}
        go func(target string) {
            defer pending.Done()
            defer func() { <-slots }()
            text, _ := fetchAndExtract(target)
            _ = text // process...
        }(urls[idx])
    }
    pending.Wait()
}

C# (HttpClient + async)

using OfficeOxide;

// One shared client for all requests.
using var http = new HttpClient();

// Downloads the document at `url` and returns its plain text.
// The format is the lower-cased extension of the URL path; the query string
// is ignored so presigned URLs resolve correctly.
async Task<string> FetchAndExtract(string url)
{
    var data = await http.GetByteArrayAsync(url);
    var fmt = Path.GetExtension(new Uri(url).AbsolutePath).TrimStart('.').ToLowerInvariant();
    using var doc = Document.FromBytes(data, fmt);
    return doc.PlainText();
}

// Cap the number of concurrent downloads with a SemaphoreSlim.
var sem = new SemaphoreSlim(16);

// Runs FetchAndExtract for one URL while holding a semaphore slot.
async Task<string> Throttled(string url)
{
    await sem.WaitAsync();
    try
    {
        return await FetchAndExtract(url);
    }
    finally
    {
        sem.Release();
    }
}

var tasks = urls.Select(Throttled);
var results = await Task.WhenAll(tasks);

Speicher-Tipps

  • Für sehr große XLSX in Rust mit dem mmap-Feature bauen (features = ["mmap"]) und Document::open_mmap aufrufen, um nicht das ganze Archiv in den Heap zu kopieren.
  • Pro Worker immer nur ein offenes Document halten. Jedes Handle hält die geparste Struktur im Speicher; das Schließen (Drop in Rust, Verlassen des with in Python, close()/using in JS) gibt sie frei.
  • Bei LLM-Ingestion im großen Stil bevorzuge to_markdown() gegenüber to_html() — Markdown produziert kleineren Output und besseren Downstream-LLM-Throughput.

Siehe auch