import os
import time

import requests
# initialize all necessary variables
base_url = "https://api.tokenfactory.nebius.com"
token = os.environ["NEBIUS_API_KEY"]
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {token}",
}
records = [
    {
        "prompt": [
            {
                # system prompt given as a plain string
                "role": "system",
                "content": "You are a helpful assistant.",
            },
            {
                "role": "user",
                "content": "What is the capital of the Netherlands?",
            },
        ],
        "custom_id": "1",
    },
    {
        "prompt": [
            {
                # the same system prompt, using the structured content-parts format
                "role": "system",
                "content": [
                    {
                        "text": "You are a helpful assistant.",
                        "type": "text",
                    },
                ],
            },
            {
                "role": "user",
                "content": "What should I do if it’s raining and I forgot my umbrella?",
            },
        ],
        "custom_id": "3",
    },
]
# upload dataset
# arbitrary datasets with arbitrary column names can be uploaded and used in inference or fine-tuning
response = requests.post(
    f"{base_url}/v1/datasets",
    json={
        "name": "Example Batch Inference Dataset",
        "dataset_schema": [
            {
                "name": "prompt",
                "type": {
                    "name": "json",
                },
            },
            {
                "name": "custom_id",
                "type": {
                    "name": "string",
                },
            },
        ],
        "folder": "/demo",
        "rows": records,
    },
    headers=headers,
)
response.raise_for_status()
dataset = response.json()
print("Dataset uploaded:")
print(dataset)
source_dataset_id = dataset["id"]
source_dataset_version_id = dataset["current_version"]
# run batch inference
# the mapping lets the operation read arbitrary columns from the dataset uploaded above
response = requests.post(
    f"{base_url}/v1/operations",
    json={
        "type": "batch_inference",
        "src": [
            {
                "id": source_dataset_id,
                "version": source_dataset_version_id,
                "mapping": {
                    "type": "text_messages",
                    "messages": {
                        "type": "column",
                        "name": "prompt",  # any column that contains JSON in the appropriate chat format works here
                    },
                    "custom_id": {  # optional
                        "type": "column",
                        "name": "custom_id",
                    },
                    "max_tokens": {  # optional
                        "type": "text",
                        "value": "32000",
                    },
                },
            },
        ],
        "dst": [],  # left empty; the service creates the destination dataset (see "dst" in the response below)
        "params": {
            "model": "openai/gpt-oss-20b",
            "completion_window": "12h",
        },
    },
    headers=headers,
)
response.raise_for_status()
operation = response.json()
print("Batch inference started:")
print(operation)
dst_dataset_id = operation["dst"][0]["id"]
operation_id = operation["id"]
# wait for the operation to complete, polling its status every 5 seconds
while True:
    status_response = requests.get(
        f"{base_url}/v1/operations/{operation_id}",
        headers=headers,
    )
    status_response.raise_for_status()
    status = status_response.json()["status"]
    print(f"Operation status: {status}")
    if status not in {"queued", "running"}:
        break
    time.sleep(5)
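# The loop above exits on any status other than "queued" or "running".
# A defensive sketch: treat anything other than a successful finish as an
# error. The success status name "completed" is an assumption about the API,
# not confirmed by this example; adjust it to what your operations report.
if status != "completed":  # hypothetical success status name
    raise RuntimeError(f"Batch inference did not complete successfully: {status}")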
# download results
response = requests.get(
    f"{base_url}/v1/datasets/{dst_dataset_id}/export?format=jsonl",  # csv is also supported; use limit and offset for big datasets
    headers=headers,
)
response.raise_for_status()
print("Batch inference results:")
print(response.text)
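# The export is JSONL, so each non-empty line should parse as one JSON record.
# A minimal sketch of turning the raw response into Python dicts:
import json  # could equally be hoisted to the top-level imports

results = [json.loads(line) for line in response.text.splitlines() if line.strip()]
print(f"Parsed {len(results)} result rows")

# For big datasets, the export endpoint supports limit and offset (per the
# comment above). A paging sketch, assuming conventional limit/offset
# semantics where a short page signals the end of the data:
page_size = 1000
offset = 0
all_rows = []
while True:
    page = requests.get(
        f"{base_url}/v1/datasets/{dst_dataset_id}/export",
        params={"format": "jsonl", "limit": page_size, "offset": offset},
        headers=headers,
    )
    page.raise_for_status()
    rows = [json.loads(line) for line in page.text.splitlines() if line.strip()]
    all_rows.extend(rows)
    if len(rows) < page_size:
        break
    offset += page_size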