Commit f1478ca0 authored by Mustafa Gezen's avatar Mustafa Gezen 🏗
Browse files

move from background_tasks to aio-pika. add periodic tasks

parent 0f09750e
......@@ -2,6 +2,11 @@
### Development
### Kerberos
```
kinit -R -kt {PATH_TO_KEYTAB} koji/distrobuild@ROCKYLINUX.ORG -S HTTP/koji.rockylinux.org@ROCKYLINUX.ORG
```
#### UI
```
cd ui
......@@ -18,3 +23,10 @@ pip install -r requirements.txt
aerich upgrade
uvicorn distrobuild.app:app --reload --port 8090
```
#### Scheduler
```
virtualenv .venv
source .venv/bin/activate
python3 run_scheduler.py
```
import asyncio
from tortoise import Tortoise
Tortoise.init_models(["distrobuild.models"], "distrobuild")
......@@ -10,16 +12,18 @@ from tortoise.contrib.fastapi import register_tortoise
from distrobuild import settings
from distrobuild.routes import register_routes
# init sessions
from distrobuild import session
from distrobuild_scheduler import init_channel
app = FastAPI()
app.mount("/static/files", StaticFiles(directory="ui/dist/files"), name="static")
register_routes(app)
templates = Jinja2Templates(directory="ui/dist/templates")
@app.get("/{full_path:path}", response_class=HTMLResponse, include_in_schema=False)
async def serve_frontend(request: Request):
return templates.TemplateResponse("index.html", {
......@@ -28,7 +32,13 @@ async def serve_frontend(request: Request):
"gitlab_url": f"https://{settings.settings.gitlab_host}{settings.settings.repo_prefix}"
})
@app.on_event("startup")
async def startup():
    """Open the AMQP channel used by the scheduler before serving requests."""
    loop = asyncio.get_event_loop()
    await init_channel(loop)
# Wire tortoise-orm into the FastAPI app.
# FIX: the previous text carried both the removed and the added diff lines,
# duplicating the `app, config=...` arguments (a syntax error); keep one copy.
register_tortoise(
    app,
    config=settings.TORTOISE_ORM
)
import json
import re
from distrobuild.models import Package, PackageModule, Repo
async def process_repo_dump(repo: Repo) -> None:
with open(f"/tmp/{repo}_ALL.txt", "r") as f:
lines = f.readlines()
......@@ -12,6 +12,7 @@ async def process_repo_dump(repo: Repo) -> None:
continue
await Package.create(name=package_name, el8=True, is_package=True, repo=repo)
async def process_module_dump() -> None:
f = open("/tmp/modules.json")
module_list = json.loads(f.read())
......
import datetime
from typing import List
from enum import Enum
from tortoise import Model, Tortoise, fields
from tortoise import Model, fields
class Repo(str, Enum):
    """Source repository a package belongs to (str-valued for JSON/DB use)."""
    BASEOS = "BASEOS"
    APPSTREAM = "APPSTREAM"
    POWERTOOLS = "POWERTOOLS"
class BuildStatus(str, Enum):
QUEUED = "QUEUED"
BUILDING = "BUILDING"
......@@ -17,6 +16,7 @@ class BuildStatus(str, Enum):
SUCCEEDED = "SUCCEEDED"
CANCELLED = "CANCELLED"
class Package(Model):
id = fields.BigIntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
......@@ -37,16 +37,19 @@ class Package(Model):
class PydanticMeta:
backward_relations = False
class PackageModule(Model):
    """Link table mapping a module subpackage to its parent module package."""
    id = fields.BigIntField(pk=True)
    created_at = fields.DatetimeField(auto_now_add=True)
    # FIX: was `auto_add=True`, which is not a tortoise-orm DatetimeField
    # option (valid: auto_now / auto_now_add) and was silently ignored, so
    # updated_at never refreshed. `auto_now=True` updates it on every save.
    updated_at = fields.DatetimeField(auto_now=True, null=True)
    package = fields.ForeignKeyField("distrobuild.Package", on_delete="RESTRICT", related_name="m_subpackages")
    # NOTE(review): related_name keeps the historical misspelling
    # "m_module_parent_pacakges" so existing queries stay compatible.
    module_parent_package = fields.ForeignKeyField("distrobuild.Package", on_delete="RESTRICT",
                                                   related_name="m_module_parent_pacakges")

    class PydanticMeta:
        backward_relations = False
