Verifying
Environment Setup
Since connectors are configured at the dataset level, create a sample dataset first. The following query creates a new-york-taxi-data dataset:
INSERT INTO public.datasets (id,dataset_id,"type","name",validation_config,extraction_config,dedup_config,data_schema,denorm_config,router_config,dataset_config,tags,data_version,status,created_by,updated_by,created_date,updated_date,published_date,api_version,"version",sample_data,entry_topic) VALUES
('new-york-taxi-data','new-york-taxi-data','event','new-york-taxi-data','{"validate": true, "mode": "Strict"}','{"is_batch_event": true, "batch_id": "id", "extraction_key": "events", "dedup_config": {"dedup_key": "id", "drop_duplicates": true, "dedup_period": 604800}}','{"dedup_key": "tripID", "drop_duplicates": true, "dedup_period": 604800}','{"$schema": "https://json-schema.org/draft/2020-12/schema", "type": "object", "properties": {"tripID": {"type": "string", "suggestions": [{"message": "The Property ''''tripID'''' appears to be ''''uuid'''' format type.", "advice": "Suggest to not to index the high cardinal columns", "resolutionType": "DEDUP", "severity": "LOW", "path": "properties.tripID"}], "arrival_format": "text", "data_type": "string"}, "VendorID": {"type": "string", "arrival_format": "text", "data_type": "string"}, "tpep_pickup_datetime": {"type": "string", "suggestions": [{"message": "The Property ''''tpep_pickup_datetime'''' appears to be ''''date-time'''' format type.", "advice": "The System can index all data on this column", "resolutionType": "INDEX", "severity": "LOW", "path": "properties.tpep_pickup_datetime"}], "arrival_format": "text", "data_type": "date-time"}, "tpep_dropoff_datetime": {"type": "string", "suggestions": [{"message": "The Property ''''tpep_dropoff_datetime'''' appears to be ''''date-time'''' format type.", "advice": "The System can index all data on this column", "resolutionType": "INDEX", "severity": "LOW", "path": "properties.tpep_dropoff_datetime"}], "arrival_format": "text", "data_type": "date-time"}, "passenger_count": {"type": "string", "arrival_format": "text", "data_type": "string"}, "trip_distance": {"type": "string", "arrival_format": "text", "data_type": "string"}, "RatecodeID": {"type": "string", "arrival_format": "text", "data_type": "string"}, "store_and_fwd_flag": {"type": "string", "arrival_format": "text", "data_type": "string"}, "PULocationID": {"type": "string", "arrival_format": "text", "data_type": "string"}, "DOLocationID": {"type": "string", "arrival_format": "text", "data_type": "string"}, "payment_type": {"type": "string", "arrival_format": "text", "data_type": "string"}, "primary_passenger": {"type": "object", "properties": {"email": {"type": "string", "arrival_format": "text", "data_type": "string"}, "mobile": {"type": "string", "arrival_format": "text", "data_type": "string"}}, "arrival_format": "object", "data_type": "object", "additionalProperties": false}, "fare_details": {"type": "object", "properties": {"fare_amount": {"type": "string", "arrival_format": "text", "data_type": "string"}, "extra": {"type": "string", "arrival_format": "text", "data_type": "string"}, "mta_tax": {"type": "string", "arrival_format": "text", "data_type": "string"}, "tip_amount": {"type": "string", "arrival_format": "text", "data_type": "string"}, "tolls_amount": {"type": "string", "arrival_format": "text", "data_type": "string"}, "improvement_surcharge": {"type": "string", "arrival_format": "text", "data_type": "string"}, "total_amount": {"type": "string", "arrival_format": "text", "data_type": "string"}, "congestion_surcharge": {"type": "string", "arrival_format": "text", "data_type": "string"}}, "arrival_format": "object", "data_type": "object", "additionalProperties": false}}, "additionalProperties": false}','{"denorm_fields": [], "redis_db_host": "localhost", "redis_db_port": 6379}','{"topic": "new-york-taxi-data"}','{"keys_config": {"timestamp_key": "obsrv_meta.syncts", "data_key": "", "partition_key": ""}, "indexing_config": 
{"olap_store_enabled": true, "lakehouse_enabled": false, "cache_enabled": false}, "cache_config": {"redis_db_host": "localhost", "redis_db_port": 6379, "redis_db": 0}, "file_upload_path": []}','{}',2,'Live','SYSTEM','SYSTEM','2024-09-12 13:13:15.400866','2024-09-17 11:26:00.774249','2024-09-17 11:26:00.774249','v2',1,'{"tripID": "0de066f6-e7e4-44e3-9d5a-af8be8cd360a", "VendorID": "2", "tpep_pickup_datetime": "2023-04-12 13:48:30", "tpep_dropoff_datetime": "2023-11-17 13:52:40", "passenger_count": "3", "trip_distance": ".00", "RatecodeID": "1", "store_and_fwd_flag": "N", "PULocationID": "236", "DOLocationID": "236", "payment_type": "1", "primary_passenger": {"email": "[email protected]", "mobile": "494-699-7052"}, "fare_details": {"fare_amount": "4.5", "extra": "0.5", "mta_tax": "0.5", "tip_amount": "0", "tolls_amount": "0", "improvement_surcharge": "0.3", "total_amount": "5.8", "congestion_surcharge": ""}, "suggestedPii": []}','local.ingest');
In order to run a connector, it must be registered in Obsrv. To do so, fill in the values and run the query below:
INSERT INTO public.connector_registry (id,connector_id,"name","type",category,"version",description,technology,runtime,licence,"owner",iconurl,status,ui_spec,source_url,"source",created_by,updated_by,created_date,updated_date,live_date) VALUES
('example-connector',
'example-connector',
'Example Connector',
'source',
'stream',
'1.0.0',
'Pull data from a Source',
'scala',
'flink',
'MIT',
'Sunbird',
'data:image/svg+xml;base64,,',
'Live','{}','connector-1.0.0-distribution.tar.gz','{}',
'SYSTEM','SYSTEM',
now(), now(), now()
);
The connector must also have an active instance, which maps the dataset to the connector registered above:
INSERT INTO public.connector_instances (id,dataset_id,connector_id,connector_config,operations_config,status,connector_state,connector_stats,created_by,updated_by,created_date,updated_date,published_date) VALUES
('example-instance-1','new-york-taxi-data','example-connector-1.0.0',
'<aes-256-ecb encrypted string>',
'{}','Live','{}','{}','SYSTEM','SYSTEM', now(), now(), now()
);
To verify that the encryption is correct, use an online AES tool where the algorithm is AES, the mode is ECB with No Padding, and the key size is 256 bits (a 32-character key).
For example, the following JSON config
{"source_ip": "localhost", "source_port": 5432, "source_db": "obsrv", "table": "datasets"}
encrypted with the secret
strong_encryption_key_to_encrypt
should generate an encrypted string equal to
mmT3tqwpq798ywgsdfEdhtp5VXcFzFjutmaEuFqAw9fl4G4OtK6vz89Nhd/deChFWzW3yJWB3Y//SpDzwc3RY31FGqwnAhhD3fpjXP+XYHthCWxBIKar5j8pdTBZ867J
which can be verified online before you encrypt your own config. Decrypting the string should yield valid JSON.
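If you prefer to verify locally instead of online, here is a minimal Scala sketch using the standard javax.crypto API. The PKCS5 padding mode is an assumption (online tools configured with No Padding often pad the input to the 16-byte block size themselves), so confirm the output matches the expected string before relying on it.

import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

object EncryptConfig {
  def main(args: Array[String]): Unit = {
    // 32-character key = 256-bit AES key
    val key = "strong_encryption_key_to_encrypt"
    val config = """{"source_ip": "localhost", "source_port": 5432, "source_db": "obsrv", "table": "datasets"}"""
    // Padding mode is an assumption; it must match what Obsrv uses to decrypt.
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key.getBytes("UTF-8"), "AES"))
    val encrypted = Base64.getEncoder.encodeToString(cipher.doFinal(config.getBytes("UTF-8")))
    println(encrypted)
  }
}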
Make sure you have the configuration file for the language you are writing the connector in. Here is a sample file for use with Java / Scala:
postgres {
  host = localhost
  port = 5432
  maxConnections = 2
  user = "postgres"
  password = "postgres"
  database = "obsrv"
}

kafka {
  producer = {
    broker-servers = "localhost:9092"
    compression = "snappy"
    max-request-size = 1000000 # 1MB
  }
  output = {
    connector = {
      failed.topic = "connector.failed"
      metric.topic = "obsrv-connectors-metrics"
    }
  }
}

obsrv.encryption.key = "strong_encryption_key_to_encrypt"

task {
  checkpointing.compressed = true
  checkpointing.interval = 60000
  checkpointing.pause.between.seconds = 30000
  restart-strategy.attempts = 3
  restart-strategy.delay = 30000 # in milli-seconds
  parallelism = 1
  consumer.parallelism = 1
  downstream.operators.parallelism = 1
}

env = local
building-block = obsrv
Please make sure you update the database and Kafka connection values as per your setup.
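To sanity-check the file, you can load it with the Typesafe Config library, the standard HOCON parser for Java / Scala. This is a minimal sketch; the file path and object name are placeholders.

import com.typesafe.config.ConfigFactory
import java.io.File

object ConfigCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder path; point it at your actual connector config file.
    val config = ConfigFactory.parseFile(new File("obsrv-connector-config.conf")).resolve()
    println(config.getString("postgres.host"))                  // localhost
    println(config.getInt("postgres.port"))                     // 5432
    println(config.getString("kafka.producer.broker-servers")) // localhost:9092
    println(config.getInt("task.checkpointing.interval"))      // 60000
  }
}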
Running Stream Connectors
Make sure Apache Flink is running and ready to accept jobs for processing, with at least ONE free task slot available for submitting the job.
Submit the JAR using flink run:
$FLINK_HOME/bin/flink run <path_to_jar> --port <random_port> --config.file.path <path_to_obsrv-connector-config.conf> --metadata.id <id_from_connector_registry>
Running Batch Connectors
Make sure you have Apache Spark installed.
Submit the JAR to Spark using the following command:
spark-submit --master=local[*] --jars <include_dependent_jars> --class <main_class> <path_to_jar> -f <path_to_obsrv-connector-config.conf> -c <id_from_connector_instances>