Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"regions",
"ssh",
"stackscripts",
"streams",
"tickets",
"tags",
"users",
Expand Down
122 changes: 122 additions & 0 deletions tests/integration/streams/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import os
from typing import Any, Dict

import pytest
import requests

from tests.integration.helpers import (
BASE_CMDS,
delete_target_id,
exec_test_command,
get_random_region_with_caps,
get_random_text,
)


def get_object_storage_buckets() -> Dict[str, Any]:
token = os.getenv("LINODE_CLI_TOKEN")
if token is None:
raise ValueError("LINODE_CLI_TOKEN environment variable is not set. ")
host = os.getenv("LINODE_CLI_API_HOST")
if token is None:
raise ValueError(
"LINODE_CLI_API_HOST environment variable is not set. "
)
version = os.getenv("LINODE_CLI_API_VERSION")
if token is None:
raise ValueError(
"LINODE_CLI_API_VERSION environment variable is not set. "
)

url = f"https://{host}/{version}/object-storage/buckets"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}

response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()

return response.json()


@pytest.fixture(scope="session")
def create_stream(create_destination_akamai_object_storage_type):
label = get_random_text(8) + "_stream_cli_test"
result_create_stream = exec_test_command(
BASE_CMDS["streams"]
+ [
"create",
"--label",
label,
"--type",
"audit_logs",
"--destinations",
create_destination_akamai_object_storage_type[0],
"--delimiter",
",",
"--text",
]
).splitlines()
yield result_create_stream
delete_target_id("streams", str(result_create_stream[1]), "delete")


@pytest.fixture
def create_object_storage_keys():
label = get_random_text(8) + "_destination_cli_test"
test_bucket = get_object_storage_buckets()["data"][0]
result_keys = exec_test_command(
BASE_CMDS["object-storage"]
+ [
"keys-create",
"--label",
label,
"--bucket_access",
'[{"region": "'
+ test_bucket["region"]
+ '", "bucket_name": "'
+ test_bucket["label"]
+ '", "permissions": "read_write" }]',
"--text",
"--no-headers",
"--delimiter",
",",
"--format",
"access_key,id",
]
).split(",")
yield result_keys[0], test_bucket["label"], test_bucket["s3_endpoint"]
delete_target_id("object-storage", str(result_keys[1]), "keys-delete")


@pytest.fixture(scope="function")
def create_destination_akamai_object_storage_type(create_object_storage_keys):
get_random_region_with_caps(required_capabilities=["Linodes"])

label = get_random_text(8) + "_destination_cli_test"
result_create_destination = exec_test_command(
BASE_CMDS["streams"]
+ [
"destination-create",
"--label",
label,
"--type",
"akamai_object_storage",
"--details.host",
create_object_storage_keys[2],
"--details.bucket_name",
create_object_storage_keys[1],
"--details.path",
"audit-logs",
"--details.access_key_id",
create_object_storage_keys[0],
"--delimiter",
",",
"--text",
]
).splitlines()
yield result_create_destination
delete_target_id(
"streams", str(result_create_destination[0]), "destination-delete"
)
158 changes: 158 additions & 0 deletions tests/integration/streams/test_destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from linodecli.exit_codes import ExitCodes
from tests.integration.helpers import (
BASE_CMDS,
assert_headers_in_lines,
exec_failing_test_command,
exec_test_command,
)
from tests.integration.streams.fixtures import (
create_destination_akamai_object_storage_type,
create_object_storage_keys,
)


def test_list_destinations():
result = exec_test_command(
BASE_CMDS["streams"]
+ ["destinations-list", "--delimiter", ",", "--text"]
)
lines = result.splitlines()
headers = [
"created",
"created_by",
"details",
"id",
"label",
"status",
"type",
"updated",
"updated_by",
"version",
]
assert_headers_in_lines(headers, lines)


def test_create_destination_error():
result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-create",
"--label",
"test",
"--type",
"custom_https",
"--details",
"test",
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 400" in result
assert "details,Must be of type Object" in result


def test_view_destination_error():
result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-view",
"1",
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 404" in result
assert "Destination not found" in result


def test_destination_history_view_error():
result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-history-view",
"1",
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 404" in result
assert "Destination not found" in result


def test_update_destination_error():
result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-update",
"1",
"--details",
"test",
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 404" in result
assert "Destination not found" in result


def test_remove_destination_error():
result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-delete",
"1",
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 404" in result
assert "Destination not found" in result


def test_view_delete_destination(create_destination_akamai_object_storage_type):
result_view = exec_test_command(
BASE_CMDS["streams"]
+ [
"destination-view",
create_destination_akamai_object_storage_type,
"--delimiter",
",",
"--text",
],
)
assert result_view == "test"

exec_test_command(
BASE_CMDS["streams"]
+ [
"delete",
create_destination_akamai_object_storage_type,
"--delimiter",
",",
"--text",
],
)

result = exec_failing_test_command(
BASE_CMDS["streams"]
+ [
"destination-view",
create_destination_akamai_object_storage_type,
"--delimiter",
",",
"--text",
],
expected_code=ExitCodes.REQUEST_FAILED,
)
assert "Request failed: 404" in result
assert "Not found" in result
Loading
Loading