class Build(Model):
id = fields.BigIntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
......@@ -56,10 +59,13 @@ class Build(Model):
mbs = fields.BooleanField(default=False)
koji_id = fields.BigIntField(null=True)
mbs_id = fields.BigIntField(null=True)
commit = fields.CharField(max_length=255)
branch = fields.CharField(max_length=255)
class Meta:
table = "builds"
class Import(Model):
id = fields.BigIntField(pk=True)
created_at = fields.DatetimeField(auto_now_add=True)
......@@ -67,7 +73,22 @@ class Import(Model):
package = fields.ForeignKeyField("distrobuild.Package", on_delete="CASCADE")
status = fields.CharEnumField(BuildStatus)
version = fields.IntField()
commit = fields.CharField(max_length=255, null=True)
module = fields.BooleanField(default=False)
commits: fields.ReverseRelation["ImportCommit"] = fields.ReverseRelation
class Meta:
table = "imports"
class PydanticMeta:
backward_relations = False
class ImportCommit(Model):
    """A commit produced by a package import, one row per imported branch."""
    id = fields.BigIntField(pk=True)
    commit = fields.CharField(max_length=255)
    branch = fields.CharField(max_length=255)
    # trailing underscore because "import" is a Python keyword
    import_ = fields.ForeignKeyField("distrobuild.Import", on_delete="CASCADE")

    class Meta:
        table = "import_commits"
......@@ -4,6 +4,7 @@ from distrobuild.routes import build, packages, bootstrap
_base_router = APIRouter(prefix="/api")
def register_routes(app):
_base_router.include_router(packages.router)
_base_router.include_router(bootstrap.router)
......
import datetime
from typing import Optional, List
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import PlainTextResponse
from fastapi_pagination import Page, pagination_params
from fastapi_pagination.ext.tortoise import paginate
from tortoise.transactions import atomic
from pydantic import BaseModel, validator
from distrobuild.models import Build, Import, Package, PackageModule, BuildStatus
from distrobuild.pydantic import Build_Pydantic, Import_Pydantic
from distrobuild.models import Build, Import, ImportCommit, Package, PackageModule, BuildStatus
from distrobuild.serialize import Build_Pydantic
from distrobuild.session import gl, koji_session
from distrobuild.settings import settings
from distrobuild import srpmproc
from distrobuild_scheduler import import_package_task, build_package_task
router = APIRouter(prefix="/build")
class BuildRequest(BaseModel):
package_id: Optional[int]
package_name: Optional[str]
......@@ -31,87 +26,43 @@ class BuildRequest(BaseModel):
raise ValueError('either package_id or package_name is required')
return package_name
class BatchBuildRequest(BaseModel):
    """Request body wrapping multiple BuildRequests for the batch endpoints."""
    packages: List[BuildRequest]
def gen_body_filters(body: BuildRequest) -> dict:
    """Translate a BuildRequest into Package query filters.

    Prefers package_id over package_name; returns None when neither is set
    (the request validator is expected to prevent that).
    """
    # FIX: BuildRequest is a pydantic model, not a dict — it has no .get()
    # or [] access, so the previous dict-style lookups raised AttributeError.
    if body.package_id:
        return {"id": body.package_id}
    if body.package_name:
        return {"name": body.package_name}
@atomic()
async def do_build_task(package: Package, build: Build):
    """Submit a koji build for the latest imported commit of *package*.

    Module (MBS) builds are not implemented yet and are skipped. On success
    the build row is marked BUILDING and the koji task id is recorded.
    """
    if build.mbs:
        # TODO: submit to MBS once module building is supported.
        pass
    else:
        # FIX: dropped the unused local `scratch = False`.
        target = "dist-rocky8"
        latest_import = await Import.filter(package_id=package.id).order_by("-created_at").first()
        # NOTE(review): assumes at least one import exists; latest_import is
        # None otherwise and this raises (caught by build_task) — confirm.
        source = f"git+https://{settings.gitlab_host}{settings.repo_prefix}/rpms/{package.name}.git?#{latest_import.commit}"
        task_id = koji_session.build(source, target)
        build.koji_id = task_id
        build.status = BuildStatus.BUILDING
        await build.save()
