Skip to content

API

API Implemenation for the Customizer to start and control the payload processing.

lifespan(app) async

Lifespan function for FASTAPI to handle the startup and shutdown process.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application.

required
Source code in packages/pyxecm/src/pyxecm_api/app.py
@asynccontextmanager
async def lifespan(
    app: FastAPI,  # noqa: ARG001
) -> AsyncGenerator:
    """Lifespan function for FASTAPI to handle the startup and shutdown process.

    Args:
        app (FastAPI):
            The FastAPI application.

    """

    logger.debug("API settings -> %s", api_settings)

    with tracer.start_as_current_span("import_payloads"):
        if api_settings.import_payload:
            logger.info("Importing filesystem payloads...")

            # Base Payload
            import_payload(payload=api_settings.payload)

            # External Payload
            import_payload(payload_dir=api_settings.payload_dir, dependencies=True)

            # Optional Payload
            import_payload(payload_dir=api_settings.payload_dir_optional)

    logger.info("Starting maintenance page thread...")
    if api_settings.maintenance_page:
        run_maintenance_page()

    yield

    logger.info("Shutdown")
    PAYLOAD_LIST.stop_payload_processing()

run_api()

Start the FastAPI Webserver.

Source code in packages/pyxecm/src/pyxecm_api/app.py
def run_api() -> None:
    """Start the FastAPI Webserver."""

    # Check if Temp and Log dir exists
    if not os.path.exists(api_settings.temp_dir):
        os.makedirs(api_settings.temp_dir)
    if not os.path.exists(api_settings.logfolder):
        os.makedirs(api_settings.logfolder)

    # Check if Logfile and exists and is unique
    if os.path.isfile(os.path.join(api_settings.logfolder, api_settings.logfile)):
        customizer_start_time = datetime.now(UTC).strftime(
            "%Y-%m-%d_%H-%M",
        )
        api_settings.logfile = "customizer_{}.log".format(customizer_start_time)

    # Configure Logging for uvicorn
    log_config = uvicorn.config.LOGGING_CONFIG

    # Stdout
    log_config["formatters"]["standard"] = {
        "()": "uvicorn.logging.DefaultFormatter",
        "fmt": "%(levelprefix)s [%(name)s] [%(threadName)s] %(message)s",
        "use_colors": True,
    }

    log_config["formatters"]["logfile"] = {
        "()": "uvicorn.logging.DefaultFormatter",
        "fmt": "%(asctime)s %(levelname)s [%(name)s] [%(threadName)s] %(message)s",
        "datefmt": "%d-%b-%Y %H:%M:%S",
        "use_colors": True,
    }

    log_config["formatters"]["accesslog"] = {
        "()": "uvicorn.logging.AccessFormatter",
        "fmt": '%(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
        "use_colors": False,
    }

    log_config["handlers"]["console"] = {
        "formatter": "standard",
        "class": "logging.StreamHandler",
        "stream": "ext://sys.stdout",
    }

    log_config["handlers"]["file"] = {
        "formatter": "logfile",
        "class": "logging.FileHandler",
        "filename": os.path.join(api_settings.logfolder, api_settings.logfile),
        "mode": "a",
    }

    log_config["handlers"]["accesslog"] = {
        "formatter": "accesslog",
        "class": "logging.FileHandler",
        "filename": os.path.join(api_settings.logfolder, "access.log"),
        "mode": "a",
    }

    log_config["loggers"]["uvicorn.access"]["handlers"] = ["access", "accesslog"]

    log_config["loggers"]["root"] = {
        "level": api_settings.loglevel,
        "handlers": ["console", "file"],
    }

    logger.info("Starting processing thread...")
    PAYLOAD_LIST.run_payload_processing(concurrent=api_settings.concurrent_payloads)

    uvicorn.run(
        "pyxecm_api:app",
        host=api_settings.bind_address,
        port=api_settings.bind_port,
        workers=api_settings.workers,
        reload=api_settings.reload,
        proxy_headers=True,
        forwarded_allow_ips="*",
        log_config=log_config,
    )

Utility library to handle the authentication with OTDS.

login(form_data) async

Login using OTDS and return a token.

Source code in packages/pyxecm/src/pyxecm_api/auth/router.py
@router.post("/token", tags=["auth"])
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> JSONResponse:
    """Login using OTDS and return a token."""

    url = api_settings.otds_url + "/otdsws/rest/authentication/credentials"

    payload = json.dumps(
        {"userName": form_data.username, "password": form_data.password},
    )
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    try:
        response = requests.request(
            "POST",
            url,
            headers=headers,
            data=payload,
            timeout=10,
        )
    except requests.exceptions.ConnectionError as exc:
        raise HTTPException(
            status_code=500,
            detail=f"{exc.request.url} cannot be reached",
        ) from exc

    if response.ok:
        response = json.loads(response.text)
    else:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return JSONResponse(
        {
            "access_token": response["ticket"],
            "token_type": "bearer",
            "userId": response["userId"],
        },
    )

read_users_me(current_user) async

Get the current user.

current_user (User): The current user.

Source code in packages/pyxecm/src/pyxecm_api/auth/router.py
@router.get("/users/me", tags=["auth"])
async def read_users_me(current_user: Annotated[User, Depends(get_current_user)]) -> JSONResponse:
    """Get the current user.

    current_user (User):
        The current user.

    """

    if "otadmins@otds.admin" in current_user.groups:
        return JSONResponse(current_user.model_dump())
    else:
        raise HTTPException(
            status_code=403,
            detail=f"User {current_user.id} is not authorized",
        )

Models for FastAPI.

User

Bases: BaseModel

Model for users authenticated by OTDS.

Source code in packages/pyxecm/src/pyxecm_api/auth/models.py
class User(BaseModel):
    """Model for users authenticated by OTDS."""

    id: str
    full_name: str | None = None
    groups: list[str] | None = None
    is_admin: bool = False
    is_sysadmin: bool = False

Utility library to handle the authentication with OTDS.

get_authorized_user(current_user) async

Check if the user is authorized (member of the Group otadmin@otds.admin).

Source code in packages/pyxecm/src/pyxecm_api/auth/functions.py
async def get_authorized_user(current_user: Annotated[User, Depends(get_current_user)]) -> User:
    """Check if the user is authorized (member of the Group otadmin@otds.admin)."""

    if "otadmins@otds.admin" not in current_user.groups:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"User {current_user.id} is not authorized",
        )
    return current_user

get_current_user(token, api_key) async

Get the current user from OTDS and verify it.

Source code in packages/pyxecm/src/pyxecm_api/auth/functions.py
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)], api_key: Annotated[str, Depends(apikey_header)]
) -> User:
    """Get the current user from OTDS and verify it."""

    if api_settings.api_key is not None and api_key == api_settings.api_key:
        return User(
            id="api",
            full_name="API Key",
            groups=["otadmins@otds.admin"],
            is_admin=True,
            is_sysadmin=True,
        )

    if token and token.startswith("*OTDSSSO*"):
        url = api_settings.otds_url + "/otdsws/rest/currentuser"
        headers = {
            "Accept": "application/json",
            "otdsticket": token,
        }
        response = requests.request("GET", url, headers=headers, timeout=2)

        if response.ok:
            response = json.loads(response.text)

            return User(
                id=response["user"]["id"],
                full_name=response["user"]["name"],
                groups=get_groups(response, token),
                is_admin=response["isAdmin"],
                is_sysadmin=response["isSysAdmin"],
            )

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

get_groups(response, token)

Get the groups of the user.

Parameters:

Name Type Description Default
response dict

description

required
token str

description

required

Returns:

Name Type Description
list list

description

