Stream Interfaces

The SDK's expose an interface that has to be extended inorder to build a connector. A sample is shown below

Imports

package org.sunbird.obsrv.connector;

import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.sunbird.obsrv.connector.model.Models;
import org.sunbird.obsrv.connector.source.IConnectorSource;
import org.sunbird.obsrv.connector.source.SourceConnectorFunction;
import org.sunbird.obsrv.job.exception.UnsupportedDataFormatException;

import java.util.List;

SourceConnectorFunction

public class ExampleSourceFunction extends SourceConnectorFunction {
    public ExampleSourceFunction(List<Models.ConnectorContext> connectorContexts) {
        super(connectorContexts);
    }

    @Override
    public void processEvent(
        String event, 
        Function1<String, BoxedUnit> onSuccess, 
        Function2<String, org.sunbird.obsrv.job.model.Models.ErrorData, BoxedUnit> onFailure, 
        Function2<String, Object, BoxedUnit> incMetric
    ){
        // TODO: Implement this method to process the event
        // Call onSuccess.apply(event) if the event is processed successfully
        // Call onFailure.apply(event, errorData) if the event processing fails
        // Call incMetric.apply(event, metricData) to increment the metric
    }

    @Override
    public List<String> getMetrics() {
        // TODO: Return the list of metrics
        return List.empty();
    }
}

Ref: https://github.com/Sunbird-Obsrv/connector-sdk-scala/

IConnectorSource Class

public class ExampleSourceConnector extends IConnectorSource {
    @Override
    public SingleOutputStreamOperator<String> getSourceStream(
        StreamExecutionEnvironment env, Config config
    ) throws UnsupportedDataFormatException {
        // TODO: Implement this method to return the source stream
        // env.fromSource(...)
    }

    @Override
    public SourceConnectorFunction getSourceFunction(
        List<Models.ConnectorContext> contexts, Config config) 
    {
        return ExampleSourceFunction(contexts)
    }
}

Ref: https://github.com/Sunbird-Obsrv/connector-sdk-scala/

Last updated