strand

Easy creation of non-blocking tasks

https://github.com/i2mint/strand

Science Score: 26.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Committers with academic emails
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (7.8%) to scientific vocabulary
Last synced: 10 months ago · JSON representation

Repository

Easy creation of non-blocking tasks

Basic Info
  • Host: GitHub
  • Owner: i2mint
  • License: apache-2.0
  • Language: Python
  • Default Branch: master
  • Size: 332 KB
Statistics
  • Stars: 0
  • Watchers: 3
  • Forks: 0
  • Open Issues: 0
  • Releases: 0
Created over 4 years ago · Last pushed 10 months ago
Metadata Files
Readme License

README.md

strand

Easy creation of non-blocking tasks

To install: pip install strand

Warning

In order to use threads or multiprocessing safely, you need to understand the constraints of those features. A thorough discussion of how not to shoot yourself in the foot is outside the scope of this library. Future versions of this library may include strong input checks to prevent more common mistakes, with optional arguments to override checks if necessary. This version does not contain any safety controls yet.

Basic Usage

```python from strand import ThreadTaskrunner

def handle_chunk(chunk): print(f'got a chunk: {chunk}')

def longblockingfunction(totalsize, chunksize): if totalsize < chunksize: totalsize = chunksize
biglist = range(totalsize) return (biglist[chunksize * n:chunksize * (n + 1)] for n in range(totalsize / chunk_size))

instantiate the runner

runner = ThreadTaskrunner(longblockingfunction, oniter=handlechunk)

call the runner with the arguments to pass to the function

the function will run in a thread

runner(1e8, 1e3) ```

Decorator syntax

```python from strand import as_task

def handle_chunk(chunk): print(f'got a chunk: {chunk}')

@astask(oniter=handlechunk) def longblockingfunction(totalsize, chunksize): if totalsize < chunksize: totalsize = chunksize
big
list = range(totalsize) return (biglist[chunksize * n:chunksize * (n + 1)] for n in range(totalsize / chunksize))

the function will run in a thread

longblockingfunction(1e8, 1e3) ```

The as_task decorator takes a taskrunner target as its first argument. The argument may be a Taskrunner subclass or a string. The allowed values are: * 'thread' (default): ThreadTaskrunner * 'process': MultiprocessTaskrunner * 'coroutine': CoroutineTaskrunner * 'store': StoreTaskWriter * 'sync': Taskrunner (just runs the function and returns the value synchronously without any change of context)

Base API

class strand.Taskrunner(func: Callable, *init_args, on_iter: Optional[Callable] = None, on_end: Optional[Callable] = None, on_error: Optional[Callable] = None, **init_kwargs)

The base Taskrunner class and its subclasses take a callable as their first init argument. Taskrunners implement __call__ and pass arguments to their stored callable when called.

The init_args and init_kwargs are also passed to func when called (as func(*init_args, *args, **init_kwargs, **kwargs), allowing a Taskrunner instance to serve as a partial invocation of a function.

The optional arguments on_iter, on_end, and on_error are callbacks to be invoked when applicable. * If on_iter is provided and func returns an iterable, on_iter will be called with every item in the iterable after func returns. * If on_end is provided, it will be called with the return value of func. Otherwise, for most subclasses, the return value of func will be discarded. * If on_error is provided, it will be called with any exceptions thrown within Taskrunner.__call__. Otherwise, the taskrunner will re-throw exceptions after catching them.

Subclasses

ThreadTaskrunner

class strand.ThreadTaskrunner(func: Callable, *init_args, on_iter: Optional[Callable], on_end: Optional[Callable], on_error: Optional[Callable])

Runs func in a thread. Simple as that.

MultiprocessTaskrunner

class strand.MultiprocessTaskrunner(func: Callable, *init_args, on_iter: Optional[Callable], on_end: Optional[Callable], on_error: Optional[Callable], **init_kwargs)

Runs func in a new process. Has a separate set of caveats from multi-threading.

CoroutineTaskrunner

class strand.MultiprocessTaskrunner(func: Callable, *init_args, on_iter: Optional[Callable], on_end: Optional[Callable], on_error: Optional[Callable]), yield_on_iter: Optional[bool], **init_kwargs)

