# coding: utf-8
"""
Amorphic Data Platform
Amorphic Data Platform - API Definition documentation
The version of the OpenAPI document: 1.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import warnings
from pydantic import validate_call, Field, StrictFloat, StrictStr, StrictInt
from typing import Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated
from pydantic import StrictStr
from typing import Optional
from openapi_client.models.asset_comment_input import AssetCommentInput
from openapi_client.models.asset_comments_list import AssetCommentsList
from openapi_client.models.asset_details import AssetDetails
from openapi_client.models.asset_metadata import AssetMetadata
from openapi_client.models.datasource_sync_job_details import DatasourceSyncJobDetails
from openapi_client.models.datasource_sync_jobs import DatasourceSyncJobs
from openapi_client.models.list_asset_response import ListAssetResponse
from openapi_client.models.message_status import MessageStatus
from openapi_client.api_client import ApiClient, RequestSerialized
from openapi_client.api_response import ApiResponse
from openapi_client.rest import RESTResponseType
class DatasourcesApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
    """Bind this API object to the client used for every request.

    :param api_client: client that serializes, sends and deserializes
        requests; when ``None``, ``ApiClient.get_default()`` is used.
    """
    self.api_client = (
        ApiClient.get_default() if api_client is None else api_client
    )
@validate_call
def add_datasource_asset_comment_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_comment_input: AssetCommentInput,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Add a comment to an asset and return the deserialized response wrapper.

    The request is built by ``_add_datasource_asset_comment_serialize``,
    sent with ``api_client.call_api``, read, and then passed to
    ``api_client.response_deserialize``.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset that receives the comment (required)
    :param asset_comment_input: comment payload (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content type to force for this request.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: what ``response_deserialize`` produces for the status map
        200 -> ``MessageStatus``, 400 / 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    request_args = self._add_datasource_asset_comment_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_comment_input=asset_comment_input,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    return self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
@validate_call
def add_datasource_asset_comment_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_comment_input: AssetCommentInput,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Add a comment to an asset and return the raw, unread response.

    Unlike the ``_with_http_info`` variant, this method neither calls
    ``read()`` on the response nor deserializes it.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset that receives the comment (required)
    :param asset_comment_input: comment payload (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content type to force for this request.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """
    request_args = self._add_datasource_asset_comment_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_comment_input=asset_comment_input,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized on this
    # path, so such a map would never be read.
    response_data = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return response_data.response