Source code in packages/pyxecm/src/pyxecm_api/auth/functions.py
def get_groups(response: dict, token: str) -> list:
    """Get the groups of the user.

    Args:
        response (dict): _description_
        token (str): _description_

    Returns:
        list: _description_

    """

    headers = {
        "Accept": "application/json",
        "otdsticket": token,
    }
    url = api_settings.otds_url + "/otdsws/rest/users/" + response["user"]["id"] + "/memberof"

    response = requests.request("GET", url, headers=headers, timeout=5)
    if response.ok:
        response = json.loads(response.text)
        return [group["id"] for group in response.get("groups", [])]

    # Retur empty list if request wasn't successful
    return []

get_otcsticket(otcsticket) async

Check if the user is authorized (member of the Group otadmin@otds.admin).

Source code in packages/pyxecm/src/pyxecm_api/auth/functions.py
async def get_otcsticket(otcsticket: Annotated[str, Depends(otcsticket)]) -> User:
    """Check if the user is authorized (member of the Group otadmin@otds.admin)."""

    return otcsticket

API Implemenation for the Customizer to start and control the payload processing.

get_browser_automation_file(user, file)

Download the logfile for a specific payload.

Source code in packages/pyxecm/src/pyxecm_api/common/router.py
@router.get(path="/browser_automations/download", tags=["payload"])
def get_browser_automation_file(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    file: Annotated[str, Query(description="File name")],
) -> FileResponse:
    """Download the logfile for a specific payload."""

    filename = os.path.join(tempfile.gettempdir(), "browser_automations", file)

    if not os.path.isfile(filename):
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="File -> '{}' not found".format(filename),
        )

    media_type, _ = mimetypes.guess_type(filename)

    with open(filename, "rb") as f:
        content = f.read()

    return Response(
        content,
        media_type=media_type,
        headers={
            "Content-Disposition": f'attachment; filename="{os.path.basename(filename)}"',
        },
    )

get_status() async

Get the status of the Customizer.

Source code in packages/pyxecm/src/pyxecm_api/common/router.py
@router.get(path="/status", name="Get Status")
async def get_status() -> CustomizerStatus:
    """Get the status of the Customizer."""

    df = PAYLOAD_LIST.get_payload_items()

    if df is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail="Payload list is empty.",
        )

    all_status = df["status"].value_counts().to_dict()

    return CustomizerStatus(
        version=2,
        customizer_duration=(all_status.get("running", None)),
        customizer_end_time=None,
        customizer_start_time=None,
        status_details=all_status,
        status="Running" if "running" in all_status else "Stopped",
        debug=df["log_debug"].sum(),
        info=df["log_info"].sum(),
        warning=df["log_warning"].sum(),
        error=df["log_error"].sum(),
        critical=df["log_critical"].sum(),
    )

list_browser_automation_files(user)

List all browser automation files.

Source code in packages/pyxecm/src/pyxecm_api/common/router.py
@router.get(path="/browser_automations/assets", tags=["payload"])
def list_browser_automation_files(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
) -> JSONResponse:
    """List all browser automation files."""

    result = list_files_in_directory(
        os.path.join(
            tempfile.gettempdir(),
            "browser_automations",
        )
    )

    return JSONResponse(result)

redirect_to_api(request) async

Redirect from / to /api.

Returns:

Type Description
RedirectResponse

None

Source code in packages/pyxecm/src/pyxecm_api/common/router.py
@router.get("/", include_in_schema=False)
async def redirect_to_api(request: Request) -> RedirectResponse:
    """Redirect from / to /api.

    Returns:
        None

    """
    return RedirectResponse(url=f"{request.url.path}api")

shutdown(user)

Endpoint to end the application.

Source code in packages/pyxecm/src/pyxecm_api/common/router.py
@router.get("/api/shutdown", include_in_schema=False)
def shutdown(user: Annotated[User, Depends(get_authorized_user)]) -> JSONResponse:
    """Endpoint to end the application."""

    logger.warning(
        "Shutting down the API - Requested via api by user -> %s",
        user.id,
    )
    os.kill(os.getpid(), signal.SIGTERM)

    return JSONResponse({"status": "shutdown"}, status_code=HTTPStatus.ACCEPTED)

Metics for payload logs.

payload_logs_by_payload(payload_list)

Metrics for payload logs by payload.

Source code in packages/pyxecm/src/pyxecm_api/common/metrics.py
def payload_logs_by_payload(payload_list: PayloadList) -> Callable[[Info], None]:
    """Metrics for payload logs by payload."""

    metrics_error = Gauge(
        "payload_error",
        "Number of ERROR log messages for by payload",
        labelnames=("index", "name", "logfile"),
    )

    metrics_warning = Gauge(
        "payload_warning",
        "Number of WARNING log messages for by payload",
        labelnames=("index", "name", "logfile"),
    )

    metrics_info = Gauge(
        "payload_info",
        "Number of INFO log messages for by payload",
        labelnames=("index", "name", "logfile"),
    )

    metrics_debug = Gauge(
        "payload_debug",
        "Number of DEBUG log messages for by payload",
        labelnames=("index", "name", "logfile"),
    )

    def instrumentation(info: Info) -> None:  # noqa: ARG001
        df = payload_list.get_payload_items()
        data = [{"index": idx, **row} for idx, row in df.iterrows()]

        for item in data:
            metrics_error.labels(item["index"], item["name"], item["logfile"]).set(
                item["log_error"],
            )
            metrics_warning.labels(item["index"], item["name"], item["logfile"]).set(
                item["log_warning"],
            )
            metrics_info.labels(item["index"], item["name"], item["logfile"]).set(
                item["log_info"],
            )
            metrics_debug.labels(item["index"], item["name"], item["logfile"]).set(
                item["log_debug"],
            )

    return instrumentation

payload_logs_total(payload_list)

Metrics for total payload logs messages.

Source code in packages/pyxecm/src/pyxecm_api/common/metrics.py
def payload_logs_total(payload_list: PayloadList) -> Callable[[Info], None]:
    """Metrics for total payload logs messages."""

    metrics_error = Gauge(
        "payload_error_total",
        "Total number of ERROR log messages",
    )

    metrics_warning = Gauge(
        "payload_warning_total",
        "Total number of WARNING log messages",
    )

    metrics_info = Gauge(
        "payload_info_total",
        "Total number of INFO log messages",
    )

    metrics_debug = Gauge(
        "payload_debug_total",
        "Total number of DEBUG log messages",
    )

    def instrumentation(info: Info) -> None:  # noqa: ARG001
        df = payload_list.get_payload_items()

        metrics_error.set(df["log_error"].sum())
        metrics_warning.set(df["log_warning"].sum())
        metrics_info.set(df["log_info"].sum())
        metrics_debug.set(df["log_debug"].sum())

    return instrumentation

Define common functions.

build_graph()

Build the knowledge Graph. And keep it updated every hour.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def build_graph() -> None:
    """Build the knowledge Graph. And keep it updated every hour."""

    def build() -> None:
        """Build the knowledge graph once."""

        logger.info("Starting knowledge graph build...")
        start_time = datetime.now(UTC)
        result = get_knowledgegraph_object().build_graph(
            workspace_type_exclusions=None,
            workspace_type_inclusions=[
                "Vendor",
                "Purchase Contract",
                "Purchase Order",
                "Material",
                "Customer",
                "Sales Order",
                "Sales Contract",
                "Delivery",
                "Goods Movement",
            ],
            workers=20,  # for multi-threaded traversal
            filter_at_traversal=True,  # also filter for workspace types if following relationships
            relationship_types=["child"],  # only go from parent to child
            strategy="BFS",  # Breadth-First-Search
            metadata=True,  # don't include workspace metadata
        )
        end_time = datetime.now(UTC)
        logger.info(
            "Knowledge graph completed in %s. Processed %d workspace nodes and traversed %d workspace relationships.",
            str(end_time - start_time),
            result["processed"],
            result["traversed"],
        )

    # Endless loop to build knowledge graph and update it every hour:
    while True:
        build()
        logger.info("Waiting for 1 hour before rebuilding the knowledge graph...")
        time.sleep(3600)