async def build_task(package: Package, build: Build):
    """Run do_build_task, marking the build FAILED if anything raises."""
    try:
        await do_build_task(package, build)
    except Exception as err:
        print(err)
        build.status = BuildStatus.FAILED
        await build.save()
@atomic()
async def do_import_task(package: Package, import_obj: Import):
koji_session.packageListAdd("dist-rocky8", package.name, "distrobuild")
import_obj.status = BuildStatus.BUILDING
await import_obj.save()
await srpmproc.import_project(import_obj.id, package.name)
package.last_import = datetime.datetime.now()
await package.save()
def gen_body_filters(body_in) -> dict:
    """Validate *body_in* as a BuildRequest and build Package query filters.

    package_id wins over package_name; returns None when neither is set
    (the BuildRequest validator should prevent that).
    """
    request = BuildRequest(**body_in)
    if request.package_id:
        return {"id": request.package_id}
    if request.package_name:
        return {"name": request.package_name}
project = gl.projects.get(f"{settings.repo_prefix.removeprefix('/')}/rpms/{package.name}")
project.visibility = "public"
project.save()
latest_commit = project.commits.list(ref_name=f"r{import_obj.version}")[0].id
import_obj.status = BuildStatus.SUCCEEDED
import_obj.commit = latest_commit
await import_obj.save()
async def import_task(package: Package, import_obj: Import):
    """Run do_import_task, marking the import FAILED if anything raises."""
    try:
        await do_import_task(package, import_obj)
    except Exception as err:
        print(err)
        import_obj.status = BuildStatus.FAILED
        await import_obj.save()
@router.get("/", response_model=Page[Build_Pydantic], dependencies=[Depends(pagination_params)])
async def list_builds():
    """Paginated list of builds, newest first, with package prefetched."""
    query = Build.all().order_by('-created_at').prefetch_related("package")
    return await paginate(query)
# response_model causes some weird errors with Import. why?
# TODO: find out (removing response_model for now)
@router.get("/imports/", dependencies=[Depends(pagination_params)])
async def list_imports():
    """Paginated list of imports, newest first, with package prefetched."""
    query = Import.all().order_by('-created_at').prefetch_related("package")
    return await paginate(query)
@router.get("/imports/{id}/logs", response_class=PlainTextResponse)
async def get_import_logs(id: int):
    """Return the srpmproc log for an import.

    Raises 404 when the log file does not exist yet (import not started).
    """
    import_obj = await Import.filter(id=id).get()
    # FIX: open() raised an unhandled FileNotFoundError (500) when the
    # import had not produced a log yet; surface that as a 404 instead.
    try:
        with open(f"/tmp/import-{import_obj.id}.log") as f:
            return f.read()
    except FileNotFoundError:
        raise HTTPException(404, detail="import not started")
@router.get("/imports/{import_id}/logs", response_class=PlainTextResponse)
async def get_import_logs(import_id: int):
    """Return the srpmproc log for an import; 404 if it has not started."""
    import_obj = await Import.filter(id=import_id).get()
    log_path = f"/tmp/import-{import_obj.id}.log"
    try:
        with open(log_path) as log_file:
            return log_file.read()
    except FileNotFoundError:
        raise HTTPException(404, detail="import not started")
@router.post("/", status_code=202)
async def queue_build(body: BuildRequest, background_tasks: BackgroundTasks):
async def queue_build(body: BuildRequest):
filters = gen_body_filters(body)
package = await Package.filter(**filters).get_or_none()
if not package:
......@@ -122,35 +73,56 @@ async def queue_build(body: BuildRequest, background_tasks: BackgroundTasks):
if len(package_modules) > 0:
mbs = True
build = await Build.create(package_id=package.id, status=BuildStatus.QUEUED, mbs=mbs)
background_tasks.add_task(build_task, package, build)
latest_import = await Import.filter(package_id=package.id).order_by("-created_at").first()
import_commits = await ImportCommit.filter(import__id=latest_import.id).all()
for import_commit in import_commits:
if "-beta" not in import_commit.branch:
build = await Build.create(package_id=package.id, status=BuildStatus.QUEUED, mbs=mbs,
commit=import_commit.commit, branch=import_commit.branch)
await build_package_task(package.id, build.id)
return {}
@router.post("/batch", status_code=202)
async def batch_queue_build(body: BatchBuildRequest):
    """Queue a build for every package in the batch request.

    FIX: the previous text carried both the removed (BackgroundTasks) and the
    added diff lines — duplicate signatures and calls; keep the aio-pika
    variant only.
    """
    for build_request in body.packages:
        await queue_build(build_request)

    return {}