def _add_datasource_asset_comment_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    asset_comment_input,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the POST request that adds a comment to an asset.

    :param role_id: sent as the ``role_id`` header when not ``None``.
    :param datasource_id: fills ``{datasource_id}`` in the resource path.
    :param asset_id: fills ``{asset_id}`` in the resource path.
    :param asset_comment_input: used as the request body when not ``None``.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: when truthy, becomes the ``Content-Type`` header;
        otherwise the client picks one from ``application/json``.
    :param _headers: caller-supplied headers; copied, never modified.
    :param _host_index: accepted for signature parity; not used here
        (``_host`` is always ``None``).
    :return: the value produced by ``api_client.param_serialize``.
    """
    # Work on a copy: headers are added below, and writing them into the
    # caller's dict would leak them into any later reuse of that dict.
    header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # Path parameters, in the order they appear in the resource path.
    path_params: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('asset_id', asset_id),
        )
        if value is not None
    }

    if role_id is not None:
        header_params['role_id'] = role_id

    # The body is the model object itself; serialization happens downstream.
    body_params: Optional[Any] = None
    if asset_comment_input is not None:
        body_params = asset_comment_input

    # A caller-provided `Accept` header wins over the default.
    if 'Accept' not in header_params:
        header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    # An explicit content type wins; otherwise ask the client for a default.
    if _content_type:
        header_params['Content-Type'] = _content_type
    else:
        default_content_type = self.api_client.select_header_content_type(
            ['application/json']
        )
        if default_content_type is not None:
            header_params['Content-Type'] = default_content_type

    query_params: List[Tuple[str, str]] = []
    form_params: List[Tuple[str, str]] = []
    files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    collection_formats: Dict[str, str] = {}
    auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='POST',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments',
        path_params=path_params,
        query_params=query_params,
        header_params=header_params,
        body=body_params,
        post_params=form_params,
        files=files,
        auth_settings=auth_settings,
        collection_formats=collection_formats,
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
def delete_datasource_asset_comment_with_http_info(
    self,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    comment_id: StrictStr,
    role_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Delete a comment on an asset and return the deserialized response wrapper.

    The request is built by ``_delete_datasource_asset_comment_serialize``,
    sent with ``api_client.call_api``, read, and then passed to
    ``api_client.response_deserialize``.

    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset the comment is attached to (required)
    :param comment_id: comment to delete (required)
    :param role_id: role identifier for the request (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: what ``response_deserialize`` produces for the status map
        200 -> ``MessageStatus``, 400 / 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    request_args = self._delete_datasource_asset_comment_serialize(
        datasource_id=datasource_id,
        asset_id=asset_id,
        comment_id=comment_id,
        role_id=role_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    return self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
@validate_call
def delete_datasource_asset_comment_without_preload_content(
    self,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    comment_id: StrictStr,
    role_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Delete a comment on an asset and return the raw, unread response.

    Unlike the ``_with_http_info`` variant, this method neither calls
    ``read()`` on the response nor deserializes it.

    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset the comment is attached to (required)
    :param comment_id: comment to delete (required)
    :param role_id: role identifier for the request (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """
    request_args = self._delete_datasource_asset_comment_serialize(
        datasource_id=datasource_id,
        asset_id=asset_id,
        comment_id=comment_id,
        role_id=role_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized on this
    # path, so such a map would never be read.
    response_data = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return response_data.response
def _delete_datasource_asset_comment_serialize(
    self,
    datasource_id,
    asset_id,
    comment_id,
    role_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the DELETE request that removes a comment from an asset.

    :param datasource_id: fills ``{datasource_id}`` in the resource path.
    :param asset_id: fills ``{asset_id}`` in the resource path.
    :param comment_id: fills ``{comment_id}`` in the resource path.
    :param role_id: sent as the ``role_id`` header when not ``None``.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: accepted for signature parity; not used here.
    :param _headers: caller-supplied headers; copied, never modified.
    :param _host_index: accepted for signature parity; not used here
        (``_host`` is always ``None``).
    :return: the value produced by ``api_client.param_serialize``.
    """
    # Work on a copy: headers are added below, and writing them into the
    # caller's dict would leak them into any later reuse of that dict.
    header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # Path parameters, in the order they appear in the resource path.
    path_params: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('asset_id', asset_id),
            ('comment_id', comment_id),
        )
        if value is not None
    }

    if role_id is not None:
        header_params['role_id'] = role_id

    # A caller-provided `Accept` header wins over the default.
    if 'Accept' not in header_params:
        header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    query_params: List[Tuple[str, str]] = []
    form_params: List[Tuple[str, str]] = []
    files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    collection_formats: Dict[str, str] = {}
    auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='DELETE',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments/{comment_id}',
        path_params=path_params,
        query_params=query_params,
        header_params=header_params,
        body=None,
        post_params=form_params,
        files=files,
        auth_settings=auth_settings,
        collection_formats=collection_formats,
        _host=None,
        _request_auth=_request_auth,
    )
[docs]
@validate_call
def get_datasource_asset(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> AssetDetails:
    """Fetch the details of one asset in a datasource.

    The request is built by ``_get_datasource_asset_serialize``, sent with
    ``api_client.call_api``, read, deserialized, and only the ``data``
    attribute of the deserialized result is returned.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset to look up (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: ``data`` of the deserialized response; the status map is
        200 -> ``AssetDetails``, 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetDetails",
        '500': "Error",
    }
    request_args = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def get_datasource_asset_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[AssetDetails]:
    """Fetch one asset's details and return the deserialized response wrapper.

    The request is built by ``_get_datasource_asset_serialize``, sent with
    ``api_client.call_api``, read, and then passed to
    ``api_client.response_deserialize``.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset to look up (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: what ``response_deserialize`` produces for the status map
        200 -> ``AssetDetails``, 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetDetails",
        '500': "Error",
    }
    request_args = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    return self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
@validate_call
def get_datasource_asset_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Fetch one asset's details and return the raw, unread response.

    Unlike the ``_with_http_info`` variant, this method neither calls
    ``read()`` on the response nor deserializes it.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the asset belongs to (required)
    :param asset_id: asset to look up (required)
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """
    request_args = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized on this
    # path, so such a map would never be read.
    response_data = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return response_data.response
def _get_datasource_asset_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the GET request that fetches one asset of a datasource.

    :param role_id: sent as the ``role_id`` header when not ``None``.
    :param datasource_id: fills ``{datasource_id}`` in the resource path.
    :param asset_id: fills ``{asset_id}`` in the resource path.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: accepted for signature parity; not used here.
    :param _headers: caller-supplied headers; copied, never modified.
    :param _host_index: accepted for signature parity; not used here
        (``_host`` is always ``None``).
    :return: the value produced by ``api_client.param_serialize``.
    """
    # Work on a copy: headers are added below, and writing them into the
    # caller's dict would leak them into any later reuse of that dict.
    header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # Path parameters, in the order they appear in the resource path.
    path_params: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('asset_id', asset_id),
        )
        if value is not None
    }

    if role_id is not None:
        header_params['role_id'] = role_id

    # A caller-provided `Accept` header wins over the default.
    if 'Accept' not in header_params:
        header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    query_params: List[Tuple[str, str]] = []
    form_params: List[Tuple[str, str]] = []
    files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    collection_formats: Dict[str, str] = {}
    auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}',
        path_params=path_params,
        query_params=query_params,
        header_params=header_params,
        body=None,
        post_params=form_params,
        files=files,
        auth_settings=auth_settings,
        collection_formats=collection_formats,
        _host=None,
        _request_auth=_request_auth,
    )
[docs]
@validate_call
def get_datasource_sync_job(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> DatasourceSyncJobDetails:
    """Fetch the details of one sync job of a datasource.

    The request is built by ``_get_datasource_sync_job_serialize``, sent
    with ``api_client.call_api``, read, deserialized, and only the ``data``
    attribute of the deserialized result is returned.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the sync job belongs to (required)
    :param job_id: sync job to look up (required)
    :param limit: optional ``limit`` query value
    :param offset: optional ``offset`` query value
    :param sortorder: optional ``sortorder`` query value
    :param sortby: optional ``sortby`` query value
    :param projection_expression: optional ``projectionExpression`` query
        value
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: ``data`` of the deserialized response; the status map is
        200 -> ``DatasourceSyncJobDetails``, 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobDetails",
        '500': "Error",
    }
    request_args = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def get_datasource_sync_job_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[DatasourceSyncJobDetails]:
    """Fetch one sync job's details and return the deserialized response wrapper.

    The request is built by ``_get_datasource_sync_job_serialize``, sent
    with ``api_client.call_api``, read, and then passed to
    ``api_client.response_deserialize``.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the sync job belongs to (required)
    :param job_id: sync job to look up (required)
    :param limit: optional ``limit`` query value
    :param offset: optional ``offset`` query value
    :param sortorder: optional ``sortorder`` query value
    :param sortby: optional ``sortby`` query value
    :param projection_expression: optional ``projectionExpression`` query
        value
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: what ``response_deserialize`` produces for the status map
        200 -> ``DatasourceSyncJobDetails``, 500 -> ``Error``.
    """
    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobDetails",
        '500': "Error",
    }
    request_args = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    http_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    # read() is invoked before the response is given to the deserializer.
    http_response.read()
    return self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
