Batch Interfaces

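The SDK exposes an interface, ISourceConnector, that you implement to build a batch connector. It has two methods: one returns the Spark configuration your connector needs, and the other reads the source data and returns it as a dataframe. The skeletons below show the interface in Java, Scala, and Python.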

Java

Imports

package org.sunbird.obsrv.connector;

import com.typesafe.config.Config;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.sunbird.obsrv.connector.model.Models.ConnectorContext;
import org.sunbird.obsrv.connector.source.ISourceConnector;

import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;

ISourceConnector

public class ExampleSourceConnector implements ISourceConnector {

    @Override
    public Map<String, String> getSparkConf(Config config) {
        // TODO: Return the SparkConf related to your connector
        return Collections.emptyMap();
    }

    @Override
    public Dataset<Row> process(SparkSession spark, ConnectorContext ctx, Config config, BiConsumer<String, Long> metricFn) {
        // TODO: Add logic to read the data and return a dataframe
        return spark.emptyDataFrame();
    }
}

Reference

Scala

Imports

package org.sunbird.obsrv.connector

import com.typesafe.config.Config
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.sunbird.obsrv.connector.model.Models.ConnectorContext
import org.sunbird.obsrv.connector.source.{ISourceConnector, SourceConnector}

ISourceConnector

class ExampleSourceConnector extends ISourceConnector {

  override def getSparkConf(config: Config): Map[String, String] = {
    // TODO: Return the SparkConf related to your connector
    Map[String, String]()
  }

  override def process(spark: SparkSession, ctx: ConnectorContext, config: Config, metricFn: (String, Long) => Unit): Dataset[Row] = {
    // TODO: Add logic to read the data and return a dataframe
    spark.emptyDataFrame
  }
}

Reference

Python

Imports

from typing import Any, Dict, Iterator

from obsrv.common import ObsrvException
from obsrv.connector import ConnectorContext, MetricsCollector
from obsrv.connector.batch import ISourceConnector
from obsrv.job.batch import get_base_conf
from obsrv.models import ErrorData, StatusCode
from obsrv.utils import LoggerController
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit

ISourceConnector

class ExampleSource(ISourceConnector):
    def process(
        self,
        sc: SparkSession,
        ctx: ConnectorContext,
        connector_config: Dict[Any, Any],
        metrics_collector: MetricsCollector,
    ) -> Iterator[DataFrame]:
        # TODO: return or yield dataframe
        # yield sc.createDataFrame([], schema=None)
        return sc.createDataFrame([], schema=None)
        
    def get_spark_conf(self, connector_config) -> SparkConf:
        conf = get_base_conf()
        # TODO: Extend or Add the SparkConf related to your connector
        return conf
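
The TODO in process allows the connector to either return a single dataframe or yield several. The sketch below illustrates the difference between the two styles in plain Python. It is not SDK code: lists of dicts stand in for Spark DataFrames, and process_return, process_yield, and as_batches are hypothetical names introduced only for this illustration. How the SDK itself consumes the result is not shown on this page.

```python
import types
from typing import Any, Dict, Iterator, List

# Stand-in for a Spark DataFrame in this sketch (assumption, not an SDK type).
Batch = List[Dict[str, Any]]


def process_return() -> Batch:
    # "return" style: the whole result is handed back as one object.
    return [{"id": 1}]


def process_yield() -> Iterator[Batch]:
    # "yield" style: the function becomes a generator and can emit
    # several batches, one at a time.
    yield [{"id": 1}]
    yield [{"id": 2}]


def as_batches(result: Any) -> Iterator[Batch]:
    # Hypothetical helper: lets a caller loop over either style uniformly.
    if isinstance(result, types.GeneratorType):
        yield from result
    else:
        yield result


returned = list(as_batches(process_return()))
yielded = list(as_batches(process_yield()))
```

Yielding is mainly useful when the source is read in chunks, since each batch can be handed over before the next one is fetched.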

Reference

References

Java and Scala SDK: https://github.com/Sunbird-Obsrv/connector-sdk-scala/
Python SDK: https://github.com/Sunbird-Obsrv/obsrv-python-sdk