feat: add more support for adapter API to Go client (#3204)
diff --git a/streampipes-client-go/streampipes/adapter_api.go b/streampipes-client-go/streampipes/adapter_api.go
new file mode 100644
index 0000000..06fb42d
--- /dev/null
+++ b/streampipes-client-go/streampipes/adapter_api.go
@@ -0,0 +1,175 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package streampipes
+
+import (
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/config"
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/model/adapter"
+ "io"
+ "log"
+ "net/http"
+)
+
+type Adapter struct {
+ endpoint
+}
+
+func NewAdapter(clientConfig config.StreamPipesClientConfig) *Adapter {
+
+ return &Adapter{
+ endpoint{config: clientConfig},
+ }
+}
+
+// GetSingleAdapter get a specific adapter with the given id
+func (a *Adapter) GetSingleAdapter(adapterId string) (adapter.AdapterDescription, error) {
+
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/connect/master/adapters", []string{adapterId})
+ log.Printf("Get data from: %s", endPointUrl)
+
+ response, err := a.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return adapter.AdapterDescription{}, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return adapter.AdapterDescription{}, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return adapter.AdapterDescription{}, err
+ }
+
+ unmarshalData, err := serializer.NewAdapterDeserializer().Unmarshal(body)
+ if err != nil {
+ return adapter.AdapterDescription{}, err
+ }
+ adapterDescription := unmarshalData.(adapter.AdapterDescription)
+
+ return adapterDescription, nil
+}
+
+// GetAllAdapter get all adapters of the current user
+func (a *Adapter) GetAllAdapter() ([]adapter.AdapterDescription, error) {
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/connect/master/adapters", nil)
+
+ response, err := a.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ unmarshalData, err := serializer.NewAdaptersDeserializer().Unmarshal(body)
+ if err != nil {
+ return nil, err
+ }
+ adapters := unmarshalData.([]adapter.AdapterDescription)
+
+ return adapters, nil
+}
+
+// DeleteSingleAdapter delete a adapter with a given id
+func (a *Adapter) DeleteSingleAdapter(adapterId string) error {
+
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/connect/master/adapters", []string{adapterId})
+ log.Printf("Delete data from: %s", endPointUrl)
+
+ response, err := a.executeRequest("DELETE", endPointUrl, nil)
+ if err != nil {
+ return err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (a *Adapter) CreateAdapter(adapters adapter.AdapterDescription) error {
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/connect/master/adapters", nil)
+ body, err := serializer.NewAdapterSerializer().Marshal(adapters)
+ if err != nil {
+ return err
+ }
+ response, err := a.executeRequest("PUT", endPointUrl, body)
+ if err != nil {
+ return err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (a *Adapter) StopSingleAdapter(adapterId string) error {
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/connect/master/adapters", []string{adapterId, "stop"})
+
+ response, err := a.executeRequest("POST", endPointUrl, nil)
+ if err != nil {
+ return err
+ }
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (a *Adapter) StartSingleAdapter(adapterId string) error {
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url, "streampipes-backend/api/v2/pipelines", []string{adapterId, "start"})
+
+ response, err := a.executeRequest("POST", endPointUrl, nil)
+ if err != nil {
+ return err
+ }
+ if response.StatusCode != http.StatusOK {
+ err = a.handleStatusCode(response)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/streampipes-client-go/streampipes/internal/serializer/deserializer.go b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
index fd9bb40..9c47fd3 100644
--- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
@@ -19,6 +19,7 @@
import (
"encoding/json"
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/model/adapter"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/data_lake"
@@ -36,17 +37,13 @@
Unmarshal(body []byte) (interface{}, error)
}
-var _ Deserializer = (*DataLakeMeasuresDeserializer)(nil)
-var _ Deserializer = (*DataSeriesDeserializer)(nil)
-var _ Deserializer = (*DataLakeMeasureDeserializer)(nil)
-
type DataLakeMeasuresDeserializer struct{}
func NewDataLakeMeasuresDeserializer() *DataLakeMeasuresDeserializer {
return &DataLakeMeasuresDeserializer{}
}
-func (d *DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataLakeMeasures []data_lake.DataLakeMeasure
err := json.Unmarshal(data, &dataLakeMeasures)
if err != nil {
@@ -61,7 +58,7 @@
return &DataLakeMeasureDeserializer{}
}
-func (d *DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataLakeMeasure data_lake.DataLakeMeasure
err := json.Unmarshal(data, &dataLakeMeasure)
if err != nil {
@@ -76,7 +73,7 @@
return &DataSeriesDeserializer{}
}
-func (d *DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataSeries data_lake.DataSeries
err := json.Unmarshal(data, &dataSeries)
if err != nil {
@@ -91,7 +88,7 @@
return &StreamPipesVersionDeserializer{}
}
-func (d *StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataSeries streampipes_version.Versions
err := json.Unmarshal(data, &dataSeries)
if err != nil {
@@ -106,7 +103,7 @@
return &ResponseMessageDeserializer{}
}
-func (r *ResponseMessageDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (r ResponseMessageDeserializer) Unmarshal(data []byte) (interface{}, error) {
var responseMessage model.ResponseMessage
err := json.Unmarshal(data, &responseMessage)
if err != nil {
@@ -121,7 +118,7 @@
return &PipelineCategoriesDeserializer{}
}
-func (p *PipelineCategoriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p PipelineCategoriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelineCategory []pipeline.PipelineCategory
err := json.Unmarshal(data, &pipelineCategory)
if err != nil {
@@ -136,7 +133,7 @@
return &DataLakeDashboardDeserializer{}
}
-func (d *DataLakeDashboardDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeDashboardDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dashborad data_lake.Dashboard
err := json.Unmarshal(data, &dashborad)
if err != nil {
@@ -151,7 +148,7 @@
return &DataLakeDashboardsDeserializer{}
}
-func (d *DataLakeDashboardsDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeDashboardsDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dashborad []data_lake.Dashboard
err := json.Unmarshal(data, &dashborad)
if err != nil {
@@ -167,7 +164,7 @@
return &DataLakeWidgetDeserializer{}
}
-func (d *DataLakeWidgetDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeWidgetDeserializer) Unmarshal(data []byte) (interface{}, error) {
var widget data_lake.DataExplorerWidgetModel
err := json.Unmarshal(data, &widget)
if err != nil {
@@ -182,7 +179,7 @@
return &DataLakeWidgetsDeserializer{}
}
-func (d *DataLakeWidgetsDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (d DataLakeWidgetsDeserializer) Unmarshal(data []byte) (interface{}, error) {
var widget []data_lake.DataExplorerWidgetModel
err := json.Unmarshal(data, &widget)
if err != nil {
@@ -197,7 +194,7 @@
return &SpLogEntriesDeserializer{}
}
-func (p *SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var userAccount []functions.SpLogEntry
err := json.Unmarshal(data, &userAccount)
if err != nil {
@@ -212,7 +209,7 @@
return &SpMetricsEntryDeserializer{}
}
-func (p *SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{}, error) {
var spMetricsEntry functions.SpMetricsEntry
err := json.Unmarshal(data, &spMetricsEntry)
if err != nil {
@@ -227,7 +224,7 @@
return &FunctionDefinitionsDeserializer{}
}
-func (p *FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{}, error) {
var functionDefinitions []functions.FunctionDefinition
err := json.Unmarshal(data, &functionDefinitions)
if err != nil {
@@ -243,7 +240,7 @@
return &ShortUserInfosDeserializer{}
}
-func (s *ShortUserInfosDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (s ShortUserInfosDeserializer) Unmarshal(data []byte) (interface{}, error) {
var shortUserInfo []streampipes_user.ShortUserInfo
err := json.Unmarshal(data, &shortUserInfo)
if err != nil {
@@ -258,7 +255,7 @@
return &UserAccountDeserializer{}
}
-func (p *UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) {
var userAccount streampipes_user.UserAccount
err := json.Unmarshal(data, &userAccount)
if err != nil {
@@ -274,7 +271,7 @@
return &PipelineDeserializer{}
}
-func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipeLine pipeline.Pipeline
err := json.Unmarshal(data, &pipeLine)
if err != nil {
@@ -290,7 +287,7 @@
return &PipelinesDeserializer{}
}
-func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelines []pipeline.Pipeline
err := json.Unmarshal(data, &pipelines)
if err != nil {
@@ -306,7 +303,7 @@
return &PipelineStatusMessagesDeserializer{}
}
-func (p *PipelineStatusMessagesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p PipelineStatusMessagesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelineStatusMessage []pipeline.PipelineStatusMessage
err := json.Unmarshal(data, &pipelineStatusMessage)
if err != nil {
@@ -321,7 +318,7 @@
return &PipelineOperationStatusDeserializer{}
}
-func (p *PipelineOperationStatusDeserializer) Unmarshal(data []byte) (interface{}, error) {
+func (p PipelineOperationStatusDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelineOperationStatus pipeline.PipelineOperationStatus
err := json.Unmarshal(data, &pipelineOperationStatus)
if err != nil {
@@ -329,3 +326,35 @@
}
return pipelineOperationStatus, nil
}
+
+type AdapterDeserializer struct{}
+
+func NewAdapterDeserializer() *AdapterDeserializer {
+ return &AdapterDeserializer{}
+}
+
+func (a AdapterDeserializer) Unmarshal(data []byte) (interface{}, error) {
+ var adapterDescription adapter.AdapterDescription
+ err := json.Unmarshal(data, &adapterDescription)
+ if err != nil {
+ return nil, err
+ }
+ return adapterDescription, nil
+
+}
+
+type AdaptersDeserializer struct{}
+
+func NewAdaptersDeserializer() *AdaptersDeserializer {
+ return &AdaptersDeserializer{}
+}
+
+func (a AdaptersDeserializer) Unmarshal(data []byte) (interface{}, error) {
+ var adapters []adapter.AdapterDescription
+ err := json.Unmarshal(data, &adapters)
+ if err != nil {
+ return nil, err
+ }
+ return adapters, nil
+
+}
diff --git a/streampipes-client-go/streampipes/internal/serializer/serializer.go b/streampipes-client-go/streampipes/internal/serializer/serializer.go
index 24439a1..7d8da41 100644
--- a/streampipes-client-go/streampipes/internal/serializer/serializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/serializer.go
@@ -19,6 +19,7 @@
import (
"encoding/json"
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/model/adapter"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
)
@@ -35,3 +36,17 @@
}
return data, nil
}
+
+type AdapterSerializer struct{}
+
+func NewAdapterSerializer() AdapterSerializer {
+ return AdapterSerializer{}
+}
+
+func (a AdapterSerializer) Marshal(adapters adapter.AdapterDescription) ([]byte, error) {
+ data, err := json.Marshal(adapters)
+ if err != nil {
+ return nil, err
+ }
+ return data, nil
+}
diff --git a/streampipes-client-go/streampipes/model/adapter/adapter_description.go b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
new file mode 100644
index 0000000..4b856d3
--- /dev/null
+++ b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
@@ -0,0 +1,51 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package adapter
+
+import "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
+
+type AdapterDescription struct {
+ ElementID string `json:"elementId"`
+ Rev string `json:"rev"`
+ DOM string `json:"dom"`
+ ConnectedTo []string `json:"connectedTo"`
+ Name string `json:"name"`
+ Description string `json:"description"`
+ AppID string `json:"appId"`
+ IncludesAssets bool `json:"includesAssets"`
+ IncludesLocales bool `json:"includesLocales"`
+ IncludedAssets []string `json:"includedAssets"`
+ IncludedLocales []string `json:"includedLocales"`
+ InternallyManaged bool `json:"internallyManaged"`
+ Version int32 `json:"version"`
+ DataStream model.SpDataStream `json:"dataStream"`
+ Running bool `json:"running"`
+ EventGrounding model.EventGrounding `json:"eventGrounding"`
+ Icon string `json:"icon"`
+ Config []model.StaticProperty `json:"config"`
+ Rules []model.TransformationRuleDescription `json:"rules"`
+ Category []string `json:"category"`
+ CreatedAt int64 `json:"createdAt"`
+ SelectedEndpointURL string `json:"selectedEndpointUrl"`
+ DeploymentConfiguration model.ExtensionDeploymentConfiguration `json:"deploymentConfiguration"`
+ CorrespondingDataStreamElementID string `json:"correspondingDataStreamElementId"`
+ EventSchema model.EventSchema `json:"eventSchema"`
+ ValueRules []model.TransformationRuleDescription `json:"valueRules"`
+ StreamRules []model.TransformationRuleDescription `json:"streamRules"`
+ SchemaRules []model.TransformationRuleDescription `json:"schemaRules"`
+}
diff --git a/streampipes-client-go/streampipes/model/common.go b/streampipes-client-go/streampipes/model/common.go
index 7d71666..0cb1d96 100644
--- a/streampipes-client-go/streampipes/model/common.go
+++ b/streampipes-client-go/streampipes/model/common.go
@@ -75,40 +75,15 @@
AdditionalInformation string `json:"additionalInformation"`
}
-type StaticPropertyType string
-
-const (
- AnyStaticProperty StaticPropertyType = "AnyStaticProperty"
- CodeInputStaticProperty StaticPropertyType = "CodeInputStaticProperty"
- CollectionStaticProperty StaticPropertyType = "CollectionStaticProperty"
- ColorPickerStaticProperty StaticPropertyType = "ColorPickerStaticProperty"
- DomainStaticProperty StaticPropertyType = "DomainStaticProperty"
- FreeTextStaticProperty StaticPropertyType = "FreeTextStaticProperty"
- FileStaticProperty StaticPropertyType = "FileStaticProperty"
- MappingPropertyUnary StaticPropertyType = "MappingPropertyUnary"
- MappingPropertyNary StaticPropertyType = "MappingPropertyNary"
- MatchingStaticProperty StaticPropertyType = "MatchingStaticProperty"
- OneOfStaticProperty StaticPropertyType = "OneOfStaticProperty"
- RuntimeResolvableAnyStaticProperty StaticPropertyType = "RuntimeResolvableAnyStaticProperty"
- RuntimeResolvableGroupStaticProperty StaticPropertyType = "RuntimeResolvableGroupStaticProperty"
- RuntimeResolvableOneOfStaticProperty StaticPropertyType = "RuntimeResolvableOneOfStaticProperty"
- RuntimeResolvableTreeInputStaticProperty StaticPropertyType = "RuntimeResolvableTreeInputStaticProperty"
- StaticPropertyGroup StaticPropertyType = "StaticPropertyGroup"
- StaticPropertyAlternatives StaticPropertyType = "StaticPropertyAlternatives"
- StaticPropertyAlternative StaticPropertyType = "StaticPropertyAlternative"
- SecretStaticProperty StaticPropertyType = "SecretStaticProperty"
- SlideToggleStaticProperty StaticPropertyType = "SlideToggleStaticProperty"
-)
-
type StaticProperty struct {
- Optional bool `json:"optional,omitempty"`
- StaticPropertyType StaticPropertyType `json:"staticPropertyType"`
- Index int32 `json:"index"`
- Label string `json:"label"`
- Description string `json:"description"`
- InternalName string `json:"internalName"`
- Predefined bool `json:"predefined"`
- Class string `json:"@class"`
+ Optional bool `json:"optional,omitempty"`
+ StaticPropertyType string `json:"staticPropertyType"`
+ Index int32 `json:"index"`
+ Label string `json:"label"`
+ Description string `json:"description"`
+ InternalName string `json:"internalName"`
+ Predefined bool `json:"predefined"`
+ Class string `json:"@class"`
}
type SpDataStream struct {
@@ -153,3 +128,17 @@
type TransportFormat struct {
RdfType []string `json:"rdfType"`
}
+
+type TransformationRuleDescription struct {
+ RulePriority int32
+}
+
+type SpServiceTag struct {
+ Prefix string `json:"prefix"`
+ Value string `json:"value"`
+}
+
+type ExtensionDeploymentConfiguration struct {
+ DesiredServiceTags []SpServiceTag `json:"desiredServiceTags"`
+ SelectedEndpointUrl string `json:"selectedEndpointUrl"`
+}
diff --git a/streampipes-client-go/streampipes/streampipes_client.go b/streampipes-client-go/streampipes/streampipes_client.go
index b0286ea..e80e0a4 100644
--- a/streampipes-client-go/streampipes/streampipes_client.go
+++ b/streampipes-client-go/streampipes/streampipes_client.go
@@ -78,6 +78,10 @@
return NewPipeline(s.config)
}
+func (s *StreamPipesClient) Adapter() *Adapter {
+ return NewAdapter(s.config)
+}
+
func (s *StreamPipesClient) DataLakeDashboard() *DataLakeDashboard {
return NewDataLakeDashborad(s.config)