@validate_call
def get_datasource_sync_job_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Fetch one sync job's details and return the raw, unread response.

    Unlike the ``_with_http_info`` variant, this method neither calls
    ``read()`` on the response nor deserializes it.

    :param role_id: role identifier for the request (required)
    :param datasource_id: datasource the sync job belongs to (required)
    :param job_id: sync job to look up (required)
    :param limit: optional ``limit`` query value
    :param offset: optional ``offset`` query value
    :param sortorder: optional ``sortorder`` query value
    :param sortby: optional ``sortby`` query value
    :param projection_expression: optional ``projectionExpression`` query
        value
    :param _request_timeout: forwarded to ``call_api``. A single positive
        number is the total request timeout; a pair of positive numbers
        is the (connection, read) timeouts.
    :param _request_auth: per-request override of the auth settings,
        passed to the serializer.
    :param _content_type: content-type override, passed to the serializer.
    :param _headers: headers supplied for this single request.
    :param _host_index: host index override; the annotation admits only 0.
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """
    request_args = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized on this
    # path, so such a map would never be read.
    response_data = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return response_data.response
def _get_datasource_sync_job_serialize(
    self,
    role_id,
    datasource_id,
    job_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the GET request that fetches one sync job of a datasource.

    :param role_id: sent as the ``role_id`` header when not ``None``.
    :param datasource_id: fills ``{datasource_id}`` in the resource path.
    :param job_id: fills ``{job_id}`` in the resource path.
    :param limit: ``limit`` query value, omitted when ``None``.
    :param offset: ``offset`` query value, omitted when ``None``.
    :param sortorder: ``sortorder`` query value, omitted when ``None``.
    :param sortby: ``sortby`` query value, omitted when ``None``.
    :param projection_expression: ``projectionExpression`` query value,
        omitted when ``None``.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: accepted for signature parity; not used here.
    :param _headers: caller-supplied headers; copied, never modified.
    :param _host_index: accepted for signature parity; not used here
        (``_host`` is always ``None``).
    :return: the value produced by ``api_client.param_serialize``.
    """
    # Work on a copy: headers are added below, and writing them into the
    # caller's dict would leak them into any later reuse of that dict.
    header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # Path parameters, in the order they appear in the resource path.
    path_params: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('job_id', job_id),
        )
        if value is not None
    }

    # Query parameters keep their declared order; unset ones are skipped.
    # Note the wire name `projectionExpression` differs from the argument.
    query_params: List[Tuple[str, str]] = []
    for name, value in (
        ('limit', limit),
        ('offset', offset),
        ('sortorder', sortorder),
        ('sortby', sortby),
        ('projectionExpression', projection_expression),
    ):
        if value is not None:
            query_params.append((name, value))

    if role_id is not None:
        header_params['role_id'] = role_id

    # A caller-provided `Accept` header wins over the default.
    if 'Accept' not in header_params:
        header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    form_params: List[Tuple[str, str]] = []
    files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    collection_formats: Dict[str, str] = {}
    auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/sync-jobs/{job_id}',
        path_params=path_params,
        query_params=query_params,
        header_params=header_params,
        body=None,
        post_params=form_params,
        files=files,
        auth_settings=auth_settings,
        collection_formats=collection_formats,
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
def list_datasource_asset_comments_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[AssetCommentsList]:
    """Get the list of comments for an asset.

    The request is built by ``_list_datasource_asset_comments_serialize``,
    sent through ``api_client.call_api``, read in full and then
    deserialized. Status ``200`` is mapped to ``AssetCommentsList`` and
    ``500`` to ``Error``.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    :rtype: ApiResponse[AssetCommentsList]
    """
    request_args = self._list_datasource_asset_comments_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetCommentsList",
        '500': "Error",
    }
    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
