Functional example of LiveTaskRunner
The LiveTaskRunner orchestrates and executes ScanAnalyzers using a queue that updates and implements priorities. The queueing, prioritization, and statuses of analyzers allow multiple LiveTaskRunners to be executed to speed up analysis.
In [ ]:
Copied!
import logging
import time
from geecs_data_utils import ScanTag, ScanPaths
from geecs_data_utils.config_roots import image_analysis_config, scan_analysis_config
from scan_analysis.live_task_runner import LiveTaskRunner
# Quiet logs: restrict the package-level loggers to errors only.
for name in ("image_analysis", "scan_analysis", "geecs_data_utils"):
    logging.getLogger(name).setLevel(logging.ERROR)
# Keep INFO output from the runner and its task queue. A level set explicitly
# on a child logger takes precedence over the level of its "scan_analysis" parent.
logging.getLogger("scan_analysis.live_task_runner").setLevel(logging.INFO)
logging.getLogger("scan_analysis.task_queue").setLevel(logging.INFO)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Point to config roots (or set env vars IMAGE_ANALYSIS_CONFIG_DIR / SCAN_ANALYSIS_CONFIG_DIR)
image_analysis_config.set_base_dir(ScanPaths.paths_config.image_analysis_configs_path)
scan_analysis_config.set_base_dir(ScanPaths.paths_config.scan_analysis_configs_path)

# Scan tag handed to the runner as its date_tag.
date_tag = ScanTag(year=2026, month=2, day=12, number=0, experiment="Undulator")
runner = LiveTaskRunner(
    analyzer_group="Undulator",
    date_tag=date_tag,
    config_dir=scan_analysis_config.base_dir,  # optional; else env/global
    image_config_dir=image_analysis_config.base_dir,  # optional
    gdoc_enabled=True,
)
import logging
import time
from geecs_data_utils import ScanTag, ScanPaths
from geecs_data_utils.config_roots import image_analysis_config, scan_analysis_config
from scan_analysis.live_task_runner import LiveTaskRunner
# Quiet logs: restrict the package-level loggers to errors only.
for name in ("image_analysis", "scan_analysis", "geecs_data_utils"):
    logging.getLogger(name).setLevel(logging.ERROR)
# Keep INFO output from the runner and its task queue. A level set explicitly
# on a child logger takes precedence over the level of its "scan_analysis" parent.
logging.getLogger("scan_analysis.live_task_runner").setLevel(logging.INFO)
logging.getLogger("scan_analysis.task_queue").setLevel(logging.INFO)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Point to config roots (or set env vars IMAGE_ANALYSIS_CONFIG_DIR / SCAN_ANALYSIS_CONFIG_DIR)
image_analysis_config.set_base_dir(ScanPaths.paths_config.image_analysis_configs_path)
scan_analysis_config.set_base_dir(ScanPaths.paths_config.scan_analysis_configs_path)

# Scan tag handed to the runner as its date_tag.
date_tag = ScanTag(year=2026, month=2, day=12, number=0, experiment="Undulator")
runner = LiveTaskRunner(
    analyzer_group="Undulator",
    date_tag=date_tag,
    config_dir=scan_analysis_config.base_dir,  # optional; else env/global
    image_config_dir=image_analysis_config.base_dir,  # optional
    gdoc_enabled=True,
)
In [ ]:
Copied!
runner.start()
try:
    # Poll forever: there is no exit condition, so the loop only ends when an
    # exception (e.g. a kernel interrupt / Ctrl-C) propagates out of it.
    while True:
        runner.process_new(
            base_directory=ScanPaths.paths_config.base_path,
            max_items=1,
            dry_run=False,
            rerun_completed=True,  # one-time reset of done -> queued per scan
            rerun_failed=True,
        )
        # Pause one second between polling passes.
        time.sleep(1)
finally:
    # Always stop the runner, even when the loop is interrupted or raises.
    runner.stop()
runner.start()
try:
    # Poll forever: there is no exit condition, so the loop only ends when an
    # exception (e.g. a kernel interrupt / Ctrl-C) propagates out of it.
    while True:
        runner.process_new(
            base_directory=ScanPaths.paths_config.base_path,
            max_items=1,
            dry_run=False,
            rerun_completed=True,  # one-time reset of done -> queued per scan
            rerun_failed=True,
        )
        # Pause one second between polling passes.
        time.sleep(1)
finally:
    # Always stop the runner, even when the loop is interrupted or raises.
    runner.stop()
In [ ]:
Copied!