Runs func in a coroutine. Requires the calling context to already be within a coroutine in order to derive much benefit. Not fully fleshed out yet.

If yield_on_iter is True, adds await asyncio.sleep(0) between every iteration, to yield control back to the coroutine scheduler.

StoreTaskWriter

class strand.StoreTaskWriter(func: Callable, store: Mapping, *init_args, on_iter: Optional[Callable], on_end: Optional[Callable], on_error: Optional[Callable]), read_store=None, pickle_func=False, get_result=None, **init_kwargs)

When called, serializes func along with its arguments and passes them to store for storage, where it may then be found by a StoreTaskReader or any other consumer in another place and time.

The argument read_store takes a store that should expect to find values written in store and immediately instantiates a StoreTaskReader instance that starts polling read_store for items in a new thread.

If pickle_func is true, func will be serialized with dill for storage. Otherwise, only func.__name__ will be stored (which should be enough for most use cases where the store reader knows as much as it should about the writer).

StoreTaskReader (Not yet implemented)

class strand.StoreTaskReader(store: Mapping, get_task_func: Optional[Callable])

Accepts an argument store that should be a store of tasks to run.

The argument get_task_func should be a callable that resolves an item from the store into a function to call. If get_task_func is not present, the reader will assume that store[some_key]['func'] is a pickled callable and will automatically attempt to unpickle it with dill before calling it with *store[some_key]['args'], **store[some_key]['kwargs']

Calling the listen method on a StoreTaskReader instance will cause it to start an infinite loop in a new thread to poll the store for new tasks and execute them. ```python reader = StoreTaskReader(task_store)

reader.listen() ```

Future

  • Taskrunners that dispatch tasks to network targets (e.g. MQTT, RabbitMQ, Redis)
    • Could just be a special case of store reader/writer
  • Utilities for dispatching multiple tasks at once
  • More customizable serialization
  • Customize context for autogenerated StoreTaskReader when StoreTaskWriter is initialized with read_store
  • Thorough/correct handling of coroutines (could be a whole library unto itself)
  • Safety checking

au.py - Asynchronous Computation Framework

A Python framework for transforming synchronous functions into asynchronous ones with status tracking, result persistence, and pluggable backends.

Features

  • 🚀 Simple decorator-based API - Transform any function into an async computation
  • 💾 Pluggable storage backends - File system, Redis, databases, etc.
  • 🔄 Multiple execution backends - Processes, threads, remote APIs
  • 🛡️ Middleware system - Logging, metrics, authentication, rate limiting
  • 🧹 Automatic cleanup - TTL-based expiration of old results
  • 📦 Flexible serialization - JSON, Pickle, or custom formats
  • 🔍 Status tracking - Monitor computation state and progress
  • Cancellation support - Stop long-running computations

Quick Start

```python from strand.au import async_compute

@asynccompute() def expensivecomputation(n: int) -> int: """Calculate factorial.""" result = 1 for i in range(1, n + 1): result *= i return result

Launch computation (returns immediately)

handle = expensive_computation(100)

Check status

print(handle.get_status()) # ComputationStatus.RUNNING

Get result (blocks with timeout)

result = handle.get_result(timeout=30) print(f"100! = {result}") ```

Use Cases

1. Long-Running Computations

Perfect for computations that take minutes or hours: - Machine learning model training - Data processing pipelines - Scientific simulations - Report generation

2. Web Application Background Tasks

Offload heavy work from request handlers: ```python @app.route('/analyze') def analyzedata(): handle = analyzelargedataset(request.files['data']) return {'jobid': handle.key}

@app.route('/status/') def checkstatus(jobid): handle = ComputationHandle(jobid, store) return {'status': handle.getstatus().value} ```