@validate_call
def list_datasource_asset_comments_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get the list of comments for an asset, without preloading content.

    Sends the same request as the other ``list_datasource_asset_comments``
    variants, but neither calls ``read()`` on the response nor
    deserializes it: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The undeserialized ``response`` object of the call.
    :rtype: RESTResponseType
    """
    _param = self._list_datasource_asset_comments_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No status-to-model map is built here: nothing in this variant
    # deserializes the response, so the map would be dead code.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response
def _list_datasource_asset_comments_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    limit,
    offset,
    sortorder,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the GET request that lists comments on an asset.

    Targets ``/datasources/{datasource_id}/assets/{asset_id}/comments``.
    Every argument that is ``None`` is left out of the request.

    :param role_id: Sent as the ``role_id`` request header.
    :param datasource_id: Fills the ``datasource_id`` path segment.
    :param asset_id: Fills the ``asset_id`` path segment.
    :param limit: ``limit`` query parameter.
    :param offset: ``offset`` query parameter.
    :param sortorder: ``sortorder`` query parameter.
    :param _request_auth: Forwarded untouched to
        ``api_client.param_serialize``.
    :param _content_type: Accepted for signature parity with the other
        serializers; not read here (no body is sent).
    :param _headers: Optional extra headers. The mapping is copied, so
        the caller's object is never modified.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` produces for the
        assembled request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # Copy instead of aliasing: ``_headers or {}`` hands back the caller's
    # own dict whenever it is non-empty, so the ``role_id`` / ``Accept``
    # entries written below would leak into the caller's object.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id

    # process the query parameters (declaration order is preserved)
    _query_params: List[Tuple[str, str]] = [
        (wire_name, value)
        for wire_name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
        )
        if value is not None
    ]

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # A caller-supplied `Accept` header wins over the default.
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    # authentication setting
    _auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
[docs]
@validate_call
def list_datasource_assets(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ListAssetResponse:
    """List assets in a datasource.

    Returns the list of assets in an Amorphic datasource. The request is
    built by ``_list_datasource_assets_serialize``, sent through
    ``api_client.call_api``, read in full and deserialized (``200`` maps
    to ``ListAssetResponse``, ``500`` to ``Error``); only the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    :rtype: ListAssetResponse
    """
    request_args = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "ListAssetResponse",
        '500': "Error",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def list_datasource_assets_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[ListAssetResponse]:
    """List assets in a datasource, returning the full API response.

    Returns the list of assets in an Amorphic datasource. The request is
    built by ``_list_datasource_assets_serialize``, sent through
    ``api_client.call_api``, read in full and deserialized. Status
    ``200`` is mapped to ``ListAssetResponse`` and ``500`` to ``Error``.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    :rtype: ApiResponse[ListAssetResponse]
    """
    request_args = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "ListAssetResponse",
        '500': "Error",
    }
    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
@validate_call
def list_datasource_assets_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List assets in a datasource, without preloading content.

    Sends the same request as the other ``list_datasource_assets``
    variants, but neither calls ``read()`` on the response nor
    deserializes it: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The undeserialized ``response`` object of the call.
    :rtype: RESTResponseType
    """
    _param = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No status-to-model map is built here: nothing in this variant
    # deserializes the response, so the map would be dead code.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response