get_k8s_object()

Get an instance of a K8s object.

Returns:

Name Type Description
K8s K8s

Return a K8s object

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_k8s_object() -> K8s:
    """Get an instance of a K8s object.

    Returns:
        K8s: Return a K8s object

    """

    return K8s(logger=logger, namespace=api_settings.namespace)

get_knowledgegraph_object()

Get the Knowledge Graph object.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_knowledgegraph_object() -> KnowledgeGraph:
    """Get the Knowledge Graph object."""

    global KNOWLEDGEGRAPH_OBJECT  # noqa: PLW0603

    if KNOWLEDGEGRAPH_OBJECT is None:
        KNOWLEDGEGRAPH_OBJECT = KnowledgeGraph(otcs_object=get_otcs_object(), ontology=KNOWLEDGEGRAPH_ONTOLOGY)

    return KNOWLEDGEGRAPH_OBJECT

get_ontology()

Get the ontology for the knowledge graph.

Returns:

Name Type Description
dict dict

The ontology as a dictionary.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_ontology() -> dict:
    """Get the ontology for the knowledge graph.

    Returns:
        dict: The ontology as a dictionary.

    """

    return KNOWLEDGEGRAPH_ONTOLOGY

get_otca_object(otcs_object=None)

Get the Content Aviator (OTCA) object.

Parameters:

Name Type Description Default
otcs_object OTCS | None

The Content Server (OTCS) object. Defaults to None.

None

Returns:

Name Type Description
OTCA OTCA

The new Content Aviator object.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_otca_object(otcs_object: OTCS | None = None) -> OTCA:
    """Get the Content Aviator (OTCA) object.

    Args:
        otcs_object (OTCS | None, optional):
            The Content Server (OTCS) object. Defaults to None.

    Returns:
        OTCA:
            The new Content Aviator object.

    """

    settings = Settings()

    # Get the Kubernetes object:
    k8s_object = get_k8s_object()
    content_system = {}
    # Read the content system (e.g. OTCM) from the Kubernetes Config Map:
    for service in ["chat", "embed"]:
        cm = k8s_object.get_config_map(f"csai-{service}-svc")
        if cm:
            content_system[service] = cm.data.get("CONTENT_SYSTEM", "none")
            logger.info("Set content system for '%s' to -> '%s'.", service, content_system[service])

    # Create the Content Aviator object (OTCA class):
    otca = OTCA(
        chat_url=str(settings.aviator.chat_svc_url),
        embed_url=str(settings.aviator.embed_svc_url),
        studio_url=str(settings.aviator.studio_url),
        otds_url=str(settings.otds.url_internal),
        client_id=settings.aviator.oauth_client,
        client_secret=settings.aviator.oauth_secret,
        otcs_object=otcs_object,
        content_system=content_system,
        logger=logger.getChild("otca"),
    )

    return otca

get_otcs_logs_lock()

Get the Logs LOCK dict.

Returns:

Type Description
dict

The dict with all LOCKS for the logs

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_otcs_logs_lock() -> dict:
    """Get the Logs LOCK dict.

    Returns:
        The dict with all LOCKS for the logs

    """

    return LOGS_LOCK

get_otcs_object()

Get an instance of a Content Server (OTCS) object.

Returns:

Name Type Description
OTCS OTCS

Return a new OTCS object.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_otcs_object() -> OTCS:
    """Get an instance of a Content Server (OTCS) object.

    Returns:
        OTCS:
            Return a new OTCS object.

    """

    settings = Settings()

    otcs = OTCS(
        protocol=settings.otcs.url_backend.scheme,
        hostname=settings.otcs.url_backend.host,
        port=settings.otcs.url_backend.port,
        public_url=str(settings.otcs.url),
        username=settings.otcs.username,
        password=settings.otcs.password.get_secret_value(),
        user_partition=settings.otcs.partition,
        resource_name=settings.otcs.resource_name,
        base_path=settings.otcs.base_path,
        support_path=settings.otcs.support_path,
        download_dir=settings.otcs.download_dir,
        feme_uri=settings.otcs.feme_uri,
        logger=logger,
    )

    # Authenticate at Content Server:
    otcs.authenticate()

    return otcs

get_otcs_object_from_otcsticket(otcs_ticket)

Get an instance of a Content Server (OTCS) object.

Returns:

Name Type Description
OTCS OTCS

Return an OTCS object.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_otcs_object_from_otcsticket(otcs_ticket: Annotated[str, Depends(get_otcsticket)]) -> OTCS:
    """Get an instance of a Content Server (OTCS) object.

    Returns:
        OTCS:
            Return an OTCS object.

    """

    settings = Settings()

    # Create an OTCS object without defining the username and password:
    otcs = OTCS(
        # protocol=settings.otcs.url_backend.scheme,
        # hostname=settings.otcs.url_backend.host,
        # port=settings.otcs.url_backend.port,
        protocol=settings.otcs.url_frontend.scheme,
        hostname=settings.otcs.url_frontend.host,
        port=settings.otcs.url_frontend.port,
        public_url=str(settings.otcs.url),
        user_partition=settings.otcs.partition,
        resource_name=settings.otcs.resource_name,
        base_path=settings.otcs.base_path,
        support_path=settings.otcs.support_path,
        download_dir=settings.otcs.download_dir,
        feme_uri=settings.otcs.feme_uri,
        logger=logger,
    )

    # Instead set the OTCS authentication ticket directly:
    otcs._otcs_ticket = otcs_ticket  # noqa: SLF001

    return otcs

get_settings()

Get the API Settings object.

Returns:

Name Type Description
CustomizerPISettings CustomizerAPISettings

Returns the API Settings.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def get_settings() -> CustomizerAPISettings:
    """Get the API Settings object.

    Returns:
        CustomizerPISettings:
            Returns the API Settings.

    """

    return api_settings

list_files_in_directory(directory)

Recursively list files in a directory and return a nested JSON structure with URLs.

Source code in packages/pyxecm/src/pyxecm_api/common/functions.py
def list_files_in_directory(directory: str) -> dict:
    """Recursively list files in a directory and return a nested JSON structure with URLs."""

    result = {}
    for root, dirs, files in os.walk(directory):
        # Sort directories and files alphabetically
        dirs.sort()
        files.sort()

        current_level = result
        path_parts = root.split(os.sep)
        relative_path = os.path.relpath(root, directory)
        for part in path_parts[len(directory.split(os.sep)) :]:
            if part not in current_level:
                current_level[part] = {}
            current_level = current_level[part]
        for file in files:
            file_path = os.path.join(relative_path, file)
            current_level[file] = file_path

    return result

Define router for v1_maintenance.

embed_metadata(user, otcs_object, body)

Embed the Metadata of the given objects.

Parameters:

