
Commit f39d6f2

Merge branch 'main' into issue-578-custom-storage (1.14.4 release)
2 parents: 9c3b053 + b2c5b9b

86 files changed: +2648, -1335 lines


.github/workflows/k3d-nightly-ci.yaml (+17, -1)

@@ -8,8 +8,24 @@ on:
   workflow_dispatch:

 jobs:
+  collect-test-modules:
+    runs-on: ubuntu-latest
+    outputs:
+      matrix: ${{ steps.set-matrix.outputs.matrix }}
+    steps:
+      - uses: actions/checkout@v3
+      - id: set-matrix
+        run: |
+          echo matrix="$(ls ./backend/test_nightly/ | grep -o "^test_.*" | jq -R -s -c 'split("\n")[:-1]')" >> $GITHUB_OUTPUT
+
   btrix-k3d-nightly-test:
+    name: ${{ matrix.module }}
+    needs: collect-test-modules
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        module: ${{fromJSON(needs.collect-test-modules.outputs.matrix)}}
+      fail-fast: false
     steps:
       - name: Create k3d Cluster
         uses: AbsaOSS/k3d-action@v2
@@ -82,7 +98,7 @@ jobs:
         run: kubectl exec -i deployment/local-minio -c minio -- mkdir /data/replica-0

       - name: Run Tests
-        run: pytest -vv ./backend/test_nightly/test_*.py
+        run: pytest -vv ./backend/test_nightly/${{ matrix.module }}

       - name: Print Backend Logs (API)
         if: ${{ failure() }}
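
The change above fans the nightly suite out into one job per test module: collect-test-modules lists ./backend/test_nightly/, keeps the test_* files, and publishes them as a JSON array through $GITHUB_OUTPUT, and btrix-k3d-nightly-test reads that array back with fromJSON into a matrix, names each job after its module, and runs pytest against only that file with fail-fast disabled, so one failing module no longer cancels the rest. A minimal Python sketch of the collection step, assuming it is run from the repository root; the function name is illustrative and the workflow itself uses the shell pipeline shown above:

import json
from pathlib import Path


def collect_test_modules(test_dir: str = "./backend/test_nightly") -> str:
    """Build the JSON array of nightly test modules, one matrix entry per file."""
    modules = sorted(
        p.name for p in Path(test_dir).iterdir() if p.name.startswith("test_")
    )
    return json.dumps(modules)


if __name__ == "__main__":
    # Roughly: ls ./backend/test_nightly/ | grep -o "^test_.*" | jq -R -s -c 'split("\n")[:-1]'
    print(f"matrix={collect_test_modules()}")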

backend/btrixcloud/basecrawls.py (+13, -49)

@@ -1,6 +1,6 @@
 """base crawl type"""

-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple
 from uuid import UUID
 import os
@@ -29,10 +29,9 @@
     UpdatedResponse,
     DeletedResponseQuota,
     CrawlSearchValuesResponse,
-    PRESIGN_DURATION_SECONDS,
 )
 from .pagination import paginated_format, DEFAULT_PAGE_SIZE
-from .utils import dt_now, date_to_str, get_origin
+from .utils import dt_now, get_origin, date_to_str

 if TYPE_CHECKING:
     from .crawlconfigs import CrawlConfigOps
@@ -65,9 +64,6 @@ class BaseCrawlOps:
     background_job_ops: BackgroundJobOps
     page_ops: PageOps

-    presign_duration_seconds: int
-    expire_at_duration_seconds: int
-
     def __init__(
         self,
         mdb,
@@ -89,9 +85,6 @@ def __init__(
         self.background_job_ops = background_job_ops
         self.page_ops = cast(PageOps, None)

-        # renew when <25% of time remaining
-        self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)
-
     def set_page_ops(self, page_ops):
         """set page ops reference"""
         self.page_ops = page_ops
@@ -124,13 +117,12 @@ async def _files_to_resources(
         files: List[Dict],
         org: Organization,
         crawlid: str,
-        qa_run_id: Optional[str] = None,
     ) -> List[CrawlFileOut]:
         if not files:
             return []

         crawl_files = [CrawlFile(**data) for data in files]
-        return await self.resolve_signed_urls(crawl_files, org, crawlid, qa_run_id)
+        return await self.resolve_signed_urls(crawl_files, org, crawlid)

     async def get_wacz_files(self, crawl_id: str, org: Organization):
         """Return list of WACZ files associated with crawl."""
@@ -177,11 +169,14 @@ async def get_crawl_out(

         oid = res.get("oid")
         if oid:
+            origin = get_origin(headers)
             res["pagesQueryUrl"] = (
-                get_origin(headers)
-                + f"/api/orgs/{oid}/crawls/{crawlid}/pagesSearch"
+                origin + f"/api/orgs/{oid}/crawls/{crawlid}/pagesSearch"
             )

+        # this will now disable the downloadUrl in RWP
+        res["downloadUrl"] = None
+
         crawl = CrawlOutWithResources.from_dict(res)

         if not skip_resources:
@@ -464,50 +459,19 @@ async def resolve_signed_urls(
         files: List[CrawlFile],
         org: Organization,
         crawl_id: Optional[str] = None,
-        qa_run_id: Optional[str] = None,
-        update_presigned_url: bool = False,
+        force_update=False,
     ) -> List[CrawlFileOut]:
         """Regenerate presigned URLs for files as necessary"""
         if not files:
             print("no files")
             return []

-        delta = timedelta(seconds=self.expire_at_duration_seconds)
-
         out_files = []

         for file_ in files:
-            presigned_url = file_.presignedUrl
-            now = dt_now()
-
-            if (
-                update_presigned_url
-                or not presigned_url
-                or (file_.expireAt and now >= file_.expireAt)
-            ):
-                exp = now + delta
-                presigned_url = await self.storage_ops.get_presigned_url(
-                    org, file_, PRESIGN_DURATION_SECONDS
-                )
-
-                prefix = "files"
-                if qa_run_id:
-                    prefix = f"qaFinished.{qa_run_id}.{prefix}"
-
-                await self.crawls.find_one_and_update(
-                    {f"{prefix}.filename": file_.filename},
-                    {
-                        "$set": {
-                            f"{prefix}.$.presignedUrl": presigned_url,
-                            f"{prefix}.$.expireAt": exp,
-                        }
-                    },
-                )
-                file_.expireAt = exp
-
-            expire_at_str = ""
-            if file_.expireAt:
-                expire_at_str = date_to_str(file_.expireAt)
+            presigned_url, expire_at = await self.storage_ops.get_presigned_url(
+                org, file_, force_update=force_update
+            )

             out_files.append(
                 CrawlFileOut(
@@ -517,7 +481,7 @@ async def resolve_signed_urls(
                     size=file_.size,
                     crawlId=crawl_id,
                     numReplicas=len(file_.replicas) if file_.replicas else 0,
-                    expireAt=expire_at_str,
+                    expireAt=date_to_str(expire_at),
                 )
             )
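
With this change resolve_signed_urls stops doing its own expiry bookkeeping: the timedelta math against PRESIGN_DURATION_SECONDS, the qa_run_id-aware find_one_and_update on the crawls collection, and the cached presignedUrl/expireAt fields all move behind storage_ops.get_presigned_url, which now returns the URL together with its expiry and accepts a force_update flag; resolve_signed_urls only formats that expiry with date_to_str for CrawlFileOut. A rough stub of the contract this method now relies on, perhaps useful in tests; the class and attribute names below are hypothetical and not the StorageOps implementation shipped in this commit:

from datetime import datetime, timedelta, timezone


class StubStorageOps:
    """Hypothetical stand-in for the storage layer that resolve_signed_urls delegates to."""

    def __init__(self, duration_seconds: int = 3600):
        self.duration_seconds = duration_seconds
        # cache keyed by filename -> (signed url, expiry); file_ is duck-typed here
        self._cache: dict[str, tuple[str, datetime]] = {}

    async def get_presigned_url(self, org, file_, force_update: bool = False):
        """Return (presigned_url, expire_at); reuse a cached URL unless expired or forced."""
        now = datetime.now(timezone.utc)
        cached = self._cache.get(file_.filename)
        if cached and not force_update and cached[1] > now:
            return cached
        expire_at = now + timedelta(seconds=self.duration_seconds)
        url = f"https://storage.invalid/{file_.filename}?signature=stub"  # placeholder URL
        self._cache[file_.filename] = (url, expire_at)
        return url, expire_at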

backend/btrixcloud/colls.py (+80, -8)

@@ -11,6 +11,7 @@

 import asyncio
 import pymongo
+import aiohttp
 from pymongo.collation import Collation
 from fastapi import Depends, HTTPException, Response
 from fastapi.responses import StreamingResponse
@@ -342,9 +343,11 @@ async def get_collection_out(
         result = await self.get_collection_raw(coll_id, public_or_unlisted_only)

         if resources:
-            result["resources"], crawl_ids, pages_optimized = (
-                await self.get_collection_crawl_resources(coll_id)
-            )
+            (
+                result["resources"],
+                crawl_ids,
+                pages_optimized,
+            ) = await self.get_collection_crawl_resources(coll_id)

             initial_pages, _ = await self.page_ops.list_pages(
                 crawl_ids=crawl_ids,
@@ -353,11 +356,21 @@ async def get_collection_out(

         public = "public/" if public_or_unlisted_only else ""

+        origin = get_origin(headers)
+
+        if public_or_unlisted_only:
+            slug = result.get("slug")
+            result["downloadUrl"] = (
+                origin + f"/api/{public}orgs/{org.slug}/collections/{slug}/download"
+            )
+        else:
+            # disable download link, as not public without auth
+            result["downloadUrl"] = None
+
         if pages_optimized:
             result["initialPages"] = initial_pages
             result["pagesQueryUrl"] = (
-                get_origin(headers)
-                + f"/api/orgs/{org.id}/collections/{coll_id}/{public}pages"
+                origin + f"/api/orgs/{org.id}/collections/{coll_id}/{public}pages"
             )

         thumbnail = result.get("thumbnail")
@@ -378,6 +391,9 @@ async def get_public_collection_out(
         """Get PublicCollOut by id"""
         result = await self.get_collection_raw(coll_id)

+        result["orgName"] = org.name
+        result["orgPublicProfile"] = org.enablePublicProfile
+
         allowed_access = [CollAccessType.PUBLIC]
         if allow_unlisted:
             allowed_access.append(CollAccessType.UNLISTED)
@@ -396,6 +412,38 @@

         return PublicCollOut.from_dict(result)

+    async def get_public_thumbnail(
+        self, slug: str, org: Organization
+    ) -> StreamingResponse:
+        """return thumbnail of public collection, if any"""
+        result = await self.get_collection_raw_by_slug(
+            slug, public_or_unlisted_only=True
+        )
+
+        thumbnail = result.get("thumbnail")
+        if not thumbnail:
+            raise HTTPException(status_code=404, detail="thumbnail_not_found")
+
+        image_file = ImageFile(**thumbnail)
+        image_file_out = await image_file.get_public_image_file_out(
+            org, self.storage_ops
+        )
+
+        path = self.storage_ops.resolve_internal_access_path(image_file_out.path)
+
+        async def reader():
+            async with aiohttp.ClientSession() as session:
+                async with session.get(path) as resp:
+                    async for chunk in resp.content.iter_chunked(4096):
+                        yield chunk
+
+        headers = {
+            "Cache-Control": "max-age=3600, stale-while-revalidate=86400",
+            "Content-Length": f"{image_file.size}",
+            "Etag": f'"{image_file.hash}"',
+        }
+        return StreamingResponse(reader(), media_type=image_file.mime, headers=headers)
+
     async def list_collections(
         self,
         org: Organization,
@@ -497,6 +545,9 @@ async def list_collections(
                 org, self.storage_ops
             )

+            res["orgName"] = org.name
+            res["orgPublicProfile"] = org.enablePublicProfile
+
             if public_colls_out:
                 collections.append(PublicCollOut.from_dict(res))
             else:
@@ -839,6 +890,7 @@ async def stream_iter():
            file_prep.upload_name,
            stream_iter(),
            MIN_UPLOAD_PART_SIZE,
+           mime=file_prep.mime,
        ):
            print("Collection thumbnail stream upload failed", flush=True)
            raise HTTPException(status_code=400, detail="upload_failed")
@@ -962,9 +1014,11 @@ async def get_collection_all(org: Organization = Depends(org_viewer_dep)):
        try:
            all_collections, _ = await colls.list_collections(org, page_size=10_000)
            for collection in all_collections:
-                results[collection.name], _, _ = (
-                    await colls.get_collection_crawl_resources(collection.id)
-                )
+                (
+                    results[collection.name],
+                    _,
+                    _,
+                ) = await colls.get_collection_crawl_resources(collection.id)
        except Exception as exc:
            # pylint: disable=raise-missing-from
            raise HTTPException(
@@ -1162,6 +1216,24 @@ async def download_public_collection(

        return await colls.download_collection(coll.id, org)

+    @app.get(
+        "/public/orgs/{org_slug}/collections/{coll_slug}/thumbnail",
+        tags=["collections", "public"],
+        response_class=StreamingResponse,
+    )
+    async def get_public_thumbnail(
+        org_slug: str,
+        coll_slug: str,
+    ):
+        try:
+            org = await colls.orgs.get_org_by_slug(org_slug)
+        # pylint: disable=broad-exception-caught
+        except Exception:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(status_code=404, detail="collection_not_found")
+
+        return await colls.get_public_thumbnail(coll_slug, org)
+
     @app.post(
         "/orgs/{oid}/collections/{coll_id}/home-url",
         tags=["collections"],

backend/btrixcloud/crawlconfigs.py (+26)

@@ -36,6 +36,7 @@
     CrawlerChannel,
     CrawlerChannels,
     StartedResponse,
+    SuccessResponse,
     CrawlConfigAddedResponse,
     CrawlConfigSearchValues,
     CrawlConfigUpdateResponse,
@@ -1036,6 +1037,21 @@ def get_warc_prefix(self, org: Organization, crawlconfig: CrawlConfig) -> str:
         prefix = org.slug + "-" + name
         return prefix[:80]

+    async def re_add_all_scheduled_cron_jobs(self):
+        """Re-add all scheduled workflow cronjobs"""
+        match_query = {"schedule": {"$nin": ["", None]}, "inactive": {"$ne": True}}
+        async for config_dict in self.crawl_configs.find(match_query):
+            config = CrawlConfig.from_dict(config_dict)
+            try:
+                await self.crawl_manager.update_scheduled_job(config)
+                print(f"Updated cronjob for scheduled workflow {config.id}", flush=True)
+            # pylint: disable=broad-except
+            except Exception as err:
+                print(
+                    f"Error updating cronjob for scheduled workflow {config.id}: {err}",
+                    flush=True,
+                )
+

 # ============================================================================
 # pylint: disable=too-many-locals
@@ -1272,6 +1288,16 @@ async def make_inactive(cid: UUID, org: Organization = Depends(org_crawl_dep)):

         return await ops.do_make_inactive(crawlconfig)

+    @app.post("/orgs/all/crawlconfigs/reAddCronjobs", response_model=SuccessResponse)
+    async def re_add_all_scheduled_cron_jobs(
+        user: User = Depends(user_dep),
+    ):
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        asyncio.create_task(ops.re_add_all_scheduled_cron_jobs())
+        return {"success": True}
+
     org_ops.router.include_router(router)

     return ops
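
The new reAddCronjobs endpoint is superuser-only and returns {"success": true} immediately while re_add_all_scheduled_cron_jobs runs as a background task, re-applying the cronjob for every active workflow with a non-empty schedule. A hedged example of invoking it, assuming the /api prefix used elsewhere in this commit and a superuser bearer token, both placeholders:

import requests


def re_add_cronjobs(api_base: str, token: str) -> dict:
    """Trigger the background cronjob refresh and return the immediate API response."""
    resp = requests.post(
        f"{api_base}/api/orgs/all/crawlconfigs/reAddCronjobs",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()  # expected: {"success": True}


if __name__ == "__main__":
    print(re_add_cronjobs("https://btrix.example.org", "<superuser-token>"))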

backend/btrixcloud/crawls.py (+1, -3)

@@ -1026,9 +1026,7 @@ async def get_qa_run_for_replay(
         if not org:
             raise HTTPException(status_code=400, detail="missing_org")

-        resources = await self.resolve_signed_urls(
-            qa_run.files, org, crawl.id, qa_run_id
-        )
+        resources = await self.resolve_signed_urls(qa_run.files, org, crawl.id)

         qa_run.files = []