def _list_datasource_assets_serialize(
    self,
    role_id,
    datasource_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the GET request that lists assets in a datasource.

    Targets ``/datasources/{datasource_id}/assets``. Every argument that
    is ``None`` is left out of the request.

    :param role_id: Sent as the ``role_id`` request header.
    :param datasource_id: Fills the ``datasource_id`` path segment.
    :param limit: ``limit`` query parameter.
    :param offset: ``offset`` query parameter.
    :param sortorder: ``sortorder`` query parameter.
    :param sortby: ``sortby`` query parameter.
    :param projection_expression: Sent as the ``projectionExpression``
        query parameter.
    :param _request_auth: Forwarded untouched to
        ``api_client.param_serialize``.
    :param _content_type: Accepted for signature parity with the other
        serializers; not read here (no body is sent).
    :param _headers: Optional extra headers. The mapping is copied, so
        the caller's object is never modified.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` produces for the
        assembled request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # Copy instead of aliasing: ``_headers or {}`` hands back the caller's
    # own dict whenever it is non-empty, so the ``role_id`` / ``Accept``
    # entries written below would leak into the caller's object.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id

    # process the query parameters (declaration order is preserved)
    _query_params: List[Tuple[str, str]] = [
        (wire_name, value)
        for wire_name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
            ('sortby', sortby),
            ('projectionExpression', projection_expression),
        )
        if value is not None
    ]

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # A caller-supplied `Accept` header wins over the default.
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    # authentication setting
    _auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
