
How to copy feeding data

There are multiple ways to copy feeding data, including:

  • Using a Python script
  • Using a feeding pipeline

Using a Python script to copy the data

  • Open an SSH tunnel to your Elasticsearch cluster
ssh elastic01 -L9200:elastic01:9200
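
Before starting the export it can help to confirm that the forwarded port actually accepts connections. The following is a minimal sketch using only the Python standard library; the helper name tunnel_is_open is hypothetical and not part of qsf-commons-py, and the defaults assume the tunnel from the command above (localhost, port 9200).

```python
import socket


def tunnel_is_open(host="localhost", port=9200, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling tunnel_is_open() with no arguments checks localhost:9200. A False result usually means the ssh session is not running or the port is forwarded differently.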

  • Make qsf-commons-py importable by adding it to the Python path (adjust the path to your local checkout)
import sys
sys.path.append("/home/user/projects/qsf-commons-py/qsf_commons/qsc")
  • Export the data from Elasticsearch to a file
  • Extract the payload from each hit
  • Ignore delete events (only update events are kept)
  • Store the data as JSONL

from elastic_exporter import ElasticExporter
import json

class CustomExporter(ElasticExporter):
    def process_hits(self, hits):
        """Custom callback: yield the payload of every update event as one JSON line."""
        for hit in hits:
            self.status.inc_processed_docs_count()
            source = hit["_source"]
            if "payload" in source:
                payload = json.loads(source["payload"])
                # Keep update events only; delete events are skipped.
                if payload["header"]["action"] == "update":
                    yield json.dumps(payload) + "\n"



es_url = "http://localhost:9200"
index_name = "feeding.prod.demo.products_data"
exporter = CustomExporter(es_url, index_name)
exporter.export_to_jsonl()
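
The filtering done in process_hits can be tried without an Elasticsearch connection. The sketch below repeats the same logic as a plain function and runs it on hand-written hits. The function name extract_update_payloads and the sample documents (including the id and name fields) are hypothetical and only illustrate the expected shape: a _source with a JSON string in payload, which in turn has a header.action.

```python
import json


def extract_update_payloads(hits):
    """Yield one JSON line per hit whose payload is an update event."""
    for hit in hits:
        source = hit.get("_source", {})
        if "payload" not in source:
            continue  # hits without a payload are skipped
        payload = json.loads(source["payload"])
        # Anything that is not an update (for example a delete) is ignored.
        if payload.get("header", {}).get("action") == "update":
            yield json.dumps(payload) + "\n"


# Hypothetical sample hits, shaped like the documents the exporter reads.
sample_hits = [
    {"_source": {"payload": json.dumps(
        {"header": {"action": "update", "id": "1"}, "payload": {"name": "shoe"}})}},
    {"_source": {"payload": json.dumps(
        {"header": {"action": "delete", "id": "2"}})}},
    {"_source": {"status": "no payload here"}},
]

lines = list(extract_update_payloads(sample_hits))
```

Only the first sample hit survives: the second is a delete event and the third has no payload field.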
  • Feed the exported file to the target
  • Configure the token
  • Configure the feeding_url
  • Configure the file_path
from qsc_feeder import QscFeeder

def feed_data(file_path, feeding_url, token):
    feeder = QscFeeder(file_path, feeding_url, token, batchSize=100)
    # Optional settings, disabled in this example:
    #feeder.set_format("payload")
    #feeder.set_id_field("id")
    #feeder.fullfeed_start()
    # Presumably skips the first 148100 records (e.g. to resume an
    # interrupted run); remove this line to feed the whole file.
    feeder.set_offset(148100)
    feeder.feed()
    feeder.fullfeed_end()

token = "****"
feeding_url = "https://qsc.quasiris.de/api/v1/data/bulk/qsc/demo/products"
file_path = "elastic-export-feeding.prod.demo.products_data.json"
feed_data(file_path, feeding_url, token)
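
To make the roles of batchSize and set_offset more concrete, here is a minimal sketch of offset-based batching over JSONL lines. It is not QscFeeder's implementation; iter_batches is a hypothetical helper, and it assumes the offset counts lines from the start of the file and that each batch is sent as one bulk request.

```python
import json
from itertools import islice


def iter_batches(lines, batch_size=100, offset=0):
    """Skip the first `offset` lines, then yield lists of up to `batch_size` records."""
    remaining = islice(lines, offset, None)
    while True:
        # Pull the next batch_size lines from the shared iterator.
        batch = [json.loads(line) for line in islice(remaining, batch_size)]
        if not batch:
            return
        yield batch


# Hypothetical input: seven JSONL lines with ids 0..6.
lines = [json.dumps({"id": i}) + "\n" for i in range(7)]
batches = list(iter_batches(lines, batch_size=3, offset=2))
```

With an offset of 2 and a batch size of 3, the records with ids 2 to 6 are grouped into one batch of three and one batch of two. The same function accepts an open file object instead of the list.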

Using a feeding pipeline to copy the data

name: "copy data (for dev purposes) from feeding queue and push to another queue."
variables:
  qscSourceUrl: "https://qsc.quasiris.de"
  qscTargetUrl: "https://qsc2.quasiris.de"
  qscToken: "xyz"
  source: "ab/products"
  target: "ab/products"
  fullCopy: "true"
  feedingType: "qsc"
reader:
  name: "QscFeedingQueueReader"
  clazz: "com.quasiris.qsc.feeding.pipeline.QscFeedingQueuePayloadReader"
  property:
    url: "${variables.qscSourceUrl}/api/v1/feeding/${variables.source}/_data-full"
    paginate: "${variables.fullCopy}"
filter:
  - name: "Status filter"
    clazz: "solrcmdutils.StatusTimeFilter"
  - name: "QSC Data Push Writer"
    clazz: "com.quasiris.qsc.feeding.pipeline.QscPayloadDataPushWriter"
    property:
      url: "${variables.qscTargetUrl}/api/v1/data/bulk/${variables.feedingType}/${variables.target}"
      header:
        - "content-type: application/json"
        - "X-QSC-Token: ${variables.qscToken}"
      batchSize: "100"
      payloadField: "payload"
      fullfeedEnabled: "false"
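
The ${variables.…} placeholders in the pipeline are filled from the variables block. The sketch below shows which URLs the reader and the writer end up with for the values above. The substitution is performed by the pipeline itself; the resolve function here is a hypothetical stand-in that only illustrates the result.

```python
import re

# Values copied from the variables block of the pipeline.
variables = {
    "qscSourceUrl": "https://qsc.quasiris.de",
    "qscTargetUrl": "https://qsc2.quasiris.de",
    "source": "ab/products",
    "target": "ab/products",
    "feedingType": "qsc",
}

PLACEHOLDER = re.compile(r"\$\{variables\.(\w+)\}")


def resolve(template, values):
    """Replace every ${variables.name} with its value; unknown names raise KeyError."""
    return PLACEHOLDER.sub(lambda match: values[match.group(1)], template)


reader_url = resolve(
    "${variables.qscSourceUrl}/api/v1/feeding/${variables.source}/_data-full",
    variables,
)
writer_url = resolve(
    "${variables.qscTargetUrl}/api/v1/data/bulk/${variables.feedingType}/${variables.target}",
    variables,
)
```

Here reader_url resolves to https://qsc.quasiris.de/api/v1/feeding/ab/products/_data-full and writer_url to https://qsc2.quasiris.de/api/v1/data/bulk/qsc/ab/products, so changing source, target, or the two base URLs is enough to copy a different queue.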