Name Type Description Default
user Annotated[User, Depends

User required for authentication.

required
otcs_object Annotated[OTCS, Depends(get_otcs_object)]

The OTCS object to interact with OTCM (Content Server).

required
body Annotated[CSAIEmbedMetadata, Body

The request body.

required

Returns:

Name Type Description
JSONResponse JSONResponse

JSONResponse with success=true/false

Source code in packages/pyxecm/src/pyxecm_api/v1_csai/router.py
@router.post("/metadata")
def embed_metadata(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    otcs_object: Annotated[OTCS, Depends(get_otcs_object)],
    body: Annotated[CSAIEmbedMetadata, Body()],
) -> JSONResponse:
    """Embed the Metadata of the given objects.

    Args:
        user (Annotated[User, Depends):
            User required for authentication.
        otcs_object (Annotated[OTCS, Depends(get_otcs_object)]):
            The OTCS object to interact with OTCM (Content Server).
        body (Annotated[CSAIEmbedMetadata, Body):
            The request body.

    Returns:
        JSONResponse:
            JSONResponse with success=true/false

    """

    success = otcs_object.aviator_embed_metadata(**body.model_dump())

    return JSONResponse({"success": success})

get_csai_config_data(user, k8s_object, settings)

Get the csai config data.

Source code in packages/pyxecm/src/pyxecm_api/v1_csai/router.py
@router.get("")
def get_csai_config_data(
    user: Annotated[User, Depends(get_authorized_user)],
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
) -> JSONResponse:
    """Get the csai config data."""

    logger.info("Read CSAI config data by user -> %s", user.id)

    config_data = {}

    try:
        csai_config_maps = [
            cm for cm in k8s_object.list_config_maps().items if cm.metadata.name.startswith(settings.csai_prefix)
        ]

        for config_map in csai_config_maps:
            config_data[config_map.metadata.name] = config_map.data

    except Exception as e:
        logger.error("Could not read config data from k8s -> %s", e)
        return JSONResponse({"status": "error", "message": str(e)})

    return JSONResponse(config_data)

get_csai_graph(name)

Display the graph of the given name.

Parameters:

Name Type Description Default
name str

The name of the CSAI graph.

required

Returns:

Name Type Description
HTMLResponse HTMLResponse

Visualization of the CSAI graph

Source code in packages/pyxecm/src/pyxecm_api/v1_csai/router.py
@router.get("/graph")
def get_csai_graph(name: Annotated[str, Query(..., description="Name of the graph")]) -> HTMLResponse:
    """Display the graph of the given name.

    Args:
        name (str):
            The name of the CSAI graph.

    Returns:
        HTMLResponse: Visualization of the CSAI graph

    """

    # Get the Content Aviator object:
    otca = get_otca_object(otcs_object=None)

    # Get all graphs configured in Content Aviator:
    graphs = otca.get_graphs()
    # Find the graph (LangGraph) with the given name:
    graph = [g for g in graphs if g["name"] == name]

    if not graph:
        logger.error("Couldn't find graph -> '%s' for visualization!", name)
        raise HTTPException(status_code=404, detail="Graph -> '{}' not found!".format(name))

    try:
        filename = otca.visualize_graph(graph[0]["id"])

        with open(filename) as f:
            file_content = f.read()
    except Exception as e:
        logger.error("Error visualizing graph -> '%s': %s", name, str(e))
        raise HTTPException(status_code=500, detail="Failed to visualize graph -> '{}'!".format(name)) from e

    logger.info("Successfully visualized graph -> '%s'", name)

    return HTMLResponse(status_code=200, content=file_content)

set_csai_config_data(user, settings, k8s_object, config)

Set the CSAI config data.

Source code in packages/pyxecm/src/pyxecm_api/v1_csai/router.py
@router.post("")
def set_csai_config_data(
    user: Annotated[User, Depends(get_authorized_user)],
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
    config: Annotated[dict, Body()],
) -> JSONResponse:
    """Set the CSAI config data."""

    logger.info("Write CSAI config data by user -> %s", user.id)

    for config_map in config:
        if not config_map.startswith(settings.csai_prefix):
            return JSONResponse(
                {
                    "status": "error",
                    "message": f"Config map name {config_map} does not start with {settings.csai_prefix}",
                },
                status_code=HTTPStatus.BAD_REQUEST,
            )

    try:
        for key, value in config.items():
            logger.info("User: %s -> Replacing config map %s with %s", user.id, key, value)
            k8s_object.replace_config_map(
                config_map_name=key,
                config_map_data=value,
            )

    except Exception as e:
        logger.error("Could not replace config map %s with %s -> %s", key, value, e)
        return JSONResponse({"status": "error", "message": str(e)})

    for deployment in ["chat-svc", "embed-svc", "embed-wrkr"]:
        deployment = f"{settings.csai_prefix}-{deployment}"

        logger.info("User: %s ->Restarting deployment -> %s", user.id, deployment)
        k8s_object.restart_deployment(deployment)

    return get_csai_config_data(user=user, k8s_object=k8s_object, settings=settings)

Define router for v1_maintenance.

set_maintenance_mode_options(user, k8s_object, config) async

Configure the Maintenance Mode and set options.

Parameters:

Name Type Description Default
user User

Added to enforce authentication requirement

required
k8s_object K8s

K8s object instance of pyxecm K8s class

required
config MaintenanceModel

instance of the Maintenance Model

required

Returns:

Name Type Description
dict MaintenanceModel

description

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/router.py
@router.post(path="")
async def set_maintenance_mode_options(
    user: Annotated[models.User, Depends(get_authorized_user)],  # noqa: ARG001
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
    config: Annotated[MaintenanceModel, Form()],
) -> MaintenanceModel:
    """Configure the Maintenance Mode and set options.

    Args:
        user (models.User):
            Added to enforce authentication requirement
        k8s_object (K8s):
            K8s object instance of pyxecm K8s class
        config (MaintenanceModel):
            instance of the Maintenance Model

    Returns:
        dict: _description_

    """
    # Enable / Disable the acutual Maintenance Mode
    set_maintenance_mode_via_ingress(config.enabled, k8s_object)

    if config.title:
        maint_settings.title = config.title

    if config.text:
        maint_settings.text = config.text

    if config.footer:
        maint_settings.footer = config.footer

    return get_maintenance_mode_status(k8s_object)

status(user, k8s_object) async

Return status of maintenance mode.

Parameters:

Name Type Description Default
user User

Added to enforce authentication requirement

required
k8s_object K8s

K8s object instance of pyxecm K8s class

required

Returns:

Name Type Description
dict MaintenanceModel

Details of maintenance mode.

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/router.py
@router.get(path="")
async def status(
    user: Annotated[models.User, Depends(get_authorized_user)],  # noqa: ARG001
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
) -> MaintenanceModel:
    """Return status of maintenance mode.

    Args:
        user (models.User):
            Added to enforce authentication requirement
        k8s_object (K8s):
            K8s object instance of pyxecm K8s class


    Returns:
        dict:
            Details of maintenance mode.

    """

    return get_maintenance_mode_status(k8s_object)

Define Models for the Maintenance Page Config.

MaintenanceModel

Bases: BaseModel

Status object of the Maintenance Page.

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/models.py
class MaintenanceModel(BaseModel):
    """Status object of the Maintenance Page."""

    enabled: bool
    title: str | None = ""
    text: str | None = ""
    footer: str | None = ""

Define functions for v1_maintenance.

get_cshost(k8s_object)

Get the cs_hostname from the environment Variable OTCS_PUBLIC_HOST otherwise read it from the otcs-frontend-configmap.

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/functions.py
def get_cshost(k8s_object: K8s) -> str:
    """Get the cs_hostname from the environment Variable OTCS_PUBLIC_HOST otherwise read it from the otcs-frontend-configmap."""

    if "OTCS_PUBLIC_URL" in os.environ:
        return os.getenv("OTCS_PUBLIC_URL", "otcs")

    else:
        cm = k8s_object.get_config_map("otcs-frontend-configmap")

        if cm is None:
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail=f"Could not read otcs-frontend-configmap from namespace: {k8s_object.get_namespace()}",
            )

        config_file = cm.data.get("config.yaml")
        config = yaml.safe_load(config_file)

        try:
            cs_url = HttpUrl(config.get("csurl"))
        except ValidationError as ve:
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail="Could not read otcs_host from environment variable OTCS_PULIBC_URL or configmap otcs-frontend-configmap/config.yaml/cs_url",
            ) from ve
        return cs_url.host