[docs]
@validate_call
def list_datasource_sync_jobs(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> DatasourceSyncJobs:
    """Get the list of Amorphic sync jobs triggered for a datasource.

    Returns the sync jobs that were triggered for syncing asset details
    of a datasource. The request is built by
    ``_list_datasource_sync_jobs_serialize``, sent through
    ``api_client.call_api``, read in full and deserialized (``200`` maps
    to ``DatasourceSyncJobs``, ``500`` to ``Error``); only the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    :rtype: DatasourceSyncJobs
    """
    request_args = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobs",
        '500': "Error",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def list_datasource_sync_jobs_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[DatasourceSyncJobs]:
    """Get the datasource's sync jobs, returning the full API response.

    Returns the sync jobs that were triggered for syncing asset details
    of a datasource. The request is built by
    ``_list_datasource_sync_jobs_serialize``, sent through
    ``api_client.call_api``, read in full and deserialized. Status
    ``200`` is mapped to ``DatasourceSyncJobs`` and ``500`` to ``Error``.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    :rtype: ApiResponse[DatasourceSyncJobs]
    """
    request_args = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobs",
        '500': "Error",
    }
    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
@validate_call
def list_datasource_sync_jobs_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get the datasource's sync jobs, without preloading content.

    Sends the same request as the other ``list_datasource_sync_jobs``
    variants, but neither calls ``read()`` on the response nor
    deserializes it: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit: Optional; forwarded to the serializer.
    :type limit: str
    :param offset: Optional; forwarded to the serializer.
    :type offset: str
    :param sortorder: Optional; forwarded to the serializer.
    :type sortorder: str
    :param sortby: Optional; forwarded to the serializer.
    :type sortby: str
    :param projection_expression: Optional; forwarded to the serializer.
    :type projection_expression: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The undeserialized ``response`` object of the call.
    :rtype: RESTResponseType
    """
    _param = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No status-to-model map is built here: nothing in this variant
    # deserializes the response, so the map would be dead code.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response
def _list_datasource_sync_jobs_serialize(
    self,
    role_id,
    datasource_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Serialize the GET request that lists a datasource's sync jobs.

    Targets ``/datasources/{datasource_id}/sync-jobs``. Every argument
    that is ``None`` is left out of the request.

    :param role_id: Sent as the ``role_id`` request header.
    :param datasource_id: Fills the ``datasource_id`` path segment.
    :param limit: ``limit`` query parameter.
    :param offset: ``offset`` query parameter.
    :param sortorder: ``sortorder`` query parameter.
    :param sortby: ``sortby`` query parameter.
    :param projection_expression: Sent as the ``projectionExpression``
        query parameter.
    :param _request_auth: Forwarded untouched to
        ``api_client.param_serialize``.
    :param _content_type: Accepted for signature parity with the other
        serializers; not read here (no body is sent).
    :param _headers: Optional extra headers. The mapping is copied, so
        the caller's object is never modified.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` produces for the
        assembled request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # Copy instead of aliasing: ``_headers or {}`` hands back the caller's
    # own dict whenever it is non-empty, so the ``role_id`` / ``Accept``
    # entries written below would leak into the caller's object.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id

    # process the query parameters (declaration order is preserved)
    _query_params: List[Tuple[str, str]] = [
        (wire_name, value)
        for wire_name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
            ('sortby', sortby),
            ('projectionExpression', projection_expression),
        )
        if value is not None
    ]

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # A caller-supplied `Accept` header wins over the default.
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            ['application/json']
        )

    # authentication setting
    _auth_settings: List[str] = ['LambdaAuthorizer']

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/sync-jobs',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
[docs]
@validate_call
def trigger_datasource_sync_job(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Trigger an Amorphic datasource sync job.

    Triggers a sync job for syncing the assets with the source. The
    request is built by ``_trigger_datasource_sync_job_serialize``, sent
    through ``api_client.call_api``, read in full and deserialized
    (``200`` maps to ``MessageStatus``, ``400`` and ``500`` to
    ``Error``); only the ``data`` attribute of the deserialized response
    is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    :rtype: MessageStatus
    """
    request_args = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def trigger_datasource_sync_job_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)],
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Trigger an Amorphic datasource sync job, returning the full response.

    Triggers a sync job for syncing the assets with the source. The
    request is built by ``_trigger_datasource_sync_job_serialize``, sent
    through ``api_client.call_api``, read in full and deserialized.
    Status ``200`` is mapped to ``MessageStatus``; ``400`` and ``500``
    are mapped to ``Error``.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    :rtype: ApiResponse[MessageStatus]
    """
    request_args = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be loaded before it can be deserialized.
    raw_response.read()

    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
@validate_call
def trigger_datasource_sync_job_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Trigger an Amorphic datasource sync job, without preloading content.

    Sends the same request as the other ``trigger_datasource_sync_job``
    variants, but neither calls ``read()`` on the response nor
    deserializes it: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param _request_timeout: Timeout passed unchanged to
        ``api_client.call_api``: one positive float for the total
        request timeout, or a (connection, read) pair of positive
        floats.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
        request.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Extra headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Host index override; validation admits only 0.
    :type _host_index: int, optional
    :return: The undeserialized ``response`` object of the call.
    :rtype: RESTResponseType
    """
    _param = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No status-to-model map is built here: nothing in this variant
    # deserializes the response, so the map would be dead code.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response
