Sunbird Obsrv

Stream Interfaces

The SDKs expose an interface that must be extended in order to build a connector. Samples are shown below.

Java

Imports

package org.sunbird.obsrv.connector;

import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.sunbird.obsrv.connector.model.Models;
import org.sunbird.obsrv.connector.source.IConnectorSource;
import org.sunbird.obsrv.connector.source.SourceConnectorFunction;
import org.sunbird.obsrv.job.exception.UnsupportedDataFormatException;

// Scala function types used in the processEvent signature
import scala.Function1;
import scala.Function2;
import scala.runtime.BoxedUnit;

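// Assumption: the SDK is written in Scala, so List is taken to be Scala's immutable List
// (List.empty() in getMetrics relies on it; java.util.List has no empty() method).
import scala.collection.immutable.List;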

SourceConnectorFunction

public class ExampleSourceFunction extends SourceConnectorFunction {
    public ExampleSourceFunction(List<Models.ConnectorContext> connectorContexts) {
        super(connectorContexts);
    }

    @Override
    public void processEvent(
        String event, 
        Function1<String, BoxedUnit> onSuccess, 
        Function2<String, org.sunbird.obsrv.job.model.Models.ErrorData, BoxedUnit> onFailure, 
        Function2<String, Object, BoxedUnit> incMetric
    ){
        // TODO: Implement this method to process the event
        // Call onSuccess.apply(event) if the event is processed successfully
        // Call onFailure.apply(event, errorData) if the event processing fails
        // Call incMetric.apply(metricName, count) to increment a metric counter by count
    }

    @Override
    public List<String> getMetrics() {
        // TODO: Return the list of metrics
        return List.empty();
    }
}
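
The callback contract described in the comments above can be tried out without the SDK on the classpath. The following is a minimal, stand-alone sketch and not the SDK's own code: `Consumer` and `BiConsumer` stand in for the Scala function types, a plain `String` message stands in for `ErrorData`, and the class name, the metric name `example-failed-events`, and the "looks like a JSON object" rule are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Stand-alone illustration of the processEvent callback contract.
// Consumer/BiConsumer stand in for the Scala function types used by the SDK,
// and a String error message stands in for the SDK's ErrorData class.
final class ProcessEventSketch {

    // Hypothetical metric name, used only in this sketch.
    static final String FAILED_METRIC = "example-failed-events";

    // Hypothetical rule: accept events that look like a JSON object.
    static void processEvent(
        String event,
        Consumer<String> onSuccess,
        BiConsumer<String, String> onFailure,
        BiConsumer<String, Long> incMetric
    ) {
        String trimmed = event == null ? "" : event.trim();
        if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
            // Hand the event on unchanged when it passes the check.
            onSuccess.accept(event);
        } else {
            // Report the failure and count it under the custom metric.
            onFailure.accept(event, "event is not a JSON object");
            incMetric.accept(FAILED_METRIC, 1L);
        }
    }

    // Runs a batch of events through processEvent and returns a one-line summary.
    static String run(List<String> events) {
        List<String> ok = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        Map<String, Long> metrics = new HashMap<>();
        for (String event : events) {
            processEvent(
                event,
                ok::add,
                (e, error) -> failed.add(e),
                (name, count) -> metrics.merge(name, count, Long::sum)
            );
        }
        return "ok=" + ok.size() + " failed=" + failed.size()
            + " " + FAILED_METRIC + "=" + metrics.getOrDefault(FAILED_METRIC, 0L);
    }
}
```

Calling `ProcessEventSketch.run` with one JSON-like event and one plain-text event yields one success, one failure, and a failure count of one. In a real connector the same branching would sit inside the overridden `processEvent`, using `onSuccess.apply(...)`, `onFailure.apply(...)` and `incMetric.apply(...)`. Any custom metric name would presumably also need to be returned from `getMetrics()`.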

IConnectorSource Class

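public class ExampleSourceConnector extends IConnectorSource {
    @Override
    public SingleOutputStreamOperator<String> getSourceStream(
        StreamExecutionEnvironment env, Config config
    ) throws UnsupportedDataFormatException {
        // TODO: Implement this method to return the source stream,
        // e.g. return env.fromSource(...);
        // Placeholder so the sample compiles until the stream is implemented.
        throw new UnsupportedOperationException("getSourceStream is not implemented yet");
    }

    @Override
    public SourceConnectorFunction getSourceFunction(
        List<Models.ConnectorContext> contexts, Config config
    ) {
        // Create the function that will process each event from the source stream
        return new ExampleSourceFunction(contexts);
    }
}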

Scala

Imports

package org.sunbird.obsrv.connector

import com.typesafe.config.Config
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.json.{JSONException, JSONObject}
import org.sunbird.obsrv.connector.model.Models.ConnectorContext
import org.sunbird.obsrv.connector.source.{IConnectorSource, SourceConnector, SourceConnectorFunction}
import org.sunbird.obsrv.job.exception.UnsupportedDataFormatException
import org.sunbird.obsrv.job.model.Models.ErrorData

SourceConnectorFunction


class ExampleSourceConnectorFunction(connectorContexts: List[ConnectorContext]) extends SourceConnectorFunction(connectorContexts) {

  /**
   * This method processes the incoming event.
   *
   * @param event The event to be processed.
   * @param onSuccess Callback function to be called on successful processing of the event.
   * @param onFailure Callback function to be called on failure in processing the event.
   * @param incMetric Function to increment the metric counter.
   */
  override def processEvent(event: String, 
                            onSuccess: String => Unit, 
                            onFailure: (String, ErrorData) => Unit, 
                            incMetric: (String, Long) => Unit): Unit = {
    // Implement your event processing logic here.
  }
  
  // TODO: Returns a list of custom metrics if any
  override def getMetrics(): List[String] = List[String]()
}

IConnectorSource

class ExampleConnectorSource extends IConnectorSource {

  @throws[UnsupportedDataFormatException]
  override def getSourceStream(env: StreamExecutionEnvironment, config: Config): SingleOutputStreamOperator[String] = {
    // Implement the logic to create and return the source stream
    // Example:
    // env.fromElements("event1", "event2", "event3")
    ??? // placeholder so the sample compiles until the stream is implemented
  }

  override def getSourceFunction(contexts: List[ConnectorContext], config: Config): SourceConnectorFunction = {
    new ExampleSourceConnectorFunction(contexts)
  }
}

Last updated 5 months ago

Ref:

https://github.com/Sunbird-Obsrv/connector-sdk-scala/