Source code for ni.measurements.metadata.v1.client._client

"""Client for accessing the NI Metadata Store Service."""

from __future__ import annotations

import grpc
import ni.measurements.metadata.v1.metadata_store_service_pb2 as metadata_store_service_pb2
import ni.measurements.metadata.v1.metadata_store_service_pb2_grpc as metadata_store_service_pb2_grpc
from ni.measurementlink.discovery.v1.client import DiscoveryClient
from ni_grpc_extensions.channelpool import GrpcChannelPool

from ni.measurements.metadata.v1.client._client_base import GrpcServiceClientBase


[docs] class MetadataStoreClient( GrpcServiceClientBase[metadata_store_service_pb2_grpc.MetadataStoreServiceStub] ): """Client for accessing the NI Metadata Store Service.""" __slots__ = () def __init__( self, *, discovery_client: DiscoveryClient | None = None, grpc_channel: grpc.Channel | None = None, grpc_channel_pool: GrpcChannelPool | None = None, ) -> None: """Initialize the Metadata Store Client. Args: discovery_client: An optional discovery client (recommended). grpc_channel: An optional metadata store gRPC channel. grpc_channel_pool: An optional gRPC channel pool (recommended). """ super().__init__( discovery_client=discovery_client, grpc_channel=grpc_channel, grpc_channel_pool=grpc_channel_pool, service_interface_name="ni.measurements.metadata.v1.MetadataStoreService", service_class="", stub_class=metadata_store_service_pb2_grpc.MetadataStoreServiceStub, )
[docs] def get_uut_instance( self, request: metadata_store_service_pb2.GetUutInstanceRequest ) -> metadata_store_service_pb2.GetUutInstanceResponse: """Gets the UUT instance associated with the identifier given in the request.""" return self._get_stub().GetUutInstance(request)
[docs] def query_uut_instances( self, request: metadata_store_service_pb2.QueryUutInstancesRequest ) -> metadata_store_service_pb2.QueryUutInstancesResponse: """Perform an OData query on UUT instances.""" return self._get_stub().QueryUutInstances(request)
[docs] def create_uut_instance( self, request: metadata_store_service_pb2.CreateUutInstanceRequest ) -> metadata_store_service_pb2.CreateUutInstanceResponse: """Creates a new UUT instance in the metadata store.""" return self._get_stub().CreateUutInstance(request)
[docs] def get_uut( self, request: metadata_store_service_pb2.GetUutRequest ) -> metadata_store_service_pb2.GetUutResponse: """Gets the UUT associated with the identifier given in the request.""" return self._get_stub().GetUut(request)
[docs] def query_uuts( self, request: metadata_store_service_pb2.QueryUutsRequest ) -> metadata_store_service_pb2.QueryUutsResponse: """Perform an OData query on UUTs.""" return self._get_stub().QueryUuts(request)
[docs] def create_uut( self, request: metadata_store_service_pb2.CreateUutRequest ) -> metadata_store_service_pb2.CreateUutResponse: """Creates a new UUT in the metadata store.""" return self._get_stub().CreateUut(request)
[docs] def get_operator( self, request: metadata_store_service_pb2.GetOperatorRequest ) -> metadata_store_service_pb2.GetOperatorResponse: """Gets the operator associated with the identifier given in the request.""" return self._get_stub().GetOperator(request)
[docs] def query_operators( self, request: metadata_store_service_pb2.QueryOperatorsRequest ) -> metadata_store_service_pb2.QueryOperatorsResponse: """Perform an OData query on operators.""" return self._get_stub().QueryOperators(request)
[docs] def create_operator( self, request: metadata_store_service_pb2.CreateOperatorRequest ) -> metadata_store_service_pb2.CreateOperatorResponse: """Creates a new operator in the metadata store.""" return self._get_stub().CreateOperator(request)
[docs] def get_test_description( self, request: metadata_store_service_pb2.GetTestDescriptionRequest ) -> metadata_store_service_pb2.GetTestDescriptionResponse: """Gets the test description associated with the identifier given in the request.""" return self._get_stub().GetTestDescription(request)
[docs] def query_test_descriptions( self, request: metadata_store_service_pb2.QueryTestDescriptionsRequest ) -> metadata_store_service_pb2.QueryTestDescriptionsResponse: """Perform an OData query on test descriptions.""" return self._get_stub().QueryTestDescriptions(request)
[docs] def create_test_description( self, request: metadata_store_service_pb2.CreateTestDescriptionRequest ) -> metadata_store_service_pb2.CreateTestDescriptionResponse: """Creates a new test description in the metadata store.""" return self._get_stub().CreateTestDescription(request)
[docs] def get_test( self, request: metadata_store_service_pb2.GetTestRequest ) -> metadata_store_service_pb2.GetTestResponse: """Gets the test associated with the identifier given in the request.""" return self._get_stub().GetTest(request)
[docs] def query_tests( self, request: metadata_store_service_pb2.QueryTestsRequest ) -> metadata_store_service_pb2.QueryTestsResponse: """Perform an OData query on tests.""" return self._get_stub().QueryTests(request)
[docs] def create_test( self, request: metadata_store_service_pb2.CreateTestRequest ) -> metadata_store_service_pb2.CreateTestResponse: """Creates a new test in the metadata store.""" return self._get_stub().CreateTest(request)
[docs] def get_test_station( self, request: metadata_store_service_pb2.GetTestStationRequest ) -> metadata_store_service_pb2.GetTestStationResponse: """Gets the test station associated with the identifier given in the request.""" return self._get_stub().GetTestStation(request)
[docs] def query_test_stations( self, request: metadata_store_service_pb2.QueryTestStationsRequest ) -> metadata_store_service_pb2.QueryTestStationsResponse: """Perform an OData query on test stations.""" return self._get_stub().QueryTestStations(request)
[docs] def create_test_station( self, request: metadata_store_service_pb2.CreateTestStationRequest ) -> metadata_store_service_pb2.CreateTestStationResponse: """Creates a new test station in the metadata store.""" return self._get_stub().CreateTestStation(request)
[docs] def get_hardware_item( self, request: metadata_store_service_pb2.GetHardwareItemRequest ) -> metadata_store_service_pb2.GetHardwareItemResponse: """Gets the hardware item associated with the identifier given in the request.""" return self._get_stub().GetHardwareItem(request)
[docs] def query_hardware_items( self, request: metadata_store_service_pb2.QueryHardwareItemsRequest ) -> metadata_store_service_pb2.QueryHardwareItemsResponse: """Perform an OData query on hardware items.""" return self._get_stub().QueryHardwareItems(request)
[docs] def create_hardware_item( self, request: metadata_store_service_pb2.CreateHardwareItemRequest ) -> metadata_store_service_pb2.CreateHardwareItemResponse: """Creates a new hardware item in the metadata store.""" return self._get_stub().CreateHardwareItem(request)
[docs] def get_software_item( self, request: metadata_store_service_pb2.GetSoftwareItemRequest ) -> metadata_store_service_pb2.GetSoftwareItemResponse: """Gets the software item associated with the identifier given in the request.""" return self._get_stub().GetSoftwareItem(request)
[docs] def query_software_items( self, request: metadata_store_service_pb2.QuerySoftwareItemsRequest ) -> metadata_store_service_pb2.QuerySoftwareItemsResponse: """Perform an OData query on software items.""" return self._get_stub().QuerySoftwareItems(request)
[docs] def create_software_item( self, request: metadata_store_service_pb2.CreateSoftwareItemRequest ) -> metadata_store_service_pb2.CreateSoftwareItemResponse: """Creates a new software item in the metadata store.""" return self._get_stub().CreateSoftwareItem(request)
[docs] def get_test_adapter( self, request: metadata_store_service_pb2.GetTestAdapterRequest ) -> metadata_store_service_pb2.GetTestAdapterResponse: """Gets the test adapter associated with the identifier given in the request.""" return self._get_stub().GetTestAdapter(request)
[docs] def query_test_adapters( self, request: metadata_store_service_pb2.QueryTestAdaptersRequest ) -> metadata_store_service_pb2.QueryTestAdaptersResponse: """Perform an OData query on test adapters.""" return self._get_stub().QueryTestAdapters(request)
[docs] def create_test_adapter( self, request: metadata_store_service_pb2.CreateTestAdapterRequest ) -> metadata_store_service_pb2.CreateTestAdapterResponse: """Creates a new test adapter in the metadata store.""" return self._get_stub().CreateTestAdapter(request)
[docs] def register_schema( self, request: metadata_store_service_pb2.RegisterSchemaRequest ) -> metadata_store_service_pb2.RegisterSchemaResponse: """Registers a schema.""" return self._get_stub().RegisterSchema(request)
[docs] def list_schemas( self, request: metadata_store_service_pb2.ListSchemasRequest ) -> metadata_store_service_pb2.ListSchemasResponse: """List the schemas that have been previously registered.""" return self._get_stub().ListSchemas(request)
[docs] def get_alias( self, request: metadata_store_service_pb2.GetAliasRequest ) -> metadata_store_service_pb2.GetAliasResponse: """Gets the target of a given alias.""" return self._get_stub().GetAlias(request)
[docs] def query_aliases( self, request: metadata_store_service_pb2.QueryAliasesRequest ) -> metadata_store_service_pb2.QueryAliasesResponse: """Perform an OData query on the created aliases.""" return self._get_stub().QueryAliases(request)
[docs] def create_alias( self, request: metadata_store_service_pb2.CreateAliasRequest ) -> metadata_store_service_pb2.CreateAliasResponse: """Creates an alias of the specified metadata. This alias can be used when creating other metadata or publishing. """ return self._get_stub().CreateAlias(request)
[docs] def delete_alias( self, request: metadata_store_service_pb2.DeleteAliasRequest ) -> metadata_store_service_pb2.DeleteAliasResponse: """Deletes a created alias.""" return self._get_stub().DeleteAlias(request)
[docs] def create_from_json_document( self, request: metadata_store_service_pb2.CreateFromJsonDocumentRequest ) -> metadata_store_service_pb2.CreateFromJsonDocumentResponse: """Creates metadata from a JSON document.""" return self._get_stub().CreateFromJsonDocument(request)