get_maintenance_mode_status(k8s_object)

Get status of maintenance mode.

Returns:

Name Type Description
dict dict

Details of maintenance mode.

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/functions.py
def get_maintenance_mode_status(k8s_object: K8s) -> dict:
    """Get status of maintenance mode.

    Returns:
        dict:
            Details of maintenance mode.

    """
    ingress = k8s_object.get_ingress("otxecm-ingress")

    if ingress is None:
        raise HTTPException(
            status_code=500,
            detail="No ingress object found to read Maintenance Mode status",
        )

    enabled = False
    for rule in ingress.spec.rules:
        if rule.host == get_cshost(k8s_object):
            enabled = rule.http.paths[0].backend.service.name != "otcs-frontend"

    return MaintenanceModel(
        enabled=enabled, title=maint_settings.title, text=maint_settings.text, footer=maint_settings.footer
    )

set_maintenance_mode_via_ingress(enabled, k8s_object)

Set maintenance mode.

Source code in packages/pyxecm/src/pyxecm_api/v1_maintenance/functions.py
def set_maintenance_mode_via_ingress(enabled: bool, k8s_object: K8s) -> None:
    """Set maintenance mode."""

    logger.warning(
        "Setting Maintenance Mode to -> %s",
        (enabled),
    )

    if enabled:
        k8s_object.update_ingress_backend_services(
            "otxecm-ingress",
            get_cshost(k8s_object=k8s_object),
            "customizer",
            5555,
        )
    else:
        k8s_object.update_ingress_backend_services(
            "otxecm-ingress",
            get_cshost(k8s_object=k8s_object),
            "otcs-frontend",
            80,
        )

Define router for v1_otcs.

delete_otcs_log_file(user, settings, file_name) async

Delete single OTCS log archive.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.delete("/logs/{file_name}", tags=["otcs"])
async def delete_otcs_log_file(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    file_name: str,
) -> FileResponse:
    """Delete single OTCS log archive."""
    file_path = os.path.join(settings.upload_folder, file_name)

    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    try:
        os.remove(file_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"{e}") from e

    return JSONResponse({"message": f"Successfully deleted {file_name}"}, status_code=HTTPStatus.OK)

delete_otcs_log_files(user, settings) async

Delete all otcs log files.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.delete("/logs", tags=["otcs"])
async def delete_otcs_log_files(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
) -> JSONResponse:
    """Delete all otcs log files."""
    shutil.rmtree(settings.upload_folder)
    return JSONResponse({"message": "Successfully deleted all files"}, status_code=HTTPStatus.OK)

get_otcs_log_file(user, settings, file_name) async

Download OTCS log archive.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.get("/logs/{file_name}", tags=["otcs"])
async def get_otcs_log_file(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    file_name: str,
) -> FileResponse:
    """Download OTCS log archive."""
    file_path = os.path.join(settings.upload_folder, file_name)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(file_path, media_type="application/octet-stream", filename=file_name)

get_otcs_log_files(user, settings, k8s_object, otcs_logs_lock) async

List all otcs logs that can be downloaded.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.get("/logs", tags=["otcs"])
async def get_otcs_log_files(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
    otcs_logs_lock: Annotated[dict[str, Lock], Depends(get_otcs_logs_lock)],
) -> JSONResponse:
    """List all otcs logs that can be downloaded."""

    os.makedirs(settings.upload_folder, exist_ok=True)

    files = []
    for filename in sorted(os.listdir(settings.upload_folder)):
        file_path = os.path.join(settings.upload_folder, filename)
        if os.path.isfile(file_path):
            file_size = os.path.getsize(file_path)
            files.append({"filename": filename, "size": file_size})

    response = {"status": {host: bool(otcs_logs_lock[host].locked()) for host in otcs_logs_lock}, "files": files}

    # Extend response with all hosts
    for sts in ["otcs-admin", "otcs-frontend", "otcs-backend-search", "otcs-da"]:
        try:
            sts_replicas = k8s_object.get_stateful_set_scale(sts)

            if sts_replicas is None:
                logger.debug("Cannot get statefulset {sts}")
                continue

            for i in range(sts_replicas.status.replicas):
                host = f"{sts}-{i}"

                if host in otcs_logs_lock:
                    response["status"][host] = otcs_logs_lock[host].locked()
                else:
                    response["status"][host] = False

        except Exception as e:
            raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    return JSONResponse(response, status_code=HTTPStatus.OK)

post_otcs_log_file(file, settings, key='') async

Upload a file to disk.

Parameters:

Name Type Description Default
file Annotated[UploadFile, File(...)]

File to be uploaded.

required
settings Annotated[CustomizerAPISettings, Depends(get_settings)]

CustomizerAPISettings.

required
key str

Key to validate the upload.

''

Returns:

Name Type Description
JSONResponse JSONResponse

Status of the upload

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.post("/logs/upload", tags=["otcs"], include_in_schema=True)
async def post_otcs_log_file(
    file: Annotated[UploadFile, File(...)],
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    key: str = "",
) -> JSONResponse:
    """Upload a file to disk.

    Args:
        file: File to be uploaded.
        settings: CustomizerAPISettings.
        key: Key to validate the upload.

    Returns:
        JSONResponse: Status of the upload

    """
    if key != settings.upload_key:
        raise HTTPException(status_code=403, detail="Invalid Uploadkey")

    os.makedirs(settings.upload_folder, exist_ok=True)

    try:
        async with await anyio.open_file(os.path.join(settings.upload_folder, file.filename), "wb") as f:
            # Process the file in chunks instead of loading the entire file into memory
            while True:
                chunk = await file.read(65536)  # Read 64KB at a time
                if not chunk:
                    break
                await f.write(chunk)
    except Exception as e:
        raise HTTPException(status_code=500, detail="Something went wrong") from e
    finally:
        await file.close()

    return {"message": f"Successfully uploaded {file.filename}"}

put_otcs_logs(user, k8s_object, settings, otcs_logs_lock, hosts) async

Collect the logs from the given OTCS instances.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/router.py
@router.put(path="/logs", tags=["otcs"])
async def put_otcs_logs(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    k8s_object: Annotated[K8s, Depends(get_k8s_object)],
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    otcs_logs_lock: Annotated[dict[str, Lock], Depends(get_otcs_logs_lock)],
    hosts: Annotated[list[str], Body()],
) -> JSONResponse:
    """Collect the logs from the given OTCS instances."""

    if "all" in hosts:
        hosts = []
        for sts in ["otcs-admin", "otcs-frontend", "otcs-backend-search", "otcs-da"]:
            try:
                sts_replicas = k8s_object.get_stateful_set_scale(sts)

                if sts_replicas is None:
                    logger.debug("Cannot get statefulset {sts}")
                    continue

                hosts.extend([f"{sts}-{i}" for i in range(sts_replicas.status.replicas)])
            except Exception as e:
                raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    msg = {}
    for host in hosts:
        if host not in otcs_logs_lock:
            otcs_logs_lock[host] = Lock()

        if not otcs_logs_lock[host].locked():
            Thread(target=collect_otcs_logs, args=(host, k8s_object, otcs_logs_lock[host], settings)).start()
            msg[host] = {"status": "ok", "message": "Logs are being collected"}
        else:
            msg[host] = {"status": "error", "message": "Logs are already being collected"}

    status = (
        HTTPStatus.TOO_MANY_REQUESTS if any(msg[host]["status"] == "error" for host in msg) else HTTPStatus.ACCEPTED
    )
    return JSONResponse(msg, status_code=status)

Define functions for v1_otcs.

collect_otcs_logs(host, k8s_object, logs_lock, settings)

Collect the logs for the given OTCS instance.

