Advanced Features
This page documents advanced features including data transformation pipelines, rechunking operations, and administrative functions.
Table of Contents
Data Transformation
Rechunking
Administrative
Data Transformation
muller.compute()
Overview
Decorator for creating data transformation functions that can be applied to datasets. This enables parallel processing and efficient data pipelines.
Parameters
- func (
callable): The function to decorate. Should take input data and output dataset as parameters.
Returns
- ComputeFunction: A wrapped function that can be evaluated on datasets.
Examples
import muller
import numpy as np
# Define a transformation function
@muller.compute
def preprocess_images(sample_in, sample_out):
# Normalize images
image = sample_in["images"]
normalized = (image - image.mean()) / image.std()
sample_out["images"].append(normalized)
sample_out["labels"].append(sample_in["labels"])
return sample_out
# Apply transformation
source_ds = muller.load("./raw_dataset")
target_ds = muller.dataset("./processed_dataset", overwrite=True)
target_ds.create_tensor("images")
target_ds.create_tensor("labels")
with target_ds:
preprocess_images().eval(
source_ds,
target_ds,
num_workers=4,
progressbar=True
)
# Transformation with augmentation
@muller.compute
def augment_data(sample_in, sample_out):
image = sample_in["images"]
label = sample_in["labels"]
# Original
sample_out["images"].append(image)
sample_out["labels"].append(label)
# Flipped
sample_out["images"].append(np.fliplr(image))
sample_out["labels"].append(label)
return sample_out
# Apply augmentation
with target_ds:
augment_data().eval(source_ds, target_ds, num_workers=4)
# Filtering transformation
@muller.compute
def filter_high_quality(sample_in, sample_out):
if sample_in["quality_score"] > 0.8:
sample_out["images"].append(sample_in["images"])
sample_out["labels"].append(sample_in["labels"])
return sample_out
muller.ComputeFunction
Overview
A class representing a compute function that can be evaluated on datasets. Created by the @muller.compute decorator.
Methods
- eval(): Evaluate the compute function on input data.
Parameters for eval()
- data_in: Input data (Dataset or iterable).
- ds_out (
Dataset): Output dataset. - num_workers (
int, optional): Number of parallel workers. Defaults to0. - scheduler (
str, optional): Scheduler type ("threaded", "processed", "serial"). Defaults to"threaded". - progressbar (
bool, optional): Show progress bar. Defaults toTrue. - skip_ok (
bool, optional): Skip samples that cause errors. Defaults toFalse. - ignore_errors (
bool, optional): Continue processing on errors. Defaults toFalse.
Examples
import muller
@muller.compute
def transform_data(sample_in, sample_out):
sample_out["processed"].append(sample_in["raw"] * 2)
return sample_out
source_ds = muller.load("./source")
target_ds = muller.dataset("./target", overwrite=True)
target_ds.create_tensor("processed")
# Evaluate with different configurations
with target_ds:
# Serial processing
transform_data().eval(source_ds, target_ds, num_workers=0)
# Parallel with threads
transform_data().eval(
source_ds,
target_ds,
num_workers=4,
scheduler="threaded"
)
# Parallel with processes
transform_data().eval(
source_ds,
target_ds,
num_workers=4,
scheduler="processed"
)
# With error handling
transform_data().eval(
source_ds,
target_ds,
num_workers=4,
skip_ok=True,
ignore_errors=True
)
muller.Pipeline
Overview
Create a pipeline of multiple transformation functions that are applied sequentially.
Parameters
- functions (
List[ComputeFunction]): List of compute functions to chain.
Returns
- Pipeline: A pipeline object that can be evaluated.
Examples
import muller
import numpy as np
# Define transformation steps
@muller.compute
def normalize(sample_in, sample_out):
image = sample_in["images"]
normalized = (image - image.mean()) / image.std()
sample_out["images"].append(normalized)
sample_out["labels"].append(sample_in["labels"])
return sample_out
@muller.compute
def resize(sample_in, sample_out):
from PIL import Image
image = sample_in["images"]
resized = np.array(Image.fromarray(image).resize((224, 224)))
sample_out["images"].append(resized)
sample_out["labels"].append(sample_in["labels"])
return sample_out
@muller.compute
def augment(sample_in, sample_out):
image = sample_in["images"]
# Original
sample_out["images"].append(image)
sample_out["labels"].append(sample_in["labels"])
# Flipped
sample_out["images"].append(np.fliplr(image))
sample_out["labels"].append(sample_in["labels"])
return sample_out
# Create pipeline
from muller.core.transform import Pipeline
pipeline = Pipeline([normalize, resize, augment])
# Apply pipeline
source_ds = muller.load("./raw_data")
target_ds = muller.dataset("./processed_data", overwrite=True)
target_ds.create_tensor("images")
target_ds.create_tensor("labels")
with target_ds:
pipeline.eval(
source_ds,
target_ds,
num_workers=4,
progressbar=True
)
Rechunking
ds.rechunk()
Overview
Reorganize the dataset's chunk structure for better storage efficiency and access patterns. This is useful after many modifications or when optimizing for specific access patterns.
Parameters
- tensors (
List[str], optional): List of tensor names to rechunk. If not provided, rechunks all tensors. Defaults toNone. - num_workers (
int, optional): Number of parallel workers. Defaults to0. - progressbar (
bool, optional): Show progress bar. Defaults toTrue.
Returns
- None
Examples
import muller
ds = muller.load("./my_dataset")
# Rechunk all tensors
ds.rechunk()
# Rechunk specific tensors
ds.rechunk(tensors=["images", "labels"])
# Rechunk with parallel workers
ds.rechunk(num_workers=4, progressbar=True)
# Rechunk after many modifications
with ds:
for i in range(1000):
ds.append({"data": i})
ds.commit("Added 1000 samples")
# Optimize storage
ds.rechunk(num_workers=4)
ds.commit("Rechunked for optimization")
When to Use
- After many small append operations
- After deleting many samples
- When optimizing for sequential vs random access
- Before sharing or archiving the dataset
ds.rechunk_if_necessary()
Overview
Automatically rechunk the dataset if certain conditions are met (e.g., fragmentation exceeds threshold). This is a smart version of rechunk() that only rechunks when beneficial.
Parameters
- tensors (
List[str], optional): List of tensor names to check and rechunk. Defaults toNone. - threshold (
float, optional): Fragmentation threshold (0-1) above which rechunking occurs. Defaults to0.3.
Returns
- bool:
Trueif rechunking was performed,Falseotherwise.
Examples
import muller
ds = muller.load("./my_dataset")
# Automatically rechunk if needed
was_rechunked = ds.rechunk_if_necessary()
if was_rechunked:
print("Dataset was rechunked")
# Check specific tensors
ds.rechunk_if_necessary(tensors=["images"])
# Use custom threshold
ds.rechunk_if_necessary(threshold=0.5) # More lenient
# In a maintenance routine
def optimize_dataset(ds):
if ds.rechunk_if_necessary():
ds.commit("Auto-rechunked for optimization")
ds.flush()
optimize_dataset(ds)
Administrative
ds.enable_admin_mode()
Overview
Enable administrative mode for the dataset. This grants elevated permissions for operations that are normally restricted.
Parameters
None
Returns
- None
Examples
import muller
ds = muller.load("./my_dataset")
# Enable admin mode
ds.enable_admin_mode()
# Perform administrative operations
with ds:
# Operations that require admin privileges
ds.delete_tensor("sensitive_data", large_ok=True)
ds.reshard_index("labels", num_shards=20)
# Disable when done
ds.disable_admin_mode()
Warning
Admin mode bypasses certain safety checks. Use with caution and disable when not needed.
ds.disable_admin_mode()
Overview
Disable administrative mode and return to normal operation mode.
Parameters
None
Returns
- None
Examples
import muller
ds = muller.load("./my_dataset")
# Enable admin mode for specific operations
ds.enable_admin_mode()
try:
# Perform admin operations
ds.delete_tensor("old_tensor", large_ok=True)
finally:
# Always disable admin mode
ds.disable_admin_mode()
# Check admin mode status
if ds.is_admin_mode:
ds.disable_admin_mode()
ds.lock()
Overview
Acquire a write lock on the dataset to prevent concurrent modifications. This is useful in multi-process or distributed environments.
Parameters
- timeout (
int, optional): Maximum time to wait for lock in seconds. Defaults to0(no wait).
Returns
- None
Examples
import muller
ds = muller.load("./my_dataset")
# Acquire lock
ds.lock()
try:
# Perform operations with exclusive access
with ds:
ds.append({"data": new_data})
ds.commit("Added data with lock")
finally:
# Always release lock
ds.unlock()
# Lock with timeout
try:
ds.lock(timeout=30) # Wait up to 30 seconds
with ds:
ds.extend({"data": batch_data})
ds.commit("Batch update")
finally:
ds.unlock()
ds.unlock()
Overview
Release the write lock on the dataset, allowing other processes to acquire it.
Parameters
None
Returns
- None
Examples
import muller
ds = muller.load("./my_dataset")
# Lock and unlock pattern
ds.lock()
try:
with ds:
ds.append({"data": data})
ds.commit("Update")
finally:
ds.unlock()
# Context manager pattern (recommended)
class DatasetLock:
def __init__(self, dataset):
self.ds = dataset
def __enter__(self):
self.ds.lock()
return self.ds
def __exit__(self, exc_type, exc_val, exc_tb):
self.ds.unlock()
# Usage
with DatasetLock(ds):
with ds:
ds.append({"data": data})
ds.commit("Safe update")
Advanced Workflow Examples
Parallel Data Processing Pipeline
import muller
import numpy as np
# Define transformation pipeline
@muller.compute
def preprocess(sample_in, sample_out):
image = sample_in["raw_image"]
# Normalize
normalized = (image - image.mean()) / image.std()
sample_out["image"].append(normalized)
sample_out["label"].append(sample_in["label"])
return sample_out
@muller.compute
def augment(sample_in, sample_out):
image = sample_in["image"]
label = sample_in["label"]
# Original
sample_out["image"].append(image)
sample_out["label"].append(label)
# Augmented versions
sample_out["image"].append(np.fliplr(image))
sample_out["label"].append(label)
sample_out["image"].append(np.flipud(image))
sample_out["label"].append(label)
return sample_out
# Process data
source = muller.load("./raw_data")
processed = muller.dataset("./processed_data", overwrite=True)
processed.create_tensor("image")
processed.create_tensor("label")
with processed:
# Apply preprocessing
preprocess().eval(source, processed, num_workers=8, progressbar=True)
processed.commit("Preprocessed data")
# Rechunk for efficiency
processed.rechunk(num_workers=4)
processed.commit("Rechunked")
# Create augmented version
augmented = muller.dataset("./augmented_data", overwrite=True)
augmented.create_tensor("image")
augmented.create_tensor("label")
with augmented:
augment().eval(processed, augmented, num_workers=8, progressbar=True)
augmented.commit("Augmented data")
Dataset Maintenance
import muller
def maintain_dataset(path):
"""Perform routine maintenance on a dataset."""
ds = muller.load(path)
# Enable admin mode for maintenance
ds.enable_admin_mode()
try:
# Rechunk if necessary
if ds.rechunk_if_necessary():
print("Dataset was rechunked")
ds.commit("Maintenance: rechunked")
# Optimize indexes
for tensor_name in ds.indexed_tensors:
ds.optimize_index(tensor_name)
print(f"Optimized index for {tensor_name}")
# Flush all changes
ds.flush()
print("Maintenance completed successfully")
finally:
# Always disable admin mode
ds.disable_admin_mode()
# Run maintenance
maintain_dataset("./my_dataset")
Safe Concurrent Access
import muller
from contextlib import contextmanager
@contextmanager
def locked_dataset(path, timeout=30):
"""Context manager for safe concurrent dataset access."""
ds = muller.load(path)
ds.lock(timeout=timeout)
try:
yield ds
finally:
ds.unlock()
# Use in concurrent environment
with locked_dataset("./shared_dataset") as ds:
with ds:
ds.append({"data": new_data})
ds.commit("Concurrent update")