# Source code for openapi_client.api.datasources_api

# coding: utf-8

"""
    Amorphic Data Platform

    Amorphic Data Platform - API Definition documentation

    The version of the OpenAPI document: 1.0
    Generated by OpenAPI Generator (https://openapi-generator.tech)

    Do not edit the class manually.
"""  # noqa: E501

import warnings
from pydantic import validate_call, Field, StrictFloat, StrictStr, StrictInt
from typing import Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Annotated

from pydantic import StrictStr
from typing import Optional
from openapi_client.models.asset_comment_input import AssetCommentInput
from openapi_client.models.asset_comments_list import AssetCommentsList
from openapi_client.models.asset_details import AssetDetails
from openapi_client.models.asset_metadata import AssetMetadata
from openapi_client.models.datasource_sync_job_details import DatasourceSyncJobDetails
from openapi_client.models.datasource_sync_jobs import DatasourceSyncJobs
from openapi_client.models.list_asset_response import ListAssetResponse
from openapi_client.models.message_status import MessageStatus

from openapi_client.api_client import ApiClient, RequestSerialized
from openapi_client.api_response import ApiResponse
from openapi_client.rest import RESTResponseType


class DatasourcesApi:
    """NOTE: This class is auto generated by OpenAPI Generator
    Ref: https://openapi-generator.tech

    Do not edit the class manually.
    """

    def __init__(self, api_client=None) -> None:
        """Bind this API wrapper to an ``ApiClient``.

        :param api_client: client used for every request issued by this
            object. When ``None``, the client returned by
            ``ApiClient.get_default()`` is used instead.
        """
        self.api_client = (
            ApiClient.get_default() if api_client is None else api_client
        )