Source code in packages/pyxecm/src/pyxecm_api/v1_otcs/functions.py
def collect_otcs_logs(host: str, k8s_object: K8s, logs_lock: Lock, settings: CustomizerAPISettings) -> None:
    """Collect the logs for the given OTCS instance."""

    with logs_lock:
        timestamp = datetime.now(tz=UTC).strftime("%Y-%m-%d_%H-%M")
        tgz_file = f"/tmp/{timestamp}_{host}.tar.gz"  # noqa: S108

        if host.startswith("otcs-frontend"):
            container = "otcs-frontend-container"
        elif host.startswith("otcs-backend-search"):
            container = "otcs-backend-search-container"
        elif host.startswith("otcs-admin"):
            container = "otcs-admin-container"
        elif host.startswith("otcs-da"):
            container = "otcs-da-container"
        else:
            container = None

        logger.info("Collecting logs for %s", host)
        k8s_object.exec_pod_command(
            pod_name=host,
            command=["tar", "-czvf", tgz_file, "/opt/opentext/cs/logs", "/opt/opentext/cs_persist/contentserver.log"],
            container=container,
            timeout=1800,
        )

        logger.info("Uploading logs for %s", host)
        k8s_object.exec_pod_command(
            pod_name=host,
            command=[
                "curl",
                "-X",
                "POST",
                "-F",
                f"file=@{tgz_file}",
                f"{settings.upload_url}?key={settings.upload_key}",
            ],
            container=container,
            timeout=1800,
        )

        logger.info("Cleanup logs for %s", host)
        k8s_object.exec_pod_command(
            pod_name=host,
            command=["rm", tgz_file],
            container=container,
        )

Define router for v1_payload.

create_payload_item(user, settings, upload_file, name='', dependencies=None, enabled=True, loglevel='INFO')

Upload a new payload item.

Parameters:

Name Type Description Default
user User

The user who is uploading the payload. Defaults to None.

required
settings CustomizerAPISettings

The settings object.

required
upload_file UploadFile

The file to upload. Defaults to File(...).

required
name str

The name of the payload (if not provided we will use the file name).

''
dependencies list of integers

List of other payload items this item depends on.

None
enabled bool

Flag indicating if the payload is enabled or not.

True
loglevel str

The loglevel for the payload processing. Defaults to "INFO".

'INFO'

Raises:

Type Description
HTTPException

Raised, if payload list is not initialized.

Returns:

Name Type Description
dict PayloadListItem

The HTTP response.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.post(path="")
def create_payload_item(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    settings: Annotated[CustomizerAPISettings, Depends(get_settings)],
    upload_file: Annotated[UploadFile, File(...)],
    name: Annotated[str, Form()] = "",
    dependencies: Annotated[list[int] | list[str] | None, Form()] = None,
    enabled: Annotated[bool, Form()] = True,
    loglevel: Annotated[
        Literal["DEBUG", "INFO", "WARNING"] | None,
        Form(
            description="Loglevel for the Payload processing",
        ),
    ] = "INFO",
) -> PayloadListItem:
    """Upload a new payload item.

    Args:
        user (User, optional):
            The user who is uploading the payload. Defaults to None.
        settings (CustomizerAPISettings):
            The settings object.
        upload_file (UploadFile, optional):
            The file to upload. Defaults to File(...).
        name (str, optional):
            The name of the payload (if not provided we will use the file name).
        dependencies (list of integers):
            List of other payload items this item depends on.
        enabled (bool):
            Flag indicating if the payload is enabled or not.
        loglevel (str, optional):
            The loglevel for the payload processing. Defaults to "INFO".

    Raises:
        HTTPException:
            Raised, if payload list is not initialized.

    Returns:
        dict:
            The HTTP response.

    """
    if dependencies:
        dependencies = prepare_dependencies(dependencies)

    # Set name if not provided
    name = name or os.path.splitext(os.path.basename(upload_file.filename))[0]
    file_extension = os.path.splitext(upload_file.filename)[1]
    file_name = os.path.join(settings.temp_dir, f"{name}{file_extension}")

    with open(file_name, "wb") as buffer:
        shutil.copyfileobj(upload_file.file, buffer)

    if dependencies == [-1]:
        dependencies = []

    return PayloadListItem(
        PAYLOAD_LIST.add_payload_item(
            name=name,
            filename=file_name,
            status="planned",
            logfile=os.path.join(settings.temp_dir, "{}.log".format(name)),
            dependencies=dependencies or [],
            enabled=enabled,
            loglevel=loglevel,
        )
    )

delete_payload_item(user, payload_id) async

Delete an existing payload item.

Parameters:

Name Type Description Default
user Optional[User]

User performing the update.

required
payload_id int

The ID of the payload to update.

required

Returns:

Name Type Description
dict JSONResponse

response or None

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.delete(path="/{payload_id}")
async def delete_payload_item(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> JSONResponse:
    """Delete an existing payload item.

    Args:
        user (Optional[User]): User performing the update.
        payload_id (int): The ID of the payload to update.

    Returns:
        dict: response or None

    """

    # Check if the payload exists
    result = PAYLOAD_LIST.remove_payload_item(payload_id)
    if not result:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload with ID -> {} not found.".format(payload_id),
        )

download_payload_content(user, payload_id)

Download the payload for a specific payload item.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="/{payload_id}/download")
def download_payload_content(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> FileResponse:
    """Download the payload for a specific payload item."""

    payload = PAYLOAD_LIST.get_payload_item(index=payload_id)

    if payload is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload with ID -> {} not found!".format(payload_id),
        )

    if not os.path.isfile(payload.filename):
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload file -> '{}' not found".format(payload.filename),
        )

    with open(payload.filename, encoding="UTF-8") as file:
        content = file.read()

        if payload.filename.endswith(".gz.b64"):
            content = base64.b64decode(content)
            content = gzip.decompress(content)

    return Response(
        content,
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{os.path.basename(payload.filename.removesuffix(".gz.b64"))}"',
        },
    )

download_payload_logfile(user, payload_id)

Download the logfile for a specific payload.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="/{payload_id}/log")
def download_payload_logfile(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> FileResponse:
    """Download the logfile for a specific payload."""

    payload = PAYLOAD_LIST.get_payload_item(index=payload_id)

    if payload is None:
        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Payload not found")

    filename = payload.logfile

    if not os.path.isfile(filename):
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Log file -> '{}' not found".format(filename),
        )
    with open(filename, encoding="UTF-8") as file:
        content = file.read()
    return Response(
        content,
        media_type="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{os.path.basename(filename)}"',
        },
    )

get_payload_content(user, payload_id) async

Get a payload item based on its ID.

Parameters:

Name Type Description Default
user Annotated[User, Depends(get_authorized_user)]

Annotated[User, Depends(get_authorized_user)]

required
payload_id int

The payload item ID.

required

Raises:

Type Description
HTTPException

A payload item with the given ID couldn't be found.

Returns:

Name Type Description
dict dict | None

HTTP response.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="/{payload_id}/content")
async def get_payload_content(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    # pylint: disable=unused-argument
    payload_id: int,
) -> dict | None:
    """Get a payload item based on its ID.

    Args:
        user: Annotated[User, Depends(get_authorized_user)]
        payload_id (int):
            The payload item ID.

    Raises:
        HTTPException:
            A payload item with the given ID couldn't be found.

    Returns:
        dict:
            HTTP response.

    """

    data = PAYLOAD_LIST.get_payload_item(index=payload_id)

    if data is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload with ID -> {} not found!".format(payload_id),
        )

    filename = data.filename

    return load_payload(payload_source=filename)

get_payload_item(user, payload_id) async

Get a payload item based on its ID.

Parameters:

Name Type Description Default
user Annotated[User, Depends(get_authorized_user)]

Annotated[User, Depends(get_authorized_user)]

required
payload_id int

payload item ID

required

Raises:

Type Description
HTTPException

a payload item with the given ID couldn't be found

Returns:

Name Type Description
dict PayloadListItem

HTTP response.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="/{payload_id}")
async def get_payload_item(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> PayloadListItem:
    """Get a payload item based on its ID.

    Args:
        user: Annotated[User, Depends(get_authorized_user)]
        payload_id (int): payload item ID

    Raises:
        HTTPException: a payload item with the given ID couldn't be found

    Returns:
        dict:
            HTTP response.

    """
    data = PAYLOAD_LIST.get_payload_item(index=payload_id)

    if data is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload with index -> {} not found".format(payload_id),
        )

    return PayloadListItem(index=payload_id, **data, asd="123")

get_payload_items(user) async

Get all Payload items.

Raises:

Type Description
HTTPException

payload list not initialized

HTTPException

payload list is empty

Returns:

Name Type Description
dict PayloadListItems

HTTP response with the result data

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="")
async def get_payload_items(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
) -> PayloadListItems:
    """Get all Payload items.

    Raises:
        HTTPException: payload list not initialized
        HTTPException: payload list is empty

    Returns:
        dict:
            HTTP response with the result data

    """

    df = PAYLOAD_LIST.get_payload_items()

    if df is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail="Payload list is empty.",
        )

    data = [PayloadListItem(index=idx, **row) for idx, row in df.iterrows()]

    stats = {
        "count": len(df),
        "status": df["status"].value_counts().to_dict(),
        "logs": {
            "debug": df["log_debug"].sum(),
            "info": df["log_info"].sum(),
            "warning": df["log_warning"].sum(),
            "error": df["log_error"].sum(),
            "critical": df["log_critical"].sum(),
        },
    }

    return PayloadListItems(stats=stats, results=data)

move_payload_item_down(user, payload_id) async

Move a payload item down in the list.

Parameters:

Name Type Description Default
user Annotated[User, Depends(get_authorized_user)]

Annotated[User, Depends(get_authorized_user)]

required
payload_id int

The payload item ID.

required

Raises:

Type Description
HTTPException

a payload item with the given ID couldn't be found

Returns:

Name Type Description
dict dict

HTTP response

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.put(path="/{payload_id}/down")
async def move_payload_item_down(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> dict:
    """Move a payload item down in the list.

    Args:
        user: Annotated[User, Depends(get_authorized_user)]
        payload_id (int):
            The payload item ID.

    Raises:
        HTTPException: a payload item with the given ID couldn't be found

    Returns:
        dict: HTTP response

    """

    position = PAYLOAD_LIST.move_payload_item_down(index=payload_id)

    if position is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload item with index -> {} is either out of range or is already on bottom of the payload list!".format(
                payload_id,
            ),
        )

    return {"result": {"new_position": position}}

move_payload_item_up(user, payload_id) async

Move a payload item up in the list.

Parameters:

Name Type Description Default
user Annotated[User, Depends(get_authorized_user)]

Annotated[User, Depends(get_authorized_user)]

required
payload_id int

payload item ID

required

Raises:

Type Description
HTTPException

a payload item with the given ID couldn't be found

Returns:

Name Type Description
dict dict

HTTP response

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.put(path="/{payload_id}/up")
async def move_payload_item_up(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> dict:
    """Move a payload item up in the list.

    Args:
        user: Annotated[User, Depends(get_authorized_user)]
        payload_id (int): payload item ID

    Raises:
        HTTPException: a payload item with the given ID couldn't be found

    Returns:
        dict: HTTP response

    """

    position = PAYLOAD_LIST.move_payload_item_up(index=payload_id)

    if position is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload item with index -> {} is either out of range or is already on top of the payload list!".format(
                payload_id,
            ),
        )

    return {"result": {"new_position": position}}

stream_logfile(user, payload_id) async

Stream the logfile and follow changes.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.get(path="/{payload_id}/log/stream")
async def stream_logfile(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
) -> StreamingResponse:
    """Stream the logfile and follow changes."""

    payload = PAYLOAD_LIST.get_payload_item(index=payload_id)

    if payload is None:
        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Payload not found")

    filename = payload.logfile

    if os.path.isfile(filename):
        return StreamingResponse(tail_log(filename), media_type="text/plain")

    raise HTTPException(status_code=HTTPStatus.NOT_FOUND)

update_payload_item(user, payload_id, name=None, dependencies=None, enabled=None, status=None, loglevel=None, customizer_settings=None) async

Update an existing payload item.

Parameters:

Name Type Description Default
user Optional[User]

User performing the update.

required
payload_id int

ID of the payload to update.

required
upload_file UploadFile

replace the file name

required
name Optional[str]

Updated name.

None
dependencies Optional[List[int]]

Updated list of dependencies.

None
enabled Optional[bool]

Updated enabled status.

None
loglevel Optional[str]

Updated loglevel.

None
status Optional[str]

Updated status.

None
customizer_settings Optional[str]

Updated customizer settings.

None

Returns:

Name Type Description
dict UpdatedPayloadListItem

HTTP response with the updated payload details.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/router.py
@router.put(path="/{payload_id}")
async def update_payload_item(
    user: Annotated[User, Depends(get_authorized_user)],  # noqa: ARG001
    payload_id: int,
    name: Annotated[str | None, Form()] = None,
    dependencies: Annotated[list[int] | list[str] | None, Form()] = None,
    enabled: Annotated[bool | None, Form()] = None,
    status: Annotated[Literal["planned", "completed"] | None, Form()] = None,
    loglevel: Annotated[
        Literal["DEBUG", "INFO", "WARNING"] | None,
        Form(
            description="Loglevel for the Payload processing",
        ),
    ] = None,
    customizer_settings: Annotated[str | None, Form()] = None,
) -> UpdatedPayloadListItem:
    """Update an existing payload item.

    Args:
        user (Optional[User]): User performing the update.
        payload_id (int): ID of the payload to update.
        upload_file (UploadFile, optional): replace the file name
        name (Optional[str]): Updated name.
        dependencies (Optional[List[int]]): Updated list of dependencies.
        enabled (Optional[bool]): Updated enabled status.
        loglevel (Optional[str]): Updated loglevel.
        status (Optional[str]): Updated status.
        customizer_settings (Optional[str]): Updated customizer settings.

    Returns:
        dict: HTTP response with the updated payload details.

    """

    if dependencies:
        dependencies = prepare_dependencies(dependencies)
    # Check if the payload exists
    payload_item = PAYLOAD_LIST.get_payload_item(
        payload_id,
    )  # Assumes a method to retrieve payload by ID
    if payload_item is None:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Payload with ID -> {} not found.".format(payload_id),
        )

    update_data = {}

    # Update fields if provided
    if name is not None:
        update_data["name"] = name
    if dependencies is not None:
        update_data["dependencies"] = dependencies
    if enabled is not None:
        update_data["enabled"] = enabled
    if status is not None:
        update_data["status"] = status
    if loglevel is not None:
        update_data["loglevel"] = loglevel

        thread_logger = logging.getLogger(name=f"Payload_{payload_id}")
        thread_logger.setLevel(loglevel)

    if customizer_settings is not None:
        try:
            update_data["customizer_settings"] = json.loads(customizer_settings)
        except Exception as e:
            raise HTTPException(detail=e, status_code=HTTPStatus.BAD_REQUEST) from e

    if "status" in update_data and update_data["status"] == "planned":
        logger.info("Resetting log message counters for -> %s", payload_id)
        update_data["log_debug"] = 0
        update_data["log_info"] = 0
        update_data["log_warning"] = 0
        update_data["log_error"] = 0
        update_data["log_critical"] = 0

        update_data["start_time"] = None
        update_data["stop_time"] = None
        update_data["duration"] = None

        data = PAYLOAD_LIST.get_payload_item(index=payload_id)
        if os.path.isfile(data.logfile):
            logger.info(
                "Deleting log file (for payload) -> %s (%s)",
                data.logfile,
                payload_id,
            )

            now = datetime.now(UTC)
            old_log_name = (
                os.path.dirname(data.logfile)
                + "/"
                + os.path.splitext(os.path.basename(data.logfile))[0]
                + now.strftime("_%Y-%m-%d_%H-%M-%S.log")
            )

            os.rename(data.logfile, old_log_name)

    # Save the updated payload back to the list (or database)
    result = PAYLOAD_LIST.update_payload_item(
        index=payload_id,
        update_data=update_data,
    )  # Assumes a method to update the payload
    if not result:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND,
            detail="Failed to update Payload with ID -> {} with data -> {}".format(
                payload_id,
                update_data,
            ),
        )

    return UpdatedPayloadListItem(
        message="Payload updated successfully",
        payload=PayloadListItem(index=payload_id, **PAYLOAD_LIST.get_payload_item(index=payload_id)),
        updated_fields=update_data,
    )

