Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,19 @@ export const sanitizeSQL = sql => {
};

const _formatSQL = sql => sql.toLowerCase().trim();

export const formatRows = async (format, rows) => {
let formattedRows;
switch (format) {
case 'csv':
formattedRows = await rowsToCSV(rows);
break;
case 'ndjson':
formattedRows = rowsToNDJSON(rows);
break;
case 'json':
formattedRows = JSON.stringify(rows);
break;
}
return formattedRows;
};
56 changes: 38 additions & 18 deletions src/modules/datasources.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getLogger } from '../lib/logger';
const logger = getLogger('datasources-module');
import { fetch } from '../lib/http';
import Exceptions from '../lib/exceptions';
import { rowsToCSV, rowsToNDJSON } from '../lib/utils';
import { formatRows } from '../lib/utils';
import FormData from 'form-data';
import fs from 'fs';

Expand Down Expand Up @@ -166,23 +166,12 @@ export default {
* @function appendRows
* @param { string } name Datasource name
* @param { object } rows Rows to append
* @param { string } [format=csv] Datasource format, one of: json, csv, ndjson, parquet.
* @param { string } [format=csv] Rows format, one of: json, csv, ndjson, parquet.
* @return { Promise<boolean> } Result as boolean
*/
appendRows: async (name, rows, format='csv') => {
try {
let formattedRows;
switch(format) {
case 'csv':
formattedRows = await rowsToCSV(rows);
break;
case 'ndjson':
formattedRows = rowsToNDJSON(rows);
break;
case 'json':
formattedRows = JSON.stringify(rows);
break;
}
let formattedRows = await formatRows(format, rows);

// Add delimiter when format is csv
if (format === 'csv') {
Expand All @@ -208,16 +197,47 @@ export default {


/**
* Replace datasource with these rows, deleting everything else as an idempotent operation
* Replace datasource with these rows, deleting everything else as an idempotent operation or optionally those rows matching a replace condition
*
* @function replaceWithRows
* @param { string } name Datasource name
* @param { object } rows Rows to replace with
* @param { string } [format=csv] Rows format, one of: json, csv, ndjson, parquet.
* @param { string } [replaceCondition] When used in combination with the replace mode it allows you to replace a portion of your Data Source that matches the replace_condition SQL statement
* @return { Promise<boolean> } Result as boolean
*/
replaceWithRows: () => {
// TODO: implement
throw new Error(Exceptions.METHOD_NOT_IMPLEMENTED);
replaceWithRows: async (name, rows, format='csv', replaceCondition) => {
try {
let formattedRows = await formatRows(format, rows);
let replaceStatement = '';

// Add delimiter when format is csv
if (format === 'csv') {
format += '&dialect_delimiter=,';
}

// Add replace condition if exists
if (replaceCondition) {
replaceStatement = `&replace_condition=(${replaceCondition})`;
}

const result = await fetch(`/v0/datasources?name=${name}&format=${format}&mode=replace${replaceStatement}`, {
method: 'POST',
headers: {
'Content-Type': 'text/csv;charset=utf-8',
},
body: formattedRows
});

logger.debug(`Datasource ${name} replaced with rows: ${rows.length}`);
return result['error'] === false;
} catch (error) {
logger.error(`Error while replacing rows to datasource ${name}`);
logger.debug(`Format: ${format}`);
logger.debug(`Replace condition: ${replaceCondition}`);
logger.debug('Request: /v0/datasources/(.+)');
logger.debug(error);
}
},

/**
Expand Down
43 changes: 41 additions & 2 deletions test/datasources-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,47 @@ describe('Test Datasources API', () => {

expect(characters.length).to.eq(0);
} catch (error) {
console.log(error)
expect(error).to.be.null;
}
});

it('should replace datasource with rows', async () => {
try {
const rows = [
{ name: 'Leia', profession: 'Princess', age: 32 },
{ name: 'Anakin', profession: 'Jedi', age: 50 },
{ name: 'Obi', profession: 'Jedi', age: 65 }
];

// Avoid API throttling (429)
await new Promise(resolve => setTimeout(resolve, 30000));
await tb.replaceWithRows(datasourceName, rows);

const result = await tb.query(`select * from ${datasourceName}`);
const characters = result['data'];

expect(characters.length).to.eq(3);
} catch (error) {
expect(error).to.be.null;
}
});

it('should replace datasource with rows matching a condition', async () => {
try {
const rows = [
{ name: 'Mace Windu', profession: 'Jedi', age: 50 }
];

// Avoid API throttling (429)
await new Promise(resolve => setTimeout(resolve, 30000));
await tb.replaceWithRows(datasourceName, rows, 'csv', 'profession = \'Jedi\'');

const result = await tb.query(`select * from ${datasourceName} where profession = 'Jedi'`);
const characters = result['data'];

expect(characters.length).to.eq(1);
expect(characters[0]['name']).to.eq('Mace Windu');
} catch (error) {
expect(error).to.be.null;
}
});
Expand All @@ -179,7 +219,6 @@ describe('Test Datasources API', () => {

expect(characters.length).to.eq(0);
} catch (error) {
console.log(error)
expect(error).to.be.null;
}
});
Expand Down