@router.post("/imports/", status_code=202)
async def import_package_route(body: BuildRequest):
    """Queue an import for a package (and, for modules, its subpackages).

    FIX: the previous text interleaved removed diff lines (a stray
    `Import.create` and a `background_tasks.add_task` call) with the new
    aio-pika body; keep only the new logic.
    """
    filters = gen_body_filters(body)
    package = await Package.filter(**filters).get_or_none()
    if not package:
        raise HTTPException(404, detail="package does not exist")

    if package.is_package:
        package_import = await Import.create(package_id=package.id, status=BuildStatus.QUEUED, version=8)
        await import_package_task(package.id, package_import.id)

    if package.is_module:
        subpackages = await PackageModule.filter(module_parent_package_id=package.id).all()
        # Every subpackage must have been imported at least once before the
        # module itself can be imported; queue missing subpackage imports.
        all_packages_imported = True
        for subpackage in subpackages:
            imports = await Import.filter(package_id=subpackage.package_id).all()
            if not imports:
                all_packages_imported = False
                await import_package_route(BuildRequest(package_id=subpackage.package_id))
        # NOTE(review): when subpackages were missing, the module import is
        # skipped this round — the caller must re-queue later. Confirm intended.
        if all_packages_imported:
            package_import = await Import.create(package_id=package.id, status=BuildStatus.QUEUED, version=8,
                                                 module=True)
            await import_package_task(package.id, package_import.id)

    return {}
@router.post("/imports/batch", status_code=202)
async def batch_import_package(body: BatchBuildRequest):
    """Queue an import for every package in the batch request.

    FIX: the previous text carried both the removed (BackgroundTasks) and the
    added diff lines — duplicate signatures and calls; keep the aio-pika
    variant only.
    """
    for build_request in body.packages:
        await import_package_route(build_request)

    return {}