def _trigger_datasource_sync_job_serialize(
    self,
    role_id,
    datasource_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized request that triggers a datasource sync job.

    Assembles a ``POST /datasources/{datasource_id}/sync-jobs`` request
    (no body, no query or form parameters) and passes the pieces to
    ``self.api_client.param_serialize``.

    :param role_id: sent as the ``role_id`` request header; omitted
                    when ``None``.
    :param datasource_id: substituted for ``{datasource_id}`` in the
                          resource path; omitted when ``None``.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: not used by this method (the operation has
                          no request body).
    :param _headers: optional caller-supplied headers used as the
                     starting header set. The mapping is copied, so
                     the caller's object is never modified.
    :param _host_index: not used by this method; ``_host`` is always
                        ``None``.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the entries added below (role_id, Accept) must not
    # leak back into the dict object the caller handed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    # process the query parameters
    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id
    # process the form parameters
    # process the body parameter

    # set the HTTP header `Accept` unless the caller already supplied one
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='POST',
        resource_path='/datasources/{datasource_id}/sync-jobs',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
[docs]
@validate_call
def update_datasource_asset(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Update asset metadata

    Update asset metadata like description, custom metadata, glossary
    details etc. The response is read and deserialized, and only the
    ``data`` attribute of the deserialized result is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_metadata: (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout setting for this request, passed
                             through to ``api_client.call_api``. If one
                             number is provided, it will be the total
                             request timeout. It can also be a pair
                             (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a
                          single request; this effectively ignores the
                          authentication in the spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single
                     request; this effectively ignores the headers
                     in the spec for a single request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
                        request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    :rtype: MessageStatus
    """ # noqa: E501
    # Status code -> model name used when deserializing the response.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }

    serialized_request = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    raw_response = self.api_client.call_api(
        *serialized_request,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    raw_response.read()

    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response.data
@validate_call
def update_datasource_asset_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Update asset metadata

    Update asset metadata like description, custom metadata, glossary
    details etc. The response is read and deserialized, and the whole
    object produced by ``api_client.response_deserialize`` is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_metadata: (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout setting for this request, passed
                             through to ``api_client.call_api``. If one
                             number is provided, it will be the total
                             request timeout. It can also be a pair
                             (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a
                          single request; this effectively ignores the
                          authentication in the spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single
                     request; this effectively ignores the headers
                     in the spec for a single request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
                        request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the deserialized response object.
    :rtype: ApiResponse[MessageStatus]
    """ # noqa: E501
    # Status code -> model name used when deserializing the response.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }

    serialized_request = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    raw_response = self.api_client.call_api(
        *serialized_request,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    raw_response.read()

    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
def update_datasource_asset_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Update asset metadata

    Update asset metadata like description, custom metadata, glossary
    details etc. Unlike the other ``update_datasource_asset`` variants,
    this method neither calls ``read()`` on the response nor
    deserializes it; it returns the ``response`` attribute of the
    object produced by ``api_client.call_api``.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_metadata: (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout setting for this request, passed
                             through to ``api_client.call_api``. If one
                             number is provided, it will be the total
                             request timeout. It can also be a pair
                             (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a
                          single request; this effectively ignores the
                          authentication in the spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single
                     request; this effectively ignores the headers
                     in the spec for a single request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
                        request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the undecoded response (``response_data.response``).
    :rtype: RESTResponseType
    """ # noqa: E501
    _param = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No response-type map is built here: nothing is deserialized in this
    # variant, so the mapping would never be read.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response
def _update_datasource_asset_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    asset_metadata,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized request that updates an asset's metadata.

    Assembles a ``PUT /datasources/{datasource_id}/assets/{asset_id}``
    request with ``asset_metadata`` as the body and passes the pieces to
    ``self.api_client.param_serialize``.

    :param role_id: sent as the ``role_id`` request header; omitted
                    when ``None``.
    :param datasource_id: substituted for ``{datasource_id}`` in the
                          resource path; omitted when ``None``.
    :param asset_id: substituted for ``{asset_id}`` in the resource
                     path; omitted when ``None``.
    :param asset_metadata: used as the request body as-is; the body
                           stays ``None`` when this is ``None``.
    :param _request_auth: forwarded unchanged to ``param_serialize``.
    :param _content_type: when truthy, used verbatim as the
                          ``Content-Type`` header; otherwise the value
                          chosen by
                          ``api_client.select_header_content_type`` is
                          used if it is not ``None``.
    :param _headers: optional caller-supplied headers used as the
                     starting header set. The mapping is copied, so
                     the caller's object is never modified.
    :param _host_index: not used by this method; ``_host`` is always
                        ``None``.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the entries added below (role_id, Accept,
    # Content-Type) must not leak back into the caller's dict object.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id
    # process the query parameters
    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id
    # process the form parameters
    # process the body parameter
    if asset_metadata is not None:
        _body_params = asset_metadata

    # set the HTTP header `Accept` unless the caller already supplied one
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # set the HTTP header `Content-Type`; an explicit override wins
    if _content_type:
        _header_params['Content-Type'] = _content_type
    else:
        _default_content_type = (
            self.api_client.select_header_content_type(
                [
                    'application/json'
                ]
            )
        )
        if _default_content_type is not None:
            _header_params['Content-Type'] = _default_content_type

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='PUT',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )