def create_job(*, timeout=30):
    """Create a new job on the server and return its identifier.

    Sends an authenticated POST to ``{BASE_URL}/api/jobs`` and reads the
    ``job_id`` field from the JSON response body.

    Args:
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``, so a ``(connect, read)`` tuple
            is accepted as well. Pass ``None`` to disable the limit.

    Returns:
        The value stored under ``"job_id"`` in the response JSON.

    Raises:
        requests.Timeout: If the server does not answer within *timeout*.
        requests.HTTPError: If the server replies with a 4xx/5xx status.
        KeyError: If the response JSON has no ``"job_id"`` field.
    """
    # requests never times out on its own; without an explicit value an
    # unresponsive server would block the caller forever.
    resp = requests.post(
        f"{BASE_URL}/api/jobs",
        headers={"Authorization": TOKEN},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["job_id"]
Minimal file upload example:
Copy
def get_signed_url(job_id, file_name, mime_type, *, timeout=30):
    """Request a signed URL to which a file for the given job can be uploaded.

    Sends an authenticated POST to
    ``{BASE_URL}/api/jobs/{job_id}/files/upload-url`` with the file name and
    MIME type as a JSON body, and reads ``signedUrl`` from the JSON response.

    Args:
        job_id: Identifier of the job, interpolated into the request path.
        file_name: Sent to the server as ``fileName``.
        mime_type: Sent to the server as ``fileType``.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``, so a ``(connect, read)`` tuple
            is accepted as well. Pass ``None`` to disable the limit.

    Returns:
        The value stored under ``"signedUrl"`` in the response JSON.

    Raises:
        requests.Timeout: If the server does not answer within *timeout*.
        requests.HTTPError: If the server replies with a 4xx/5xx status.
        KeyError: If the response JSON has no ``"signedUrl"`` field.
    """
    url = f"{BASE_URL}/api/jobs/{job_id}/files/upload-url"
    payload = {"fileName": file_name, "fileType": mime_type}
    # requests never times out on its own; without an explicit value an
    # unresponsive server would block the caller forever.
    resp = requests.post(
        url,
        json=payload,
        headers={"Authorization": TOKEN},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["signedUrl"]


def upload_file(job_id, file_path, *, timeout=(10, 300)):
    """Upload a local file for a job via a signed URL.

    The MIME type is guessed from the file name, falling back to
    ``application/octet-stream``. A signed URL is obtained with
    ``get_signed_url`` and the file is then sent to it with an HTTP PUT.

    Args:
        job_id: Identifier of the job the file belongs to.
        file_path: Path (``str`` or ``Path``) of the local file to upload.
        timeout: Timeout for the PUT request only, passed straight to
            ``requests.put``. The default is a ``(connect, read)`` tuple in
            seconds. It bounds individual socket waits, not the total
            transfer time. Pass ``None`` to disable the limit.

    Raises:
        OSError: If the local file cannot be opened for reading.
        requests.Timeout: If the upload target stops responding.
        requests.HTTPError: If either request gets a 4xx/5xx reply.
    """
    file_path = Path(file_path)
    mime_type, _ = mimetypes.guess_type(str(file_path))
    mime_type = mime_type or "application/octet-stream"

    # The same MIME type is sent to the URL request and the PUT below.
    # NOTE(review): presumably the signed URL is bound to it - confirm.
    signed_url = get_signed_url(job_id, file_path.name, mime_type)

    with file_path.open("rb") as f:
        # Passing the open file object lets requests stream the body
        # instead of reading the whole file into memory first.
        resp = requests.put(
            signed_url,
            data=f,
            headers={"Content-Type": mime_type},
            timeout=timeout,
        )
    resp.raise_for_status()
def create_job_yaml(job_id, input_file):
    """Write the YAML parameter file for a job and return its path.

    The file is named ``<job_id>.yaml`` (a relative path) and holds two keys:
    ``files``, a one-element list with the base name of *input_file*, and
    ``model``, fixed to ``"mini"``.

    Args:
        job_id: Identifier used as the stem of the YAML file name.
        input_file: Path of the input file; only its final component is
            recorded.

    Returns:
        A ``Path`` pointing at the YAML file that was written.
    """
    input_name = Path(input_file).name
    job_spec = {"files": [input_name], "model": "mini"}

    target = Path(f"{job_id}.yaml")
    with open(target, "w") as stream:
        # Block style (default_flow_style=False) gives one key per line.
        yaml.safe_dump(job_spec, stream, default_flow_style=False)
    return target
Minimal file upload example:
Copy
def upload_file(job_id, file_path, *, timeout=(10, 300)):
    """Upload a local file for a job via a signed URL.

    The MIME type is guessed from the file name, falling back to
    ``application/octet-stream``. A signed URL is obtained with
    ``get_signed_url`` and the file is then sent to it with an HTTP PUT.

    Args:
        job_id: Identifier of the job the file belongs to.
        file_path: Path (``str`` or ``Path``) of the local file to upload.
        timeout: Timeout for the PUT request only, passed straight to
            ``requests.put``. The default is a ``(connect, read)`` tuple in
            seconds. It bounds individual socket waits, not the total
            transfer time. Pass ``None`` to disable the limit.

    Raises:
        OSError: If the local file cannot be opened for reading.
        requests.Timeout: If the upload target stops responding.
        requests.HTTPError: If either request gets a 4xx/5xx reply.
    """
    file_path = Path(file_path)
    mime_type, _ = mimetypes.guess_type(str(file_path))
    mime_type = mime_type or "application/octet-stream"

    # The same MIME type is sent to the URL request and the PUT below.
    # NOTE(review): presumably the signed URL is bound to it - confirm.
    signed_url = get_signed_url(job_id, file_path.name, mime_type)

    with file_path.open("rb") as f:
        # Passing the open file object lets requests stream the body
        # instead of reading the whole file into memory first.
        resp = requests.put(
            signed_url,
            data=f,
            headers={"Content-Type": mime_type},
            timeout=timeout,
        )
    resp.raise_for_status()