refactored server
This commit is contained in:
parent 6d470b8eba
commit 3fd2537311
36 changed files with 2944 additions and 792 deletions
50  server/storage/fs.js  Normal file
@@ -0,0 +1,50 @@
const fs = require('fs');
const path = require('path');
const promisify = require('util').promisify;
const mkdirp = require('mkdirp');

const stat = promisify(fs.stat);

class FSStorage {
  constructor(config, log) {
    this.log = log;
    this.dir = config.file_dir;
    mkdirp.sync(this.dir);
  }

  async length(id) {
    const result = await stat(path.join(this.dir, id));
    return result.size;
  }

  getStream(id) {
    return fs.createReadStream(path.join(this.dir, id));
  }

  set(id, file) {
    return new Promise((resolve, reject) => {
      const filepath = path.join(this.dir, id);
      const fstream = fs.createWriteStream(filepath);
      file.pipe(fstream);
      file.on('limit', () => {
        file.unpipe(fstream);
        fstream.destroy(new Error('limit'));
      });
      fstream.on('error', err => {
        fs.unlinkSync(filepath);
        reject(err);
      });
      fstream.on('finish', resolve);
    });
  }

  del(id) {
    return Promise.resolve(fs.unlinkSync(path.join(this.dir, id)));
  }

  ping() {
    return Promise.resolve();
  }
}

module.exports = FSStorage;
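FSStorage keeps each upload as a plain file under config.file_dir, resolves set() when the write stream finishes, and turns the source stream's 'limit' event into an Error('limit') rejection. A minimal usage sketch, not part of the commit, with a hypothetical config object and a console logger standing in for the real ones:

const fs = require('fs');
const FSStorage = require('./server/storage/fs');

// Hypothetical wiring, for illustration only.
const storage = new FSStorage({ file_dir: '/tmp/send-files' }, console);

async function example(id) {
  // Any readable stream works here; the real caller passes the upload
  // stream, which can emit 'limit' when the file is too large.
  await storage.set(id, fs.createReadStream('./upload.bin'));
  const size = await storage.length(id);      // stat() the stored file
  storage.getStream(id).pipe(process.stdout); // stream it back out
  return size;
}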
57  server/storage/index.js  Normal file
@@ -0,0 +1,57 @@
const config = require('../config');
const Metadata = require('../metadata');
const mozlog = require('../log');
const createRedisClient = require('./redis');

class DB {
  constructor(config) {
    const Storage = config.s3_bucket ? require('./s3') : require('./fs');
    this.log = mozlog('send.storage');
    this.expireSeconds = config.expire_seconds;
    this.storage = new Storage(config, this.log);
    this.redis = createRedisClient(config);
    this.redis.on('error', err => {
      this.log.error('Redis:', err);
    });
  }

  async ttl(id) {
    const result = await this.redis.ttlAsync(id);
    return Math.ceil(result) * 1000;
  }

  length(id) {
    return this.storage.length(id);
  }

  get(id) {
    return this.storage.getStream(id);
  }

  async set(id, file, meta) {
    await this.storage.set(id, file);
    this.redis.hmset(id, meta);
    this.redis.expire(id, this.expireSeconds);
  }

  setField(id, key, value) {
    this.redis.hset(id, key, value);
  }

  del(id) {
    this.redis.del(id);
    return this.storage.del(id);
  }

  async ping() {
    await this.redis.pingAsync();
    await this.storage.ping();
  }

  async metadata(id) {
    const result = await this.redis.hgetallAsync(id);
    return result && new Metadata(result);
  }
}

module.exports = new DB(config);
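The DB singleton picks its backend at construction time (./s3 when config.s3_bucket is set, ./fs otherwise) and pairs it with Redis for metadata and expiry; note that ttl() converts Redis's remaining TTL in seconds into milliseconds. A hedged sketch of how a route handler could use it, assuming Express-style req/res objects (not part of the commit):

const storage = require('./server/storage');

// Hypothetical download handler, for illustration only.
async function download(req, res) {
  const id = req.params.id;
  const meta = await storage.metadata(id);   // hgetall wrapped in Metadata, or null
  if (!meta) {
    return res.sendStatus(404);
  }
  res.set('Content-Length', await storage.length(id));
  storage.get(id).pipe(res);                 // stream from FS or S3 to the client
}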
21  server/storage/redis.js  Normal file
@@ -0,0 +1,21 @@
const promisify = require('util').promisify;

module.exports = function(config) {
  const redis_lib =
    config.env === 'development' && config.redis_host === 'localhost'
      ? 'redis-mock'
      : 'redis';

  //eslint-disable-next-line security/detect-non-literal-require
  const redis = require(redis_lib);
  const client = redis.createClient({
    host: config.redis_host,
    connect_timeout: 10000
  });

  client.ttlAsync = promisify(client.ttl);
  client.hgetallAsync = promisify(client.hgetall);
  client.hgetAsync = promisify(client.hget);
  client.pingAsync = promisify(client.ping);
  return client;
};
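The factory swaps in redis-mock for local development against localhost, so no Redis server is needed there, and it promisifies only the commands the DB layer awaits (ttl, hgetall, hget, ping); write commands such as hmset and expire keep the callback API and are fired without awaiting. A small sketch of the client in use, with a hypothetical config:

const createRedisClient = require('./server/storage/redis');

const client = createRedisClient({
  env: 'development',
  redis_host: 'localhost'   // this combination selects redis-mock
});

async function example(id) {
  client.hset(id, 'owner', 'abc123');          // write commands stay callback-based
  const meta = await client.hgetallAsync(id);  // { owner: 'abc123' }
  const ttl = await client.ttlAsync(id);       // remaining lifetime in seconds
  return { meta, ttl };
}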
51  server/storage/s3.js  Normal file
@@ -0,0 +1,51 @@
const AWS = require('aws-sdk');
const s3 = new AWS.S3();

class S3Storage {
  constructor(config, log) {
    this.bucket = config.s3_bucket;
    this.log = log;
  }

  async length(id) {
    const result = await s3
      .headObject({ Bucket: this.bucket, Key: id })
      .promise();
    return result.ContentLength;
  }

  getStream(id) {
    return s3.getObject({ Bucket: this.bucket, Key: id }).createReadStream();
  }

  async set(id, file) {
    let hitLimit = false;
    const upload = s3.upload({
      Bucket: this.bucket,
      Key: id,
      Body: file
    });
    file.on('limit', () => {
      hitLimit = true;
      upload.abort();
    });
    try {
      await upload.promise();
    } catch (e) {
      if (hitLimit) {
        throw new Error('limit');
      }
      throw e;
    }
  }

  del(id) {
    return s3.deleteObject({ Bucket: this.bucket, Key: id }).promise();
  }

  ping() {
    return s3.headBucket({ Bucket: this.bucket }).promise();
  }
}

module.exports = S3Storage;
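S3Storage mirrors the FS backend's interface against a bucket: length() reads ContentLength from headObject, getStream() streams getObject, and set() aborts the managed upload when the source stream emits 'limit', re-throwing the resulting rejection as the same Error('limit') the FS backend produces, so callers can treat both backends uniformly. A hedged sketch of a caller handling the limit case, assuming a hypothetical bucket name:

const S3Storage = require('./server/storage/s3');

// Hypothetical config and logger, for illustration only.
const storage = new S3Storage({ s3_bucket: 'my-send-bucket' }, console);

async function store(id, fileStream) {
  try {
    await storage.set(id, fileStream);   // managed upload via s3.upload()
    return true;
  } catch (err) {
    if (err.message === 'limit') {
      return false;                      // the source stream hit its size limit
    }
    throw err;
  }
}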