Note
Go to the end to download the full example code.
Multiprocess training data upload
This example demonstrates how to upload training data to a SimAI project using multiple parallel processes. It skips items that are already present in the project, so it is safe to run repeatedly without creating duplicates.
Before you begin
Make sure you have:
Valid SimAI credentials and organization access.
A dataset folder where each subdirectory is one training data sample.
The ansys-simai-core library installed.
Configure your settings
Update these variables before running the script:
import os
from multiprocessing import Pool
import ansys.simai.core as asc
# Organization name passed to ``asc.SimAIClient(organization=...)``
ORGANIZATION_NAME = "<your_organization_name>"
# Name of the project looked up with ``projects.get(name=...)``
PROJECT_NAME = "<your_project_name>"
# Local folder whose entries are listed and uploaded as training data samples
DATASET_PATH = "<path_to_your_dataset_folder>"
# Maximum number of parallel upload workers
MAX_PROCESSES = 2
# Number of items dispatched to each worker at a time
MAX_CHUNK = 2
Define upload functions for multiprocessing
_worker holds per-worker state (one SimAI client per process).
The initializer sets up the SimAI client and project for each worker process.
The upload function uploads one item and returns any error message instead of
raising exceptions, so that one failure does not abort the whole batch.
# Per-process state filled in by ``_initializer``: the keys "simai" (client),
# "project" and "dataset_path" are read back by ``_upload_one``.
_worker: dict = {}
def _initializer(organization: str, dataset_path: str, project_name: str) -> None:
    """Prepare the state shared by all uploads handled in this worker process.

    Intended to be passed to ``Pool`` as ``initializer`` so that the client is
    created and the project is resolved once per worker rather than once per
    uploaded item. The results are stored in the module-level ``_worker`` dict.
    """
    client = asc.SimAIClient(organization=organization)
    _worker["simai"] = client
    _worker["project"] = client.projects.get(name=project_name)
    _worker["dataset_path"] = dataset_path
def _upload_one(item_name: str) -> tuple[str, str | None]:
"""Upload one training-data folder.
Returns a ``(item_name, error_message)`` tuple so that a single failed
upload does not abort the rest of the batch.
"""
try:
simai = _worker["simai"]
project = _worker["project"]
dataset_path = _worker["dataset_path"]
training_data = simai.training_data.create(item_name, project)
simai.training_data.upload_folder(
training_data, folder_path=os.path.join(dataset_path, item_name)
)
return item_name, None
except Exception as exc: # noqa: BLE001
return item_name, str(exc)
Main entry point
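The main-process SimAI calls and the pool creation live inside main(), which is
only invoked under the if __name__ == "__main__" guard. This prevents worker processes from
re-executing the setup logic when they re-import this module, which happens with the spawn start method (the default on Windows and macOS).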
def main() -> None:
    """Run the parallel upload.

    Lists the sample folders under ``DATASET_PATH``, drops those whose name
    already exists among the project's training data, and uploads the rest
    through a ``multiprocessing.Pool``. Progress and a final summary are
    printed; failed items are reported, not raised.
    """
    simai = asc.SimAIClient(organization=ORGANIZATION_NAME)
    project = simai.projects.get(name=PROJECT_NAME)

    # Each sample is a subdirectory of the dataset folder. Ignore stray files
    # (e.g. .DS_Store, README) that os.listdir would otherwise return, and sort
    # so that the upload order is reproducible between runs.
    all_items = sorted(
        name
        for name in os.listdir(DATASET_PATH)
        if os.path.isdir(os.path.join(DATASET_PATH, name))
    )

    # Compare the local folder contents against what is already in the project
    # and keep only the items that are missing.
    existing_names = {td.name for td in project.list_training_data()}
    items_to_upload = [item for item in all_items if item not in existing_names]

    print(f"Dataset size : {len(all_items)}")
    print(f"Already present: {len(all_items) - len(items_to_upload)}")
    print(f"To upload : {len(items_to_upload)}")

    if not items_to_upload:
        print("Nothing to upload — project is already up to date.")
        return

    failed: list[str] = []
    # Every worker builds its own client in _initializer, so do not start more
    # workers than there are items to process.
    with Pool(
        processes=min(MAX_PROCESSES, len(items_to_upload)),
        initializer=_initializer,
        initargs=(ORGANIZATION_NAME, DATASET_PATH, PROJECT_NAME),
    ) as pool:
        # imap yields results in submission order as they become available,
        # which gives per-item progress output while the pool is still working.
        for item_name, error in pool.imap(_upload_one, items_to_upload, chunksize=MAX_CHUNK):
            if error is None:
                print(f" Uploaded : {item_name}")
            else:
                print(f" Failed : {item_name} — {error}")
                failed.append(item_name)

    if failed:
        print(f"\n{len(failed)} item(s) failed: {failed}")
    else:
        print("\nUpload complete — all items uploaded successfully.")
# Run the upload only when this file is executed as a script, so that merely
# importing the module (as worker processes may do) does not start an upload.
if __name__ == "__main__":
    main()