from typing import List, Optional
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from fastapi_pagination import Page, pagination_params
from fastapi_pagination.ext.tortoise import paginate
from distrobuild.models import Package
from distrobuild.pydantic import Package_Pydantic
from distrobuild.serialize import Package_Pydantic
router = APIRouter(prefix="/packages")
@router.get("/", response_model=Page[Package_Pydantic], dependencies=[Depends(pagination_params)])
async def list_packages(name: Optional[str] = None, modules_only: bool = False, non_modules_only: bool = False):
filters = {}
......
......@@ -10,6 +10,7 @@ from distrobuild.settings import settings
gl = gitlab.Gitlab(f"https://{settings.gitlab_host}", private_token=settings.gitlab_api_key)
# from https://pagure.io/koji/blob/master/f/cli/koji_cli/lib.py
def ensure_connection(session):
try:
......@@ -18,7 +19,8 @@ def ensure_connection(session):
raise Exception("Error: Unable to connect to server")
if ret != koji.API_VERSION:
print("WARNING: The server is at API version %d and "
"the client is at %d" % (ret, koji.API_VERSION))
"the client is at %d" % (ret, koji.API_VERSION))
def activate_session(session, options):
"""Test and login the session is applicable"""
......@@ -33,8 +35,8 @@ def activate_session(session, options):
# authenticate using SSL client cert
session.ssl_login(options.cert, None, options.serverca, proxyuser=runas)
elif options.authtype == "password" \
or getattr(options, 'user', None) \
and options.authtype is None:
or getattr(options, 'user', None) \
and options.authtype is None:
# authenticate using user/password
session.login()
elif options.authtype == "kerberos" or options.authtype is None:
......@@ -51,8 +53,10 @@ def activate_session(session, options):
ensure_connection(session)
if getattr(options, 'debug', None):
print("successfully connected to hub")
# end
# Build and authenticate the global koji client session.
# FIX: the previous text kept both the removed line (server from settings)
# and the added line (server from the koji profile), constructing the
# session twice; keep the profile-based variant only.
koji_config = koji.read_config("koji")
koji_session = koji.ClientSession(koji_config["server"], koji_config)
activate_session(koji_session, koji_config)
from typing import Optional
from enum import Enum
from pydantic import BaseSettings
class Settings(BaseSettings):
gitlab_host: str
koji_hub_url: str
repo_prefix: str
storage_addr: str
ssh_user: str = "git"
......@@ -13,11 +12,14 @@ class Settings(BaseSettings):
version: int = 8
bugs_api_key: str
gitlab_api_key: str
broker_url: str
routing_key: str = "distrobuild"
workers: int = 10
class Config:
env_file = "/etc/distrobuild/settings"
settings = Settings()
TORTOISE_ORM = {
......
import asyncio
import json
from distrobuild.settings import settings
async def import_project(import_id: int, source_rpm: str):
async def import_project(import_id: int, source_rpm: str, module_mode: bool = False) -> dict:
upstream_prefix = f"ssh://{settings.ssh_user}@{settings.gitlab_host}:{settings.ssh_port}{settings.repo_prefix}"
args = [
......@@ -22,9 +24,26 @@ async def import_project(import_id: int, source_rpm: str):
args.append("--ssh-key-location")
args.append(settings.ssh_key_location)
if module_mode:
args.append("--module-mode")
f = open(f"/tmp/import-{import_id}.log", "w")
proc = await asyncio.create_subprocess_exec("srpmproc", *args, stdout=f, stderr=f)
proc = await asyncio.create_subprocess_exec("srpmproc", *args, stdout=asyncio.subprocess.PIPE, stderr=f)
last_line = ""
while True:
line = (await proc.stdout.readline()).decode('utf-8')
f.write(line)
if proc.stdout.at_eof():
break
last_line = line
await proc.wait()
f.close()
if proc.returncode != 0:
raise Exception("srpmproc failed")
else:
return json.loads(last_line.strip())
import json
import logging
from typing import Optional
import aio_pika
from distrobuild.settings import settings
from distrobuild_scheduler import import_package
# singleton
connection: Optional[aio_pika.RobustConnection] = None
channel: Optional[aio_pika.Channel] = None
logger = logging.getLogger("distrobuild_scheduler")
logging.basicConfig()
logger.setLevel(logging.INFO)
async def init_channel(loop) -> None:
    """Open the robust AMQP connection/channel used to publish task messages."""
    global connection, channel

    connection = await aio_pika.connect_robust(settings.broker_url, loop=loop)
    logger.info("[*] Connected to {}".format(settings.broker_url))
    channel = await connection.channel()
async def import_package_task(package_id: int, import_id: int):
    """Publish an import_package message for the scheduler worker."""
    payload = {
        "message": "import_package",
        "package_id": package_id,
        "import_id": import_id,
    }
    message = aio_pika.Message(body=json.dumps(payload).encode())
    await channel.default_exchange.publish(message, routing_key=settings.routing_key)
async def build_package_task(package_id: int, build_id: int):
    """Publish a build_package message for the scheduler worker."""
    payload = {
        "message": "build_package",
        "package_id": package_id,
        "build_id": build_id,
    }
    message = aio_pika.Message(body=json.dumps(payload).encode())
    await channel.default_exchange.publish(message, routing_key=settings.routing_key)
from tortoise.transactions import atomic
from distrobuild.models import Build, BuildStatus, Package
from distrobuild.session import koji_session
from distrobuild.settings import settings
@atomic()
async def do(package: Package, build: Build):
    """Submit the koji build recorded in *build* for *package*.

    Module (MBS) builds are not implemented yet and are skipped. On success
    the build row is marked BUILDING and the koji task id is recorded.
    """
    if build.mbs:
        # TODO: submit to MBS once module building is supported.
        return

    target = "dist-rocky8"
    source = f"git+https://{settings.gitlab_host}{settings.repo_prefix}/rpms/{package.name}.git?#{build.commit}"
    build.koji_id = koji_session.build(source, target)
    build.status = BuildStatus.BUILDING
    await build.save()
# noinspection DuplicatedCode
async def task(package_id: int, build_id):
    """Handle a build_package message: run the build, record failure state."""
    build = await Build.filter(id=build_id).get()
    package = await Package.filter(id=package_id).get()
    try:
        await do(package, build)
    except Exception as err:
        print(err)
        build.status = BuildStatus.FAILED
    finally:
        # Persist any status change and touch the package row either way.
        await build.save()
        await package.save()