How to copy feeding data
There are multiple ways to copy feeding data, including:
- Using a Python script
- Using a feeding pipeline
Using a Python script to copy the data
- open an SSH tunnel to your Elasticsearch cluster
ssh elastic01 -L9200:elastic01:9200
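Before running the export, it can help to confirm that the forwarded port is reachable. The following is a minimal sketch using only the Python standard library; the helper name `port_is_open` is hypothetical and not part of qsf-commons-py.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError when nothing is listening
        # or the connection attempt times out.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the tunnel from the command above running, `port_is_open("localhost", 9200)` should return True. The check only shows that something is listening on the port, not that it is Elasticsearch.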
- add qsf-commons-py to the Python path so that its modules can be imported
import sys
sys.path.append("/home/user/projects/qsf-commons-py/qsf_commons/qsc")
- export the data from Elasticsearch to a file
  - extract the payload
  - ignore delete events
  - store the data as JSONL
from elastic_exporter import ElasticExporter
import json


class CustomExporter(ElasticExporter):
    def process_hits(self, hits):
        """Custom callback to process hits."""
        for hit in hits:
            # self.processed_docs_count += 1
            # Do custom processing here, for example:
            self.status.inc_processed_docs_count()
            source = hit["_source"]
            if "payload" in source:
                payload = json.loads(source["payload"])
                if payload["header"]["action"] == "update":
                    yield json.dumps(payload) + "\n"


es_url = "http://localhost:9200"
index_name = "feeding.prod.demo.products_data"
exporter = CustomExporter(es_url, index_name)
exporter.export_to_jsonl()
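To see what the filter in `process_hits` keeps, the same logic can be run on hand-made hits without a cluster. This is a minimal sketch: the function `extract_update_payloads` and the sample hits are hypothetical and not part of qsf-commons-py; only the `_source`/`payload` layout and the `header.action` field are taken from the exporter above.

```python
import json


def extract_update_payloads(hits):
    """Yield one JSONL line per hit whose payload is an update event."""
    for hit in hits:
        source = hit.get("_source", {})
        if "payload" not in source:
            continue  # hits without a payload are skipped
        payload = json.loads(source["payload"])
        if payload["header"]["action"] == "update":
            yield json.dumps(payload) + "\n"


# Hypothetical sample hits; real documents carry more fields.
sample_hits = [
    {"_source": {"payload": json.dumps(
        {"header": {"id": "1", "action": "update"}, "payload": {"name": "shoe"}})}},
    {"_source": {"payload": json.dumps(
        {"header": {"id": "2", "action": "delete"}})}},
    {"_source": {"status": "no payload here"}},
]

lines = list(extract_update_payloads(sample_hits))
print(len(lines))  # 1: only the update event is kept
```

The delete event and the hit without a payload are both dropped, so the resulting file holds only documents that should exist in the target.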
- feed the exported file to QSC
  - configure the token
  - configure the feeding_url
  - configure the file_path
from qsc_feeder import QscFeeder


def feed_data(file_path, feeding_url, token):
    feeder = QscFeeder(file_path, feeding_url, token, batchSize=100)
    # feeder.set_format("payload")
    # feeder.set_id_field("id")
    # feeder.fullfeed_start()
    feeder.set_offset(148100)
    feeder.feed()
    feeder.fullfeed_end()


token = "****"
feeding_url = "https://qsc.quasiris.de/api/v1/data/bulk/qsc/demo/products"
file_path = "elastic-export-feeding.prod.demo.products_data.json"
feed_data(file_path, feeding_url, token)
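Before feeding, the export file can be sanity-checked so that a broken line does not surface halfway through a long run. The following is a minimal sketch using only the standard library; `count_jsonl_records` is a hypothetical helper, not part of qsf-commons-py, and it assumes the export holds one JSON document per line.

```python
import json


def count_jsonl_records(file_path):
    """Return (valid, invalid) line counts for a JSONL file."""
    valid = invalid = 0
    with open(file_path, encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue  # blank lines are ignored
            try:
                json.loads(line)
                valid += 1
            except json.JSONDecodeError:
                invalid += 1
    return valid, invalid
```

Calling `count_jsonl_records(file_path)` on the exported file gives the number of documents it contains, which may also help to judge whether an offset such as the one passed to `set_offset` is plausible for that file.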
Using a feeding pipeline to copy the data
name: "copy data (for dev purposes) from feeding queue and push another queue."
variables:
  qscSourceUrl: "https://qsc.quasiris.de"
  qscTargetUrl: "https://qsc2.quasiris.de"
  qscToken: "xyz"
  source: "ab/products"
  target: "ab/products"
  fullCopy: "true"
  feedingType: "qsc"
reader:
  name: "QscFeedingQueueReader"
  clazz: "com.quasiris.qsc.feeding.pipeline.QscFeedingQueuePayloadReader"
  property:
    url: "${variables.qscSourceUrl}/api/v1/feeding/${variables.source}/_data-full"
    paginate: "${variables.fullCopy}"
filter:
  - name: "Status filter"
    clazz: "solrcmdutils.StatusTimeFilter"
  - name: "QSC Data Push Writer"
    clazz: "com.quasiris.qsc.feeding.pipeline.QscPayloadDataPushWriter"
    property:
      url: "${variables.qscTargetUrl}/api/v1/data/bulk/${variables.feedingType}/${variables.target}"
      header:
        - "content-type: application/json"
        - "X-QSC-Token: ${variables.qscToken}"
      batchSize: "100"
      payloadField: "payload"
      fullfeedEnabled: "false"
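The reader and writer URLs in the pipeline are assembled from `${variables.…}` placeholders. The sketch below shows which URLs result from the values in this example. It is an illustration only: the `resolve` function is hypothetical and assumes plain string substitution, which may differ from how the pipeline engine actually resolves variables.

```python
import re

# Values copied from the variables section of the pipeline.
variables = {
    "qscSourceUrl": "https://qsc.quasiris.de",
    "qscTargetUrl": "https://qsc2.quasiris.de",
    "source": "ab/products",
    "target": "ab/products",
    "feedingType": "qsc",
}

PLACEHOLDER = re.compile(r"\$\{variables\.(\w+)\}")


def resolve(template, values):
    """Replace each ${variables.name} with its value (assumed semantics)."""
    return PLACEHOLDER.sub(lambda match: values[match.group(1)], template)


reader_url = resolve(
    "${variables.qscSourceUrl}/api/v1/feeding/${variables.source}/_data-full",
    variables,
)
writer_url = resolve(
    "${variables.qscTargetUrl}/api/v1/data/bulk/${variables.feedingType}/${variables.target}",
    variables,
)
print(reader_url)  # https://qsc.quasiris.de/api/v1/feeding/ab/products/_data-full
print(writer_url)  # https://qsc2.quasiris.de/api/v1/data/bulk/qsc/ab/products
```

Changing only `qscTargetUrl`, `target`, or `feedingType` in the variables section is therefore enough to point the copy at a different destination.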