
import ndjsonStream from 'can-ndjson-stream';
import { featureActive } from '@sb-itops/feature';
import { fetchPostP } from '@sb-itops/redux/fetch';
import uuid from '@sb-itops/uuid';

// The timeout is measured from the most recent progress notification (i.e. the last chunk
// read from the DB).
const SYNC_TIMEOUT_MS = 3 * 60 * 1000; // 3 minutes.

/**
 * Bulk ("fast") sync for a cache: asks the bulk sync endpoint to collect the entity data,
 * waits for the notification carrying the download URL, then downloads and parses the data.
 *
 * This lives here so it can ride in on the generic caches implementation while staying
 * entirely separate from it.
 *
 * Any failure (request, notification, download, parsing) is logged and swallowed; in that
 * case the default initial state is returned so that the natural syncing process can occur.
 *
 * @param {object} param
 * @param {object} param.cache Cache being synced; `cache.name` is only used for error logging.
 * @param {Function} param.notifiedOperation See `sbNotifiedOperation`
 * @returns {Promise<{lastUpdated: string, entityChanges: Array}>} The sync value reported by
 *   the endpoint ('' when absent) and the downloaded entities, or
 *   `{ lastUpdated: '', entityChanges: [] }` if anything went wrong.
 */
export const fastSync = async ({ cache, notifiedOperation }) => {
  try {
    const desiredFileFormat = featureActive('BB-7526') ? 'jsonl' : undefined;

    // Send the request to the bulk sync endpoint and wait for notification indicating that the entity
    // data has been uploaded to S3.
    // desiredFileFormat is a hint to the endpoint. The endpoint may not support the specified format,
    // therefore we need to check the `fileFormat` from the response - undefined means regular json
    const { url, lastUpdated, fileFormat } = await waitForNotification(cache, notifiedOperation, desiredFileFormat);

    // Download the entity data from s3
    const response = await fetch(url);

    // fetch() only rejects on network failures. An HTTP error response must be detected
    // explicitly, otherwise an empty/partial error body could be treated as "no entities"
    // while still advancing lastUpdated.
    if (!response.ok) {
      throw new Error(`Bulk sync download failed with HTTP status ${response.status}`);
    }

    let entities;
    if (fileFormat === 'jsonl') {
      const stream = ndjsonStream(response.body);
      entities = await streamToEntityArray(stream);
    } else {
      const result = await response.json();
      entities = result.entities;
    }

    // Never report an advanced lastUpdated alongside a missing/malformed entity list.
    if (!Array.isArray(entities)) {
      throw new Error('Bulk sync download did not contain an entity list');
    }

    return {
      lastUpdated: lastUpdated || '',
      entityChanges: entities,
    };
  } catch (err) {
    console.error('FAST SYNC ERROR:', cache && cache.name, err); //eslint-disable-line no-console
    // if something goes wrong, set lastUpdated to the default initial value
    // so that the natural syncing process can occur
    return { lastUpdated: '', entityChanges: [] };
  }
};

/**
 * Posts a bulk sync request and waits for the notification that reports its outcome.
 *
 * The bulk endpoint path is derived from the cache's regular request URL by dropping the
 * first 'v2/' segment and replacing the first '/since' with '/bulk-since'.
 *
 * @param {object} cache Must provide `requestConstructor` and `lastUpdated`.
 * @param {Function} notifiedOperation See `sbNotifiedOperation`
 * @param {string} [fileFormat] Desired file format hint sent in the request body.
 * @returns {Promise<{url: *, lastUpdated: *, fileFormat: *}>} `preSignedUrl`, `syncValue` and
 *   `fileFormat` taken from the notification payload.
 * @throws {Error} If the notification status is 'ERROR'.
 */
async function waitForNotification(cache, notifiedOperation, fileFormat) {
  const requestObj = await cache.requestConstructor(cache.lastUpdated);
  const path = requestObj.url
    .replace('v2/', '')
    .replace('/since', '/bulk-since');

  // The same requestId goes in the request body and in the notification options.
  const requestId = uuid();
  const fetchOptions = { body: JSON.stringify({ requestId, fileFormat }) };
  const notification = await notifiedOperation(async () => fetchPostP({ path, fetchOptions }), {
    completionNotification: 'DataCollectionFinished',
    progressNotification: 'DataCollectionProgress',
    requestId,
    timeoutMs: SYNC_TIMEOUT_MS,
    progressResetsTimeout: true,
  });

  if (notification.status === 'ERROR') {
    // Always throw a real Error (never a bare string) so callers get a stack trace and can
    // rely on `instanceof Error`.
    const reason = notification.payload && notification.payload.message;
    if (reason instanceof Error) {
      throw reason;
    }
    throw new Error(reason ? String(reason) : 'Bulk sync data collection failed');
  }

  return {
    url: notification.payload.preSignedUrl,
    lastUpdated: notification.payload.syncValue,
    fileFormat: notification.payload.fileFormat,
  };
}

/**
 * Reads a stream to completion and collects every value it yields, in order.
 *
 * @param {ReadableStream} readableStream Stream to drain; a reader is acquired via `getReader()`.
 * @returns {Promise<Array>} All values read from the stream before it reported `done`.
 */
async function streamToEntityArray(readableStream) {
  const streamReader = readableStream.getReader();
  const collected = [];

  // Read ahead once, then keep pulling until the reader reports completion.
  let step = await streamReader.read();
  while (!step.done) {
    collected.push(step.value);
    step = await streamReader.read();
  }

  return collected;
}