@validate_call
def add_datasource_asset_comment(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_comment_input: AssetCommentInput,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Add a comment to an asset

    Add a comment to an asset

    The request is built by ``_add_datasource_asset_comment_serialize``,
    sent with ``self.api_client.call_api``, read in full, and the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_comment_input: (required)
    :type asset_comment_input: AssetCommentInput
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: MessageStatus
    """  # noqa: E501

    _param = self._add_datasource_asset_comment_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_comment_input=asset_comment_input,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    ).data
@validate_call
def add_datasource_asset_comment_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_comment_input: AssetCommentInput,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Add a comment to an asset

    Add a comment to an asset

    Same request as ``add_datasource_asset_comment``, but the whole object
    produced by ``self.api_client.response_deserialize`` is returned
    instead of only its ``data`` attribute.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_comment_input: (required)
    :type asset_comment_input: AssetCommentInput
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: ApiResponse[MessageStatus]
    """  # noqa: E501

    _param = self._add_datasource_asset_comment_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_comment_input=asset_comment_input,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def add_datasource_asset_comment_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_comment_input: AssetCommentInput,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Add a comment to an asset

    Add a comment to an asset

    Same request as ``add_datasource_asset_comment``, but the body is
    neither read nor deserialized here: the ``response`` attribute of the
    object returned by ``self.api_client.call_api`` is handed back as is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param asset_comment_input: (required)
    :type asset_comment_input: AssetCommentInput
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: RESTResponseType
    """  # noqa: E501

    _param = self._add_datasource_asset_comment_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_comment_input=asset_comment_input,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _add_datasource_asset_comment_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    asset_comment_input,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized POST request for adding an asset comment.

    ``datasource_id`` and ``asset_id`` become path parameters, ``role_id``
    a header, and ``asset_comment_input`` the request body. The result of
    ``self.api_client.param_serialize`` is returned unchanged.

    ``_host_index`` is accepted for signature parity but is not read here;
    ``_host`` is always passed as ``None``.
    """

    _host = None

    _collection_formats: Dict[str, str] = {
    }

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the header writes below must not land in the dict
    # object that was passed in as ``_headers``.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Any = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id
    # process the query parameters
    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id
    # process the form parameters
    # process the body parameter
    if asset_comment_input is not None:
        _body_params = asset_comment_input

    # set the HTTP header `Accept` (a caller-supplied value wins)
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # set the HTTP header `Content-Type`
    if _content_type:
        _header_params['Content-Type'] = _content_type
    else:
        _default_content_type = (
            self.api_client.select_header_content_type(
                [
                    'application/json'
                ]
            )
        )
        if _default_content_type is not None:
            _header_params['Content-Type'] = _default_content_type

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='POST',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
@validate_call
def delete_datasource_asset_comment(
    self,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    comment_id: StrictStr,
    role_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Delete the comment on an asset

    Deletes a comment on an asset

    The request is built by ``_delete_datasource_asset_comment_serialize``,
    sent with ``self.api_client.call_api``, read in full, and the ``data``
    attribute of the deserialized response is returned.

    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param comment_id: (required)
    :type comment_id: str
    :param role_id: (required)
    :type role_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: MessageStatus
    """  # noqa: E501

    _param = self._delete_datasource_asset_comment_serialize(
        datasource_id=datasource_id,
        asset_id=asset_id,
        comment_id=comment_id,
        role_id=role_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    ).data
@validate_call
def delete_datasource_asset_comment_with_http_info(
    self,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    comment_id: StrictStr,
    role_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Delete the comment on an asset

    Deletes a comment on an asset

    Same request as ``delete_datasource_asset_comment``, but the whole
    object produced by ``self.api_client.response_deserialize`` is
    returned instead of only its ``data`` attribute.

    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param comment_id: (required)
    :type comment_id: str
    :param role_id: (required)
    :type role_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: ApiResponse[MessageStatus]
    """  # noqa: E501

    _param = self._delete_datasource_asset_comment_serialize(
        datasource_id=datasource_id,
        asset_id=asset_id,
        comment_id=comment_id,
        role_id=role_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def delete_datasource_asset_comment_without_preload_content(
    self,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    comment_id: StrictStr,
    role_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Delete the comment on an asset

    Deletes a comment on an asset

    Same request as ``delete_datasource_asset_comment``, but the body is
    neither read nor deserialized here: the ``response`` attribute of the
    object returned by ``self.api_client.call_api`` is handed back as is.

    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param comment_id: (required)
    :type comment_id: str
    :param role_id: (required)
    :type role_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: RESTResponseType
    """  # noqa: E501

    _param = self._delete_datasource_asset_comment_serialize(
        datasource_id=datasource_id,
        asset_id=asset_id,
        comment_id=comment_id,
        role_id=role_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _delete_datasource_asset_comment_serialize(
    self,
    datasource_id,
    asset_id,
    comment_id,
    role_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized DELETE request for removing an asset comment.

    ``datasource_id``, ``asset_id`` and ``comment_id`` become path
    parameters and ``role_id`` a header; no body is sent. The result of
    ``self.api_client.param_serialize`` is returned unchanged.

    ``_content_type`` and ``_host_index`` are accepted for signature
    parity but are not read here; ``_host`` is always passed as ``None``.
    """

    _host = None

    _collection_formats: Dict[str, str] = {
    }

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the header writes below must not land in the dict
    # object that was passed in as ``_headers``.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id
    if comment_id is not None:
        _path_params['comment_id'] = comment_id
    # process the query parameters
    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id
    # process the form parameters
    # process the body parameter

    # set the HTTP header `Accept` (a caller-supplied value wins)
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='DELETE',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments/{comment_id}',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
@validate_call
def get_datasource_asset(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> AssetDetails:
    """Get details of an asset in a datasource

    Returns details of an asset in amorphic datasource

    The request is built by ``_get_datasource_asset_serialize``, sent with
    ``self.api_client.call_api``, read in full, and the ``data`` attribute
    of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: AssetDetails
    """  # noqa: E501

    _param = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "AssetDetails",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    ).data
@validate_call
def get_datasource_asset_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[AssetDetails]:
    """Get details of an asset in a datasource

    Returns details of an asset in amorphic datasource

    Same request as ``get_datasource_asset``, but the whole object
    produced by ``self.api_client.response_deserialize`` is returned
    instead of only its ``data`` attribute.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: ApiResponse[AssetDetails]
    """  # noqa: E501

    _param = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "AssetDetails",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def get_datasource_asset_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get details of an asset in a datasource

    Returns details of an asset in amorphic datasource

    Same request as ``get_datasource_asset``, but the body is neither read
    nor deserialized here: the ``response`` attribute of the object
    returned by ``self.api_client.call_api`` is handed back as is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: RESTResponseType
    """  # noqa: E501

    _param = self._get_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _get_datasource_asset_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for fetching one asset's details.

    ``datasource_id`` and ``asset_id`` become path parameters and
    ``role_id`` a header; no body is sent. The result of
    ``self.api_client.param_serialize`` is returned unchanged.

    ``_content_type`` and ``_host_index`` are accepted for signature
    parity but are not read here; ``_host`` is always passed as ``None``.
    """

    _host = None

    _collection_formats: Dict[str, str] = {
    }

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the header writes below must not land in the dict
    # object that was passed in as ``_headers``.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id
    # process the query parameters
    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id
    # process the form parameters
    # process the body parameter

    # set the HTTP header `Accept` (a caller-supplied value wins)
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
@validate_call
def get_datasource_sync_job(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> DatasourceSyncJobDetails:
    """Get details of a specific sync job for a datasource

    Returns the details of a specific sync job triggered for syncing asset
    details for a datasource

    The request is built by ``_get_datasource_sync_job_serialize``, sent
    with ``self.api_client.call_api``, read in full, and the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param job_id: (required)
    :type job_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number provided, it will be total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the
        spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request.
    :type _content_type: str, optional
    :param _headers: set to override the headers for a single request;
        this effectively ignores the headers in the spec for a single
        request.
    :type _headers: dict, optional
    :param _host_index: set to override the host_index for a single
        request; this effectively ignores the host_index in the spec
        for a single request. Validation only admits ``0``.
    :type _host_index: int, optional
    :return: Returns the result object.
    :rtype: DatasourceSyncJobDetails
    """  # noqa: E501

    _param = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # HTTP status code -> name of the model used to deserialize the body.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobDetails",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    ).data
@validate_call
def get_datasource_sync_job_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[DatasourceSyncJobDetails]:
    """Get details of a specific sync job for a datasource

    Returns the details of a specific sync job triggered for syncing asset
    details for a datasource. Unlike ``get_datasource_sync_job``, the whole
    object produced by ``api_client.response_deserialize`` is returned, not
    just its ``data`` attribute.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param job_id: (required)
    :type job_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobDetails",
        '500': "Error",
    }

    request_args = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized


@validate_call
def get_datasource_sync_job_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    job_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get details of a specific sync job for a datasource

    Returns the details of a specific sync job triggered for syncing asset
    details for a datasource. This variant neither calls ``read()`` on the
    result of ``api_client.call_api`` nor deserializes it; it hands back
    that result's ``response`` attribute as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param job_id: (required)
    :type job_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the ``call_api`` result.
    """  # noqa: E501

    # Built for parity with the sibling methods; not consulted here because
    # no deserialization happens in this variant.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobDetails",
        '500': "Error",
    }

    request_args = self._get_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        job_id=job_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return raw_response.response


def _get_datasource_sync_job_serialize(
    self,
    role_id,
    datasource_id,
    job_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a single datasource sync job.

    ``datasource_id`` and ``job_id`` become path parameters, ``role_id``
    becomes the ``role_id`` header, and ``limit``, ``offset``,
    ``sortorder``, ``sortby`` and ``projection_expression`` (sent as
    ``projectionExpression``) become query parameters. Any of these that
    is ``None`` is left out.

    A non-empty ``_headers`` dict is used directly as the header dict, so
    the ``role_id`` and ``Accept`` entries are written into that same
    object. ``_content_type`` and ``_host_index`` are accepted but not
    referenced in this body.

    :return: the result of ``api_client.param_serialize``.
    """
    # Path parameters, in the order they appear in the resource path.
    path_values: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('job_id', job_id),
        )
        if value is not None
    }

    # Query parameters; only the ones the caller actually supplied.
    query_pairs: List[Tuple[str, str]] = [
        (name, value)
        for name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
            ('sortby', sortby),
            ('projectionExpression', projection_expression),
        )
        if value is not None
    ]

    # Header parameters start from the caller-provided headers, if any.
    headers: Dict[str, Optional[str]] = _headers or {}
    if role_id is not None:
        headers['role_id'] = role_id

    # Only fill in `Accept` when the caller has not chosen one.
    if 'Accept' not in headers:
        headers['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # This endpoint has no body, form fields, files, collection formats or
    # host override.
    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/sync-jobs/{job_id}',
        path_params=path_values,
        query_params=query_pairs,
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=['LambdaAuthorizer'],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
def list_datasource_asset_comments(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> AssetCommentsList:
    """Get list of comments for an asset

    Returns a list of comments for an asset. The request is built by
    ``_list_datasource_asset_comments_serialize``, sent with
    ``api_client.call_api``, read, deserialized, and only the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetCommentsList",
        '500': "Error",
    }

    request_args = self._list_datasource_asset_comments_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def list_datasource_asset_comments_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[AssetCommentsList]:
    """Get list of comments for an asset

    Returns a list of comments for an asset. Unlike
    ``list_datasource_asset_comments``, the whole object produced by
    ``api_client.response_deserialize`` is returned, not just its ``data``
    attribute.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetCommentsList",
        '500': "Error",
    }

    request_args = self._list_datasource_asset_comments_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized


@validate_call
def list_datasource_asset_comments_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get list of comments for an asset

    Returns a list of comments for an asset. This variant neither calls
    ``read()`` on the result of ``api_client.call_api`` nor deserializes
    it; it hands back that result's ``response`` attribute as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param asset_id: (required)
    :type asset_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the ``call_api`` result.
    """  # noqa: E501

    # Built for parity with the sibling methods; not consulted here because
    # no deserialization happens in this variant.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "AssetCommentsList",
        '500': "Error",
    }

    request_args = self._list_datasource_asset_comments_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return raw_response.response


def _list_datasource_asset_comments_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    limit,
    offset,
    sortorder,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for listing an asset's comments.

    ``datasource_id`` and ``asset_id`` become path parameters, ``role_id``
    becomes the ``role_id`` header, and ``limit``, ``offset`` and
    ``sortorder`` become query parameters. Any of these that is ``None``
    is left out.

    A non-empty ``_headers`` dict is used directly as the header dict, so
    the ``role_id`` and ``Accept`` entries are written into that same
    object. ``_content_type`` and ``_host_index`` are accepted but not
    referenced in this body.

    :return: the result of ``api_client.param_serialize``.
    """
    # Path parameters, in the order they appear in the resource path.
    path_values: Dict[str, str] = {
        name: value
        for name, value in (
            ('datasource_id', datasource_id),
            ('asset_id', asset_id),
        )
        if value is not None
    }

    # Query parameters; only the ones the caller actually supplied.
    query_pairs: List[Tuple[str, str]] = [
        (name, value)
        for name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
        )
        if value is not None
    ]

    # Header parameters start from the caller-provided headers, if any.
    headers: Dict[str, Optional[str]] = _headers or {}
    if role_id is not None:
        headers['role_id'] = role_id

    # Only fill in `Accept` when the caller has not chosen one.
    if 'Accept' not in headers:
        headers['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # This endpoint has no body, form fields, files, collection formats or
    # host override.
    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}/comments',
        path_params=path_values,
        query_params=query_pairs,
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=['LambdaAuthorizer'],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
def list_datasource_assets(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ListAssetResponse:
    """List assets in a datasource

    Returns list of assets in amorphic datasource. The request is built by
    ``_list_datasource_assets_serialize``, sent with
    ``api_client.call_api``, read, deserialized, and only the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "ListAssetResponse",
        '500': "Error",
    }

    request_args = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def list_datasource_assets_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[ListAssetResponse]:
    """List assets in a datasource

    Returns list of assets in amorphic datasource. Unlike
    ``list_datasource_assets``, the whole object produced by
    ``api_client.response_deserialize`` is returned, not just its ``data``
    attribute.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "ListAssetResponse",
        '500': "Error",
    }

    request_args = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized


@validate_call
def list_datasource_assets_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List assets in a datasource

    Returns list of assets in amorphic datasource. This variant neither
    calls ``read()`` on the result of ``api_client.call_api`` nor
    deserializes it; it hands back that result's ``response`` attribute
    as-is.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the ``call_api`` result.
    """  # noqa: E501

    # Built for parity with the sibling methods; not consulted here because
    # no deserialization happens in this variant.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "ListAssetResponse",
        '500': "Error",
    }

    request_args = self._list_datasource_assets_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    return raw_response.response


def _list_datasource_assets_serialize(
    self,
    role_id,
    datasource_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for listing a datasource's assets.

    ``datasource_id`` becomes a path parameter, ``role_id`` becomes the
    ``role_id`` header, and ``limit``, ``offset``, ``sortorder``,
    ``sortby`` and ``projection_expression`` (sent as
    ``projectionExpression``) become query parameters. Any of these that
    is ``None`` is left out.

    A non-empty ``_headers`` dict is used directly as the header dict, so
    the ``role_id`` and ``Accept`` entries are written into that same
    object. ``_content_type`` and ``_host_index`` are accepted but not
    referenced in this body.

    :return: the result of ``api_client.param_serialize``.
    """
    # The only path parameter of this endpoint.
    path_values: Dict[str, str] = {}
    if datasource_id is not None:
        path_values['datasource_id'] = datasource_id

    # Query parameters; only the ones the caller actually supplied.
    query_pairs: List[Tuple[str, str]] = [
        (name, value)
        for name, value in (
            ('limit', limit),
            ('offset', offset),
            ('sortorder', sortorder),
            ('sortby', sortby),
            ('projectionExpression', projection_expression),
        )
        if value is not None
    ]

    # Header parameters start from the caller-provided headers, if any.
    headers: Dict[str, Optional[str]] = _headers or {}
    if role_id is not None:
        headers['role_id'] = role_id

    # Only fill in `Accept` when the caller has not chosen one.
    if 'Accept' not in headers:
        headers['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # This endpoint has no body, form fields, files, collection formats or
    # host override.
    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/assets',
        path_params=path_values,
        query_params=query_pairs,
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=['LambdaAuthorizer'],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
def list_datasource_sync_jobs(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> DatasourceSyncJobs:
    """Get list of amorphic datasource sync jobs triggered for a datasource

    Returns a list of amorphic datasource sync jobs triggered for syncing
    asset details for a datasource. The request is built by
    ``_list_datasource_sync_jobs_serialize``, sent with
    ``api_client.call_api``, read, deserialized, and only the ``data``
    attribute of the deserialized response is returned.

    :param role_id: (required)
    :type role_id: str
    :param datasource_id: (required)
    :type datasource_id: str
    :param limit:
    :type limit: str
    :param offset:
    :type offset: str
    :param sortorder:
    :type sortorder: str
    :param sortby:
    :type sortby: str
    :param projection_expression:
    :type projection_expression: str
    :param _request_timeout: timeout setting for this request. If one
        number is provided, it is the total request timeout. It can also
        be a pair (tuple) of (connection, read) timeouts. Each value must
        be greater than zero.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth_settings for a single
        request; forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
        serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
        request serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation
        only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501

    # Status code -> model name used when deserializing the response body.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobs",
        '500': "Error",
    }

    request_args = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = self.api_client.call_api(
        *request_args,
        _request_timeout=_request_timeout,
    )
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def list_datasource_sync_jobs_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[DatasourceSyncJobs]:
    """List the sync jobs triggered for a datasource (full response).

    Issues ``GET /datasources/{datasource_id}/sync-jobs`` and returns the
    whole object produced by ``api_client.response_deserialize`` rather
    than only its ``.data`` payload.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param limit: sent as the ``limit`` query parameter when not ``None``
    :type limit: str
    :param offset: sent as the ``offset`` query parameter when not ``None``
    :type offset: str
    :param sortorder: sent as the ``sortorder`` query parameter when not
                      ``None``
    :type sortorder: str
    :param sortby: sent as the ``sortby`` query parameter when not ``None``
    :type sortby: str
    :param projection_expression: sent as the ``projectionExpression``
                                  query parameter when not ``None``
    :type projection_expression: str
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the serializer, which does not use
                          it for this body-less request.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the deserialized API response wrapper.
    """
    _param = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # Status code -> model name used when deserializing the response.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "DatasourceSyncJobs",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def list_datasource_sync_jobs_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    limit: Optional[StrictStr] = None,
    offset: Optional[StrictStr] = None,
    sortorder: Optional[StrictStr] = None,
    sortby: Optional[StrictStr] = None,
    projection_expression: Optional[StrictStr] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List the sync jobs triggered for a datasource (raw response).

    Issues ``GET /datasources/{datasource_id}/sync-jobs`` and returns the
    ``response`` attribute of the object returned by
    ``api_client.call_api``. The body is neither read nor deserialized
    here.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param limit: sent as the ``limit`` query parameter when not ``None``
    :type limit: str
    :param offset: sent as the ``offset`` query parameter when not ``None``
    :type offset: str
    :param sortorder: sent as the ``sortorder`` query parameter when not
                      ``None``
    :type sortorder: str
    :param sortby: sent as the ``sortby`` query parameter when not ``None``
    :type sortby: str
    :param projection_expression: sent as the ``projectionExpression``
                                  query parameter when not ``None``
    :type projection_expression: str
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the serializer, which does not use
                          it for this body-less request.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the underlying, not-yet-read REST response.
    """
    _param = self._list_datasource_sync_jobs_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        limit=limit,
        offset=offset,
        sortorder=sortorder,
        sortby=sortby,
        projection_expression=projection_expression,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No response-type map is built here: nothing is deserialized.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _list_datasource_sync_jobs_serialize(
    self,
    role_id,
    datasource_id,
    limit,
    offset,
    sortorder,
    sortby,
    projection_expression,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized ``GET /datasources/{datasource_id}/sync-jobs``
    request.

    :param role_id: sent as the ``role_id`` header when not ``None``
    :param datasource_id: fills the ``{datasource_id}`` path segment when
                          not ``None``
    :param limit: optional ``limit`` query parameter
    :param offset: optional ``offset`` query parameter
    :param sortorder: optional ``sortorder`` query parameter
    :param sortby: optional ``sortby`` query parameter
    :param projection_expression: optional ``projectionExpression`` query
                                  parameter
    :param _request_auth: passed through to ``api_client.param_serialize``
    :param _content_type: accepted for signature parity; unused because
                          this request has no body
    :param _headers: caller-supplied headers used as the starting point;
                     the mapping is copied and never modified
    :param _host_index: accepted for signature parity; unused (``_host``
                        is always ``None`` here)
    :return: the result of ``api_client.param_serialize``
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    # Copy so that adding role_id/Accept below never mutates the mapping
    # object the caller handed in.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id

    # process the query parameters: keep declaration order, skip unset ones
    _optional_query_args = (
        ('limit', limit),
        ('offset', offset),
        ('sortorder', sortorder),
        ('sortby', sortby),
        ('projectionExpression', projection_expression),
    )
    _query_params: List[Tuple[str, str]] = [
        (wire_name, value)
        for wire_name, value in _optional_query_args
        if value is not None
    ]

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # set the HTTP header `Accept` unless the caller already chose one
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='GET',
        resource_path='/datasources/{datasource_id}/sync-jobs',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
@validate_call
def trigger_datasource_sync_job(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Trigger a sync job for a datasource.

    Issues ``POST /datasources/{datasource_id}/sync-jobs`` (no request
    body) and returns only the deserialized payload (the ``.data``
    attribute of the deserialized response).

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the serializer, which does not use
                          it for this body-less request.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response payload.
    """
    # Status code -> model name used when deserializing the response.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }

    serialized_request = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    raw_response = self.api_client.call_api(
        *serialized_request,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def trigger_datasource_sync_job_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Trigger a sync job for a datasource (full response).

    Issues ``POST /datasources/{datasource_id}/sync-jobs`` (no request
    body) and returns the whole object produced by
    ``api_client.response_deserialize`` rather than only its ``.data``
    payload.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the serializer, which does not use
                          it for this body-less request.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the deserialized API response wrapper.
    """
    _param = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # Status code -> model name used when deserializing the response.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def trigger_datasource_sync_job_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Trigger a sync job for a datasource (raw response).

    Issues ``POST /datasources/{datasource_id}/sync-jobs`` (no request
    body) and returns the ``response`` attribute of the object returned by
    ``api_client.call_api``. The body is neither read nor deserialized
    here.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the serializer, which does not use
                          it for this body-less request.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the underlying, not-yet-read REST response.
    """
    _param = self._trigger_datasource_sync_job_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No response-type map is built here: nothing is deserialized.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _trigger_datasource_sync_job_serialize(
    self,
    role_id,
    datasource_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized ``POST /datasources/{datasource_id}/sync-jobs``
    request.

    :param role_id: sent as the ``role_id`` header when not ``None``
    :param datasource_id: fills the ``{datasource_id}`` path segment when
                          not ``None``
    :param _request_auth: passed through to ``api_client.param_serialize``
    :param _content_type: accepted for signature parity; unused because
                          this request has no body
    :param _headers: caller-supplied headers used as the starting point;
                     the mapping is copied and never modified
    :param _host_index: accepted for signature parity; unused (``_host``
                        is always ``None`` here)
    :return: the result of ``api_client.param_serialize``
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy so that adding role_id/Accept below never mutates the mapping
    # object the caller handed in.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # set the HTTP header `Accept` unless the caller already chose one
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='POST',
        resource_path='/datasources/{datasource_id}/sync-jobs',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )
@validate_call
def update_datasource_asset(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> MessageStatus:
    """Update the metadata of a datasource asset.

    Issues ``PUT /datasources/{datasource_id}/assets/{asset_id}`` with
    ``asset_metadata`` as the request body and returns only the
    deserialized payload (the ``.data`` attribute of the deserialized
    response). Per the API description the metadata covers items such as
    description, custom metadata and glossary details.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param asset_id: value substituted into the ``{asset_id}`` path
                     segment (required)
    :type asset_id: str
    :param asset_metadata: request body (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: when truthy, used as the ``Content-Type`` header
                          instead of the default chosen by the serializer.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response payload.
    """
    # Status code -> model name used when deserializing the response.
    status_to_model: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }

    serialized_request = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    raw_response = self.api_client.call_api(
        *serialized_request,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
def update_datasource_asset_with_http_info(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[MessageStatus]:
    """Update the metadata of a datasource asset (full response).

    Issues ``PUT /datasources/{datasource_id}/assets/{asset_id}`` with
    ``asset_metadata`` as the request body and returns the whole object
    produced by ``api_client.response_deserialize`` rather than only its
    ``.data`` payload.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param asset_id: value substituted into the ``{asset_id}`` path
                     segment (required)
    :type asset_id: str
    :param asset_metadata: request body (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: when truthy, used as the ``Content-Type`` header
                          instead of the default chosen by the serializer.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the deserialized API response wrapper.
    """
    _param = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # Status code -> model name used when deserializing the response.
    _response_types_map: Dict[str, Optional[str]] = {
        '200': "MessageStatus",
        '400': "Error",
        '500': "Error",
    }
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    response_data.read()
    return self.api_client.response_deserialize(
        response_data=response_data,
        response_types_map=_response_types_map,
    )


@validate_call
def update_datasource_asset_without_preload_content(
    self,
    role_id: StrictStr,
    datasource_id: StrictStr,
    asset_id: StrictStr,
    asset_metadata: AssetMetadata,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)],
            Annotated[StrictFloat, Field(gt=0)]
        ]
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Update the metadata of a datasource asset (raw response).

    Issues ``PUT /datasources/{datasource_id}/assets/{asset_id}`` with
    ``asset_metadata`` as the request body and returns the ``response``
    attribute of the object returned by ``api_client.call_api``. The body
    of the response is neither read nor deserialized here.

    :param role_id: value sent in the ``role_id`` request header (required)
    :type role_id: str
    :param datasource_id: value substituted into the ``{datasource_id}``
                          path segment (required)
    :type datasource_id: str
    :param asset_id: value substituted into the ``{asset_id}`` path
                     segment (required)
    :type asset_id: str
    :param asset_metadata: request body (required)
    :type asset_metadata: AssetMetadata
    :param _request_timeout: timeout for this request, forwarded to
                             ``api_client.call_api``. A single positive
                             number is the total request timeout; a pair
                             is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the spec's auth settings for this
                          single request.
    :type _request_auth: dict, optional
    :param _content_type: when truthy, used as the ``Content-Type`` header
                          instead of the default chosen by the serializer.
    :type _content_type: str, optional
    :param _headers: extra request headers; an ``Accept`` entry supplied
                     here is kept instead of the default.
    :type _headers: dict, optional
    :param _host_index: host index override; the signature only admits 0.
    :type _host_index: int, optional
    :return: the underlying, not-yet-read REST response.
    """
    _param = self._update_datasource_asset_serialize(
        role_id=role_id,
        datasource_id=datasource_id,
        asset_id=asset_id,
        asset_metadata=asset_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index
    )

    # No response-type map is built here: nothing is deserialized.
    response_data = self.api_client.call_api(
        *_param,
        _request_timeout=_request_timeout
    )
    return response_data.response


def _update_datasource_asset_serialize(
    self,
    role_id,
    datasource_id,
    asset_id,
    asset_metadata,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized
    ``PUT /datasources/{datasource_id}/assets/{asset_id}`` request.

    :param role_id: sent as the ``role_id`` header when not ``None``
    :param datasource_id: fills the ``{datasource_id}`` path segment when
                          not ``None``
    :param asset_id: fills the ``{asset_id}`` path segment when not
                     ``None``
    :param asset_metadata: used as the request body when not ``None``
    :param _request_auth: passed through to ``api_client.param_serialize``
    :param _content_type: when truthy, becomes the ``Content-Type``
                          header; otherwise the header is chosen by
                          ``api_client.select_header_content_type``
    :param _headers: caller-supplied headers used as the starting point;
                     the mapping is copied and never modified
    :param _host_index: accepted for signature parity; unused (``_host``
                        is always ``None`` here)
    :return: the result of ``api_client.param_serialize``
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy so that adding role_id/Accept/Content-Type below never mutates
    # the mapping object the caller handed in.
    _header_params: Dict[str, Optional[str]] = (
        dict(_headers) if _headers else {}
    )
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if datasource_id is not None:
        _path_params['datasource_id'] = datasource_id
    if asset_id is not None:
        _path_params['asset_id'] = asset_id

    # process the header parameters
    if role_id is not None:
        _header_params['role_id'] = role_id

    # process the body parameter
    if asset_metadata is not None:
        _body_params = asset_metadata

    # set the HTTP header `Accept` unless the caller already chose one
    if 'Accept' not in _header_params:
        _header_params['Accept'] = self.api_client.select_header_accept(
            [
                'application/json'
            ]
        )

    # set the HTTP header `Content-Type`: an explicit override wins,
    # otherwise fall back to the client's pick from the supported list
    if _content_type:
        _header_params['Content-Type'] = _content_type
    else:
        _default_content_type = (
            self.api_client.select_header_content_type(
                [
                    'application/json'
                ]
            )
        )
        if _default_content_type is not None:
            _header_params['Content-Type'] = _default_content_type

    # authentication setting
    _auth_settings: List[str] = [
        'LambdaAuthorizer'
    ]

    return self.api_client.param_serialize(
        method='PUT',
        resource_path='/datasources/{datasource_id}/assets/{asset_id}',
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth
    )