3. Distributed Computing

Use remote backends to distribute work: python @async_compute(backend=RemoteAPIBackend(api_url="https://compute.example.com")) def distributed_task(data): return complex_analysis(data)

4. Batch Processing

Process multiple items with shared infrastructure: ```python store = FileSystemStore("/var/computations", ttl_seconds=3600) backend = ProcessBackend(store)

@asynccompute(backend=backend, store=store) def processitem(itemid): return transformitem(item_id)

Launch multiple computations

handles = [process_item(i) for i in range(1000)] ```

Usage Patterns

Basic Usage

```python from strand.au import async_compute

Simple async function with default settings

@asynccompute() def myfunction(x): return x * 2

handle = myfunction(21) result = handle.getresult(timeout=10) # Returns 42 ```

Custom Configuration

```python from strand.au import async_compute, FileSystemStore, ProcessBackend from strand.au import LoggingMiddleware, MetricsMiddleware, SerializationFormat

Configure store with TTL and serialization

store = FileSystemStore( "/var/computations", ttl_seconds=3600, # 1 hour TTL serialization=SerializationFormat.PICKLE # For complex objects )

Add middleware

middleware = [ LoggingMiddleware(level=logging.INFO), MetricsMiddleware() ]

Create backend with middleware

backend = ProcessBackend(store, middleware=middleware)

Apply to function

@asynccompute(backend=backend, store=store) def complexcomputation(data): return analyze(data) ```

Shared Infrastructure

```python

Create shared components

store = FileSystemStore("/var/shared", ttl_seconds=7200) backend = ProcessBackend(store)

Multiple functions share the same infrastructure

@async_compute(backend=backend, store=store) def step1(x): return preprocess(x)

@async_compute(backend=backend, store=store) def step2(x): return transform(x)

Chain computations

data = loaddata() h1 = step1(data) preprocessed = h1.getresult(timeout=60) h2 = step2(preprocessed) finalresult = h2.getresult(timeout=60) ```

Temporary Computations

```python from strand.au import temporaryasynccompute

Automatic cleanup when context exits

with temporaryasynccompute(ttlseconds=60) as asyncfunc: @asyncfunc def quickjob(x): return x ** 2

handle = quick_job(10)
result = handle.get_result(timeout=5)
# Temporary directory cleaned up automatically

```

Thread Backend for I/O-Bound Tasks

```python from strand.au import ThreadBackend

Use threads for I/O-bound operations

store = FileSystemStore("/tmp/io_tasks") backend = ThreadBackend(store)

@asynccompute(backend=backend, store=store) def fetchdata(url): return requests.get(url).json()

Launch multiple I/O operations

handles = [fetch_data(url) for url in urls] ```

Architecture & Design

Core Components

  1. Storage Abstraction (ComputationStore)

    • Implements Python's MutableMapping interface
    • Handles result persistence and retrieval
    • Supports TTL-based expiration
    • Extensible for any storage backend
  2. Execution Abstraction (ComputationBackend)

    • Defines how computations are launched
    • Supports different execution models
    • Integrates middleware for cross-cutting concerns
  3. Result Handling (ComputationHandle)

    • Clean API for checking status and retrieving results
    • Supports timeouts and cancellation
    • Provides access to metadata
  4. Middleware System

    • Lifecycle hooks: before, after, error
    • Composable and reusable
    • Examples: logging, metrics, auth, rate limiting

Design Principles

  • Separation of Concerns: Storage, execution, and result handling are independent
  • Dependency Injection: All components are injected, avoiding hardcoded dependencies
  • Open/Closed Principle: Extend functionality without modifying core code
  • Standard Interfaces: Uses Python's collections.abc interfaces
  • Functional Approach: Decorator-based API preserves function signatures

Trade-offs & Considerations

Pros

  • ✅ Clean abstraction allows easy swapping of implementations
  • ✅ Type hints and dataclasses provide excellent IDE support
  • ✅ Follows SOLID principles for maintainability
  • ✅ Minimal dependencies (uses only Python stdlib)
  • ✅ Flexible serialization supports complex objects
  • ✅ Middleware enables cross-cutting concerns

Cons

  • ❌ Process-based backend has overhead for small computations
  • ❌ File-based storage might not scale for high throughput
  • ❌ Metrics middleware doesn't share state across processes by default
  • ❌ No built-in distributed coordination
  • ❌ Fork method required for ProcessBackend (platform-specific)

When to Use

  • ✅ Long-running computations (minutes to hours)
  • ✅ Need to persist results across restarts
  • ✅ Want to separate computation from result retrieval
  • ✅ Building async APIs or job queues
  • ✅ Need cancellation or timeout support

When NOT to Use

  • ❌ Sub-second computations (overhead too high)
  • ❌ Need distributed coordination (use Celery/Dask)
  • ❌ Require complex workflow orchestration
  • ❌ Need real-time streaming results

Advanced Features

Custom Middleware

```python from strand.au import Middleware

class RateLimitMiddleware(Middleware): def init(self, maxperminute: int = 60): self.maxperminute = maxperminute self.requests = []

def before_compute(self, func, args, kwargs, key):
    now = time.time()
    self.requests = [t for t in self.requests if now - t < 60]

    if len(self.requests) >= self.max_per_minute:
        raise Exception("Rate limit exceeded")

    self.requests.append(now)

def after_compute(self, key, result):
    pass

def on_error(self, key, error):
    pass

Use the middleware

@asynccompute(middleware=[RateLimitMiddleware(maxperminute=10)]) def ratelimitedfunction(x): return expensiveapi_call(x) ```

Custom Storage Backend

```python from strand.au import ComputationStore, ComputationResult import redis

class RedisStore(ComputationStore): def init(self, redisclient, *, ttlseconds=None): super().init(ttlseconds=ttlseconds) self.redis = redis_client

def create_key(self):
    return f"computation:{uuid.uuid4()}"

def __getitem__(self, key):
    data = self.redis.get(key)
    if data is None:
        return ComputationResult(None, ComputationStatus.PENDING)
    return pickle.loads(data)

def __setitem__(self, key, result):
    data = pickle.dumps(result)
    if self.ttl_seconds:
        self.redis.setex(key, self.ttl_seconds, data)
    else:
        self.redis.set(key, data)

def __delitem__(self, key):
    self.redis.delete(key)

def __iter__(self):
    return iter(self.redis.scan_iter("computation:*"))

def __len__(self):
    return len(list(self))

def cleanup_expired(self):
    # Redis handles expiration automatically
    return 0

Use Redis backend

redisclient = redis.Redis(host='localhost', port=6379) store = RedisStore(redisclient, ttl_seconds=3600)

@asynccompute(store=store) def distributedcomputation(x): return process(x) ```

Monitoring & Metrics

```python from strand.au import MetricsMiddleware

Create shared metrics

metrics = MetricsMiddleware()

@asynccompute(middleware=[metrics]) def monitoredfunction(x): return compute(x)

Launch several computations

for i in range(10): monitored_function(i)

Check metrics

stats = metrics.getstats() print(f"Total: {stats['total']}") print(f"Completed: {stats['completed']}") print(f"Failed: {stats['failed']}") print(f"Avg Duration: {stats['avgduration']:.2f}s") ```

Error Handling

```python @asynccompute() def mayfail(x): if x < 0: raise ValueError("x must be positive") return x ** 2

handle = may_fail(-5)

try: result = handle.getresult(timeout=5) except Exception as e: print(f"Computation failed: {e}") print(f"Status: {handle.getstatus()}") # ComputationStatus.FAILED ```

Cleanup Strategies

```python

Manual cleanup

@asynccompute(ttlseconds=3600) def my_func(x): return x * 2

Clean up expired results

removed = myfunc.cleanupexpired() print(f"Removed {removed} expired results")

Automatic cleanup with probability

store = FileSystemStore( "/tmp/computations", ttlseconds=3600, autocleanup=True, cleanup_probability=0.1 # 10% chance on each access ) ```