Define Models for Payload.

PayloadListItem

Bases: BaseModel

Defines PayloadListItem Model.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/models.py
class PayloadListItem(BaseModel):
    """Defines PayloadListItem Model."""

    index: int
    name: str
    filename: str
    dependencies: list
    logfile: str
    status: str
    enabled: bool
    git_url: str | None
    loglevel: str = "INFO"
    start_time: Any | None
    stop_time: Any | None
    duration: Any | None
    log_debug: int = 0
    log_info: int = 0
    log_warning: int = 0
    log_error: int = 0
    log_critical: int = 0
    customizer_settings: dict = {}

PayloadListItems

Bases: BaseModel

Defines PayloadListItems Model.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/models.py
class PayloadListItems(BaseModel):
    """Defines PayloadListItems Model."""

    stats: PayloadStats
    results: list[PayloadListItem]

PayloadStats

Bases: BaseModel

Defines PayloadStats Model.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/models.py
class PayloadStats(BaseModel):
    """Defines PayloadStats Model."""

    count: int = 0
    status: dict = {}
    logs: dict = {}

UpdatedPayloadListItem

Bases: BaseModel

Defines UpdatedPayloadListItem Model.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/models.py
class UpdatedPayloadListItem(BaseModel):
    """Defines UpdatedPayloadListItem Model."""

    message: str
    payload: PayloadListItem
    updated_fields: dict

API Implemenation for the Customizer to start and control the payload processing.

import_payload(payload=None, payload_dir=None, enabled=None, dependencies=None)

Automatically load payload items from disk of a given directory.

Parameters:

Name Type Description Default
payload str

The name of the payload.

None
payload_dir str

The local path.

None
enabled bool

Automatically start the processing (True), or only define items (False). Defaults to False.

None
dependencies bool

Automatically add dependency on the last payload in the queue

None
Source code in packages/pyxecm/src/pyxecm_api/v1_payload/functions.py
def import_payload(
    payload: str | None = None,
    payload_dir: str | None = None,
    enabled: bool | None = None,
    dependencies: bool | None = None,
) -> None:
    """Automatically load payload items from disk of a given directory.

    Args:
        payload (str):
            The name of the payload.
        payload_dir (str):
            The local path.
        enabled (bool, optional):
            Automatically start the processing (True), or only define items (False).
            Defaults to False.
        dependencies (bool, optional):
            Automatically add dependency on the last payload in the queue

    """

    def import_payload_file(
        filename: str,
        enabled: bool | None,
        dependencies: bool | None,
    ) -> None:
        if not os.path.isfile(filename):
            return

        if not (filename.endswith((".yaml", ".tfvars", ".tf", ".yml.gz.b64"))):
            logger.debug("Skipping file: %s", filename)
            return

        # Load payload file
        payload_content = load_payload(filename)
        if payload_content is None:
            exception = f"The import of payload -> {filename} failed. Payload content could not be loaded."
            raise PayloadImportError(exception)

        payload_options = payload_content.get("payloadOptions", {})

        if enabled is None:
            enabled = payload_options.get("enabled", True)

        # read name from options section if specified, otherwise take filename
        name = payload_options.get("name", os.path.basename(filename))

        # Get the loglevel from payloadOptions if set, otherwise use the default loglevel
        loglevel = payload_options.get("loglevel", api_settings.loglevel)

        # Get the git_url
        git_url = payload_options.get("git_url", None)

        # Dependency Management
        if dependencies is None:
            dependencies = []

            # Get all dependencies from payloadOptions and resolve their ID
            for dependency_name in payload_options.get("dependencies", []):
                dependend_item = PAYLOAD_LIST.get_payload_item_by_name(dependency_name)

                if dependend_item is None:
                    exception = (
                        f"The import of payload -> {name} failed. Dependencies cannot be resovled: {dependency_name}",
                    )
                    raise PayloadImportError(
                        exception,
                    )
                # Add the ID to the list of dependencies
                dependencies.append(dependend_item["index"])

        elif dependencies:
            try:
                payload_items = len(PAYLOAD_LIST.get_payload_items()) - 1
                dependencies = [payload_items] if payload_items != -1 else []
            except Exception:
                dependencies = []
        else:
            dependencies = []

        customizer_settings = payload_content.get("customizerSettings", {})

        logger.info("Adding payload: %s", filename)
        payload = PAYLOAD_LIST.add_payload_item(
            name=name,
            filename=filename,
            status="planned",
            logfile=f"{api_settings.logfolder}/{name}.log",
            dependencies=dependencies,
            enabled=enabled,
            git_url=git_url,
            loglevel=loglevel,
            customizer_settings=customizer_settings,
        )
        dependencies = payload["index"]

        return

    if payload is None and payload_dir is None:
        exception = "No payload or payload_dir provided"
        raise ValueError(exception)

    if payload and os.path.isdir(payload) and payload_dir is None:
        payload_dir = payload

    if payload_dir is None:
        try:
            import_payload_file(payload, enabled, dependencies)
        except PayloadImportError as exc:
            logger.error(exc)
            logger.debug(exc, exc_info=True)
        return

    elif not os.path.isdir(payload_dir):
        return

    for filename in sorted(os.listdir(payload_dir)):
        try:
            with tracer.start_as_current_span("import_payload") as t:
                t.set_attribute("payload", filename)
                import_payload_file(os.path.join(payload_dir, filename), enabled, dependencies)
        except PayloadImportError as exc:
            logger.error(exc)
            logger.debug(exc, exc_info=True)

prepare_dependencies(dependencies)

Convert the dependencies string to a list of integers.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/functions.py
def prepare_dependencies(dependencies: list) -> list | None:
    """Convert the dependencies string to a list of integers."""
    try:
        list_all = dependencies[0].split(",")
    except IndexError:
        return None

    # Remove empty values from the list
    items = list(filter(None, list_all))
    converted_list = []
    for item in items:
        try:
            converted_list.append(int(item))
        except ValueError:
            continue

    return converted_list

tail_log(file_path) async

Asynchronously follow the log file like tail -f.

Source code in packages/pyxecm/src/pyxecm_api/v1_payload/functions.py
async def tail_log(file_path: str) -> AsyncGenerator[str]:
    """Asynchronously follow the log file like `tail -f`."""
    try:
        async with await anyio.open_file(file_path) as file:
            # Move the pointer to the end of the file
            await file.seek(0, os.SEEK_END)

            while True:
                # Read new line
                line = await file.readline()
                if not line:
                    # Sleep for a little while before checking for new lines
                    await asyncio.sleep(0.5)
                    continue
                yield line
    except asyncio.exceptions.CancelledError:
        pass