Stream Interfaces
The SDK exposes interfaces that have to be extended in order to build a connector. A sample is shown below.
Imports
package org.sunbird.obsrv.connector;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sunbird.obsrv.connector.model.Models;
import org.sunbird.obsrv.connector.source.IConnectorSource;
import org.sunbird.obsrv.connector.source.SourceConnectorFunction;
import org.sunbird.obsrv.job.exception.UnsupportedDataFormatException;
import java.util.List;
SourceConnectorFunction
/**
 * Sample {@link SourceConnectorFunction} showing the methods a connector has to
 * override: {@link #processEvent} and {@link #getMetrics()}.
 *
 * <p>NOTE(review): {@code Function1}, {@code Function2} and {@code BoxedUnit} are
 * not in this sample's import list. They are presumably {@code scala.Function1},
 * {@code scala.Function2} and {@code scala.runtime.BoxedUnit} from the Scala SDK;
 * confirm and import them before compiling.
 */
public class ExampleSourceFunction extends SourceConnectorFunction {

    /**
     * Creates the function and hands the connector contexts to the SDK base class.
     *
     * @param connectorContexts the connector contexts passed through to
     *                          {@link SourceConnectorFunction}
     */
    public ExampleSourceFunction(List<Models.ConnectorContext> connectorContexts) {
        super(connectorContexts);
    }

    /**
     * Processes a single event and reports the outcome through the supplied callbacks.
     *
     * @param event     the event to process
     * @param onSuccess callback to invoke with the event when processing succeeds
     * @param onFailure callback to invoke with the event and its error data when
     *                  processing fails
     * @param incMetric callback to invoke to increment a metric
     */
    @Override
    public void processEvent(
            String event,
            Function1<String, BoxedUnit> onSuccess,
            Function2<String, org.sunbird.obsrv.job.model.Models.ErrorData, BoxedUnit> onFailure,
            Function2<String, Object, BoxedUnit> incMetric
    ) {
        // TODO: Implement this method to process the event
        // Call onSuccess.apply(event) if the event is processed successfully
        // Call onFailure.apply(event, errorData) if the event processing fails
        // Call incMetric.apply(event, metricData) to increment the metric
    }

    /**
     * Returns the metrics exposed by this function.
     *
     * @return the list of metrics; empty in this sample
     */
    @Override
    public List<String> getMetrics() {
        // TODO: Return the list of metrics
        // java.util.List has no empty() method; List.of() yields an immutable empty list.
        return List.of();
    }
}
Ref: https://github.com/Sunbird-Obsrv/connector-sdk-scala/
IConnectorSource Class
public class ExampleSourceConnector extends IConnectorSource {
@Override
public SingleOutputStreamOperator<String> getSourceStream(
StreamExecutionEnvironment env, Config config
) throws UnsupportedDataFormatException {
// TODO: Implement this method to return the source stream
// env.fromSource(...)
}
@Override
public SourceConnectorFunction getSourceFunction(
List<Models.ConnectorContext> contexts, Config config)
{
return ExampleSourceFunction(contexts)
}
}
Last updated
Was this helpful?