API Reference

Main Decorator

python @async_compute( backend=None, # Execution backend (default: ProcessBackend) store=None, # Storage backend (default: FileSystemStore) base_path="/tmp/computations", # Path for default file store ttl_seconds=3600, # Time-to-live for results serialization=SerializationFormat.JSON, # JSON or PICKLE middleware=None # List of middleware components )

ComputationHandle Methods

  • is_ready() -> bool: Check if computation is complete
  • get_status() -> ComputationStatus: Get current status
  • get_result(timeout=None) -> T: Get result, optionally wait
  • cancel() -> bool: Attempt to cancel computation
  • metadata -> Dict[str, Any]: Access computation metadata

ComputationStatus Enum

  • PENDING: Not started yet
  • RUNNING: Currently executing
  • COMPLETED: Successfully finished
  • FAILED: Failed with error

Process Management Utility

The run_process context manager allows you to launch a process for the duration of a context, with optional readiness checks and automatic cleanup. It is useful for ensuring a background service or worker is running for the duration of a test or script.

Example:

```python from strand.taskrunning.utils import run_process import time

def my_worker(): print("Worker started!") time.sleep(5) print("Worker exiting!")

with runprocess(myworker, processname="myworker", isready=0.2, timeout=5) as proc: print(f"Process running: {proc.isalive()}") # Do work while the process is running time.sleep(1)

After the context, the process is cleaned up

```

You can also use the process_already_running argument to avoid launching a process if an external check indicates it is already running:

```python def isservicerunning(): # Return True if the service is already running ...

with runprocess(myworker, processalreadyrunning=isservicerunning) as proc: if proc is None: print("Service was already running!") else: print("Launched new worker process!") ```

Owner

  • Name: i2mint
  • Login: i2mint
  • Kind: organization

Coding for humans

GitHub Events

Total
  • Push event: 4
  • Create event: 5
Last Year
  • Push event: 4
  • Create event: 5

Committers

Last synced: over 3 years ago

All Time
  • Total Commits: 15
  • Total Committers: 3
  • Avg Commits per committer: 5.0
  • Development Distribution Score (DDS): 0.4
Top Committers
Name Email Commits
Steve Herzog s****g@a****m 9
GitHub CI Runner t****1@g****m 5
Valentin Féron v****n@p****m 1
Committer Domains (Top 20 + Academic)

Issues and Pull Requests

Last synced: 10 months ago

All Time
  • Total issues: 0
  • Total pull requests: 0
  • Average time to close issues: N/A
  • Average time to close pull requests: N/A
  • Total issue authors: 0
  • Total pull request authors: 0
  • Average comments per issue: 0
  • Average comments per pull request: 0
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 0
Past Year
  • Issues: 0
  • Pull requests: 0
  • Average time to close issues: N/A
  • Average time to close pull requests: N/A
  • Issue authors: 0
  • Pull request authors: 0
  • Average comments per issue: 0
  • Average comments per pull request: 0
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
Pull Request Authors
Top Labels
Issue Labels
Pull Request Labels

Packages

  • Total packages: 1
  • Total downloads:
    • pypi 1,309 last-month
  • Total dependent packages: 1
  • Total dependent repositories: 1
  • Total versions: 10
  • Total maintainers: 1
pypi.org: strand

Easy creation of non-blocking tasks

  • Versions: 10
  • Dependent Packages: 1
  • Dependent Repositories: 1
  • Downloads: 1,309 Last month
Rankings
Dependent packages count: 10.0%
Downloads: 16.0%
Dependent repos count: 21.7%
Average: 23.3%
Forks count: 29.8%
Stargazers count: 38.8%
Maintainers (1)
Last synced: 10 months ago

Dependencies

.github/workflows/ci.yml actions
  • actions/checkout v2 composite
  • actions/setup-python v2 composite
setup.py pypi