Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/LogCollector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import "core-js";
import config from "./config.js";


export default class LogCollector {

/**
* @returns {LogCollector}
*/
constructor(bucket, key) {
this.bucket = bucket;
this.key = key;
return this;
}

/**
* Insert data async
*
* @param parser Parser
* @returns {Promise}
*/
insertAsync(parser) {
return parser.parseAsync();
}
}
17 changes: 17 additions & 0 deletions src/aws/Elasticsearch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import "core-js";
import AWS from "./index.js";


export default class Elasticsearch extends LogCollector {
constructor(bucket, key) {
super(bucket, key);
}

insertAsync(parser) {
let self = this;
return parser.parseAsync().then(parsedParser => {
let jsonString = parsedParser.toJSON();

});
}
}
70 changes: 32 additions & 38 deletions src/gcp/BigQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@ import "core-js";
import Google from "googleapis";
import GcpOAuth2 from "./OAuth2.js";
import config from "../config.js";
import LogCollector from "../LogCollector.js";


export default class BigQuery {
constructor(tableId=config.gcp.bigQuery.tableId, datasetId=config.gcp.bigQuery.datasetId, projectId=config.gcp.projectId) {
export default class BigQuery extends LogCollector {

/**
* @param bucket
* @param key
* @param tableId
* @param datasetId
* @param projectId
*/
constructor(bucket, key,
tableId=config.gcp.bigQuery.tableId,
datasetId=config.gcp.bigQuery.datasetId,
projectId=config.gcp.projectId) {
super(bucket, key);
this.auth = new GcpOAuth2().auth;
this.tableId = tableId;
this.datasetId = datasetId;
Expand All @@ -14,55 +27,36 @@ export default class BigQuery {
version: "v2",
auth: this.auth
});

if (config.gcp.bigQuery.decideTableIdFromBucketName) {
this.tableId = bucket.replace(/\./g, "_");
}
}

insert(jsonObject, callback=()=>{}) {
console.log(jsonObject);
/**
* Insert json string given to BigQuery.
*
* @param parser Parser
* @returns {Promise}
*/
insertAsync(parser) {
let self = this;
this.bigquery.tabledata.insertAll({
"projectId": self.projectId,
"datasetId": self.datasetId,
"tableId": self.tableId,
"resource": {
"kind": "bigquery#tableDataInsertAllRequest",
"rows": [
{
"json": jsonObject
}
]
}
}, (error, result) => {
if (error) {
return console.error(error);
}
console.log(result);
callback(result);
});
}
return parser.parseAsync().then(parsedParser => {
let jsonString = parsedParser.toJSON();

insertAsync(jsonObject) {
return new Promise((resolve, reject) => {
console.log(`[insert] BigQuery`);
console.log(jsonObject);
let self = this;
this.bigquery.tabledata.insertAll({
return new Promise((resolve, reject) => self.bigquery.tabledata.insertAll({
"projectId": self.projectId,
"datasetId": self.datasetId,
"tableId": self.tableId,
"tableId": self.tableId,
"resource": {
"kind": "bigquery#tableDataInsertAllRequest",
"rows": [
{
"json": jsonObject
"json": jsonString
}
]
}
}, (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
})
}, (error, result) => error ? reject(error) : resolve(result)));
});
}
}
17 changes: 5 additions & 12 deletions src/main.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import "core-js";
import Parser from "./parser.js";
import Parser from "./Parser.js";
import BigQuery from "./gcp/BigQuery.js";
import S3 from "./aws/S3.js";

Expand All @@ -10,19 +10,12 @@ export function handler(event, context) {
let bucket = event.Records[0].s3.bucket.name;
let key = event.Records[0].s3.object.key;

let [s3, bq] = [new S3(), new BigQuery()];
let [s3, bq] = [new S3(), new BigQuery(bucket, key)];

if (config.gcp.bigQuery.decideTableIdFromBucketName) {
bq.tableId = bucket.replace(/\./g, "_");
}

s3.get(key, bucket).then(data => {
return Promise.all(data.toString().split(/\r\n|\r|\n/).slice(0, -1).map(line => {
console.log("splitted!", line);
return new Parser(line).parseAsync().then(data =>
bq.insertAsync(data));
}))
})
s3.get(key, bucket).then(data =>
Promise.all(data.toString().split(/\r\n|\r|\n/).slice(0, -1).map(line =>
bq.insertAsync(new Parser(line)))))
.catch(error =>
context.fail(error))
.then(msg =>
Expand Down
35 changes: 32 additions & 3 deletions src/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ export default class Parser {
this.line = line;
}

_object(parts) {
/**
* Mapping parts
*
* @param parts
* @returns {{}}
* @private
*/
_mapping(parts) {
let data = {};
data.BucketOwner = parts[0];
data.Bucket = parts[1];
Expand All @@ -32,20 +39,37 @@ export default class Parser {
return data;
}

/**
* Set Time to unixtime
*
* @param data
* @returns {*}
* @private
*/
_format(data) {

data.Time = Moment(data.Time, "DD/MMM/YYYY:HH:mm:ss Z").utc().unix();

return data;
}

/**
* Parse async
*
* @returns {Promise}, Parser(self)
*/
parseAsync() {
let self = this;
return new Promise(resolve => {
resolve(self.parse())
});
}

/**
* Parse and return self
*
* @returns {Parser}
*/
parse() {
let parts = [];
let restString = this.line;
Expand All @@ -72,10 +96,15 @@ export default class Parser {
}
}

this.data = this._format(this._object(parts));
return this.data;
this.data = this._format(this._mapping(parts));
return this;
}

/**
* Get json string
*
* @returns string
*/
toJSON() {
return JSON.stringify(this.data);
}
Expand Down