|
26 | 26 | StorageRef,
|
27 | 27 | User,
|
28 | 28 | SuccessResponse,
|
| 29 | + JobProgress, |
29 | 30 | )
|
30 | 31 | from .pagination import DEFAULT_PAGE_SIZE, paginated_format
|
31 | 32 | from .utils import dt_now
|
@@ -463,6 +464,52 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
|
463 | 464 |
|
464 | 465 | return DeleteOrgJob.from_dict(data)
|
465 | 466 |
|
| 467 | + async def get_job_progress(self, job_id: str) -> JobProgress: |
| 468 | + """Return progress of background job for supported types""" |
| 469 | + job = await self.get_background_job(job_id) |
| 470 | + |
| 471 | + if job.type != BgJobType.COPY_BUCKET: |
| 472 | + raise HTTPException(status_code=403, detail="job_type_not_supported") |
| 473 | + |
| 474 | + if job.success is False: |
| 475 | + raise HTTPException(status_code=400, detail="job_failed") |
| 476 | + |
| 477 | + if job.finished: |
| 478 | + return JobProgress(percentage=1.0) |
| 479 | + |
| 480 | + log_tail = await self.crawl_manager.tail_background_job(job_id) |
| 481 | + if not log_tail: |
| 482 | + raise HTTPException(status_code=400, detail="job_log_not_available") |
| 483 | + |
| 484 | + lines = log_tail.splitlines() |
| 485 | + reversed_lines = list(reversed(lines)) |
| 486 | + |
| 487 | + progress = JobProgress(percentage=0.0) |
| 488 | + |
| 489 | + # Parse lines in reverse order until we find one with latest stats |
| 490 | + for line in reversed_lines: |
| 491 | + try: |
| 492 | + if "ETA" not in line: |
| 493 | + continue |
| 494 | + |
| 495 | + stats_groups = line.split(",") |
| 496 | + for group in stats_groups: |
| 497 | + group = group.strip() |
| 498 | + if "%" in group: |
| 499 | + progress.percentage = float(group.strip("%")) / 100 |
| 500 | + if "ETA" in group: |
| 501 | + eta_str = group.strip("ETA ") |
| 502 | + # Split on white space to remove byte mark rclone sometimes |
| 503 | + # adds to end of stats line |
| 504 | + eta_list = eta_str.split(" ") |
| 505 | + progress.eta = eta_list[0] |
| 506 | + |
| 507 | + break |
| 508 | + except: |
| 509 | + continue |
| 510 | + |
| 511 | + return progress |
| 512 | + |
466 | 513 | async def list_background_jobs(
|
467 | 514 | self,
|
468 | 515 | org: Organization,
|
@@ -672,6 +719,17 @@ async def get_background_job(
|
672 | 719 | """Retrieve information for background job"""
|
673 | 720 | return await ops.get_background_job(job_id, org.id)
|
674 | 721 |
|
| 722 | + @router.get( |
| 723 | + "/{job_id}/progress", |
| 724 | + response_model=JobProgress, |
| 725 | + ) |
| 726 | + async def get_job_progress( |
| 727 | + job_id: str, |
| 728 | + org: Organization = Depends(org_crawl_dep), |
| 729 | + ): |
| 730 | + """Return progress information for background job""" |
| 731 | + return await ops.get_job_progress(job_id) |
| 732 | + |
675 | 733 | @app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
|
676 | 734 | async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
|
677 | 735 | """Get background job from any org"""
|
|
0 commit comments