From 272560311735607843297284c51f8feb7481067a Mon Sep 17 00:00:00 2001 From: csg01123119 Date: Tue, 11 Jul 2023 17:57:30 +0800 Subject: [PATCH] fix: during the sharding upload process, a request id is thrown during abort --- lib/browser/managed-upload.js | 14 ++- lib/common/multipart-copy.js | 11 +- lib/common/parallel.js | 5 +- lib/managed-upload.js | 4 +- test/browser/browser.test.js | 9 +- test/node/multipart.test.js | 26 +++-- test/node/multiversion.test.js | 187 ++++++++++++++++++++++----------- test/node/object.test.js | 1 - test/node/rtmp.test.js | 36 ++++--- 9 files changed, 189 insertions(+), 104 deletions(-) diff --git a/lib/browser/managed-upload.js b/lib/browser/managed-upload.js index 07974d4f3..5adf90546 100644 --- a/lib/browser/managed-upload.js +++ b/lib/browser/managed-upload.js @@ -143,8 +143,8 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) { try { result = await self._uploadPart(name, uploadId, partNo, data, options); } catch (error) { - if (error.status === 404) { - throw self._makeAbortEvent(); + if (error.status === 404 && self.isCancel()) { + throw self._makeAbortEvent(error); } throw error; } @@ -187,10 +187,8 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) { const parallel = options.parallel || defaultParallel; // upload in parallel - const jobErr = await this._parallel( - todo, - parallel, - value => new Promise((resolve, reject) => { + const jobErr = await this._parallel(todo, parallel, value => { + return new Promise((resolve, reject) => { uploadPartJob(that, value) .then(result => { if (result) { @@ -201,8 +199,8 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) { .catch(err => { reject(err); }); - }) - ); + }); + }); multipartFinish = true; const abortEvent = jobErr.find(err => err.name === 'abort'); diff --git a/lib/common/multipart-copy.js b/lib/common/multipart-copy.js index 6dc790941..94ebfec62 100644 --- a/lib/common/multipart-copy.js +++ b/lib/common/multipart-copy.js @@ -5,7 +5,6 @@ const copy = require('copy-to'); const proto = exports; - /** * Upload a part copy in a multipart from the source bucket/object * used with initMultipartUpload and completeMultipartUpload. @@ -120,9 +119,7 @@ proto._resumeMultipartCopy = async function _resumeMultipartCopy(checkpoint, sou const metaOpt = { versionId }; - const { - copySize, partSize, uploadId, doneParts, name - } = checkpoint; + const { copySize, partSize, uploadId, doneParts, name } = checkpoint; const partOffs = this._divideMultipartCopyParts(copySize, partSize, sourceData.startOffset); const numParts = partOffs.length; @@ -149,8 +146,8 @@ proto._resumeMultipartCopy = async function _resumeMultipartCopy(checkpoint, sou try { result = await self.uploadPartCopy(name, uploadId, partNo, range, source, uploadPartCopyOptions); } catch (error) { - if (error.status === 404) { - throw self._makeAbortEvent(); + if (error.status === 404 && self.isCancel()) { + throw self._makeAbortEvent(error); } throw error; } @@ -216,7 +213,7 @@ proto._divideMultipartCopyParts = function _divideMultipartCopyParts(fileSize, p const partOffs = []; for (let i = 0; i < numParts; i++) { - const start = (partSize * i) + startOffset; + const start = partSize * i + startOffset; const end = Math.min(start + partSize, fileSize + startOffset); partOffs.push({ diff --git a/lib/common/parallel.js b/lib/common/parallel.js index 09ca156f9..c0ba9b2b4 100644 --- a/lib/common/parallel.js +++ b/lib/common/parallel.js @@ -167,11 +167,12 @@ proto._makeCancelEvent = function _makeCancelEvent() { }; // abort is not error , so create an object -proto._makeAbortEvent = function _makeAbortEvent() { +proto._makeAbortEvent = function _makeAbortEvent(error) { const abortEvent = { status: 0, name: 'abort', - message: 'upload task has been abort' + message: 'upload task has been abort', + requestId: error ? error.requestId : undefined }; return abortEvent; }; diff --git a/lib/managed-upload.js b/lib/managed-upload.js index c1a060812..4442b4c7c 100644 --- a/lib/managed-upload.js +++ b/lib/managed-upload.js @@ -152,8 +152,8 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) { result = await self._uploadPart(name, uploadId, partNo, data, options); } catch (error) { removeStreamFromMultipartUploadStreams(); - if (error.status === 404) { - throw self._makeAbortEvent(); + if (error.status === 404 && self.isCancel()) { + throw self._makeAbortEvent(error); } throw error; } diff --git a/test/browser/browser.test.js b/test/browser/browser.test.js index c4e53d83d..d9c1d3ab8 100644 --- a/test/browser/browser.test.js +++ b/test/browser/browser.test.js @@ -1792,9 +1792,13 @@ describe('browser', () => { const file = new File([fileContent], 'multipart-upload-file'); const name = `${prefix}multipart/upload-file`; const uploadPart = store._uploadPart; + const requestId = 'KDJSJJSHDEEEEEEWWW'; + store._uploadPart = () => { + store._stop(); const e = new Error('TEST Not Found'); e.status = 404; + e.requestId = requestId; throw e; }; let netErrs; @@ -1803,8 +1807,11 @@ describe('browser', () => { } catch (err) { netErrs = err; } + store.resetCancelFlag(); + assert.strictEqual(netErrs.status, 0); assert.strictEqual(netErrs.name, 'abort'); + assert.strictEqual(netErrs.requestId, requestId); store._uploadPart = uploadPart; }); }); @@ -2430,7 +2437,7 @@ describe('browser', () => { .fill('a') .join(''); const fileName = new File([fileContent], 'multipart-upload-kms'); - const objectKey = 'multipart-upload-file-set-header-browser-test'; + const objectKey = `multipart-upload-file-set-header-browser-test-${Date.now()}`; const req = store.urllib.request; let header; mm(store.urllib, 'request', (url, args) => { diff --git a/test/node/multipart.test.js b/test/node/multipart.test.js index 153c5df13..37f579408 100644 --- a/test/node/multipart.test.js +++ b/test/node/multipart.test.js @@ -641,21 +641,29 @@ describe('test/multipart.test.js', () => { it('should request throw abort event', async () => { const fileName = await utils.createTempFile('multipart-upload-file', 1024 * 1024); // 1m const name = `${prefix}multipart/upload-file`; - const stubNetError = sinon.stub(store, '_uploadPart'); - const netErr = new Error('Not Found'); - netErr.status = 404; - netErr.code = 'Not Found'; - netErr.name = 'Not Found'; - stubNetError.throws(netErr); + const requestId = 'KDJSJJSHDEEEEEEWWW'; + mm(store, '_uploadPart', () => { + store._stop(); + const netErr = new Error('Not Found'); + netErr.status = 404; + netErr.code = 'Not Found'; + netErr.name = 'Not Found'; + netErr.requestId = requestId; + throw netErr; + }); + let netErrs; try { await store.multipartUpload(name, fileName); } catch (err) { netErrs = err; } + store.resetCancelFlag(); + mm.restore(); + assert.strictEqual(netErrs.status, 0); assert.strictEqual(netErrs.name, 'abort'); - store._uploadPart.restore(); + assert.strictEqual(netErrs.requestId, requestId); }); }); @@ -973,8 +981,8 @@ describe('test/multipart.test.js', () => { afterEach(mm.restore); it('Test whether the speed limit setting for sharded upload is effective', async () => { - const file = await utils.createTempFile('multipart-upload-file-set-header', 101 * 1024); - const objectKey = `${prefix}multipart/upload-file-set-header`; + const file = await utils.createTempFile(`multipart-upload-file-set-header-${Date.now()}`, 101 * 1024); + const objectKey = `${prefix}multipart/upload-file-set-header-${Date.now()}`; const req = store.urllib.request; let header; mm(store.urllib, 'request', (url, args) => { diff --git a/test/node/multiversion.test.js b/test/node/multiversion.test.js index 272d7e485..8f2869bc9 100644 --- a/test/node/multiversion.test.js +++ b/test/node/multiversion.test.js @@ -3,6 +3,7 @@ const utils = require('./utils'); const oss = require('../..'); const config = require('../config').oss; const fs = require('fs'); +const mm = require('mm'); const ms = require('humanize-ms'); const { metaSyncTime } = require('../config'); @@ -92,7 +93,12 @@ describe('test/multiversion.test.js', () => { } }); it('should getBucketVersions with delimiter', async () => { - const names = ['getBucketVersions/delimiter1.js', 'getBucketVersions/delimiter2.js', 'getBucketVersions/delimiter3.js', 'others.js']; + const names = [ + 'getBucketVersions/delimiter1.js', + 'getBucketVersions/delimiter2.js', + 'getBucketVersions/delimiter3.js', + 'others.js' + ]; await Promise.all(names.map(_name => store.put(_name, __filename))); try { const result = await store.getBucketVersions({ @@ -107,36 +113,44 @@ describe('test/multiversion.test.js', () => { describe('putBucketLifecycle() getBucketLifecycle()', async () => { it('should putBucketLifecycle with NoncurrentVersionExpiration', async () => { - const putresult1 = await store.putBucketLifecycle(bucket, [{ - id: 'expiration1', - prefix: 'logs/', - status: 'Enabled', - expiration: { - days: 1 - }, - noncurrentVersionExpiration: { - noncurrentDays: 1 + const putresult1 = await store.putBucketLifecycle( + bucket, + [ + { + id: 'expiration1', + prefix: 'logs/', + status: 'Enabled', + expiration: { + days: 1 + }, + noncurrentVersionExpiration: { + noncurrentDays: 1 + } + } + ], + { + timeout: 120000 } - }], { - timeout: 120000 - }); + ); await utils.sleep(ms(metaSyncTime)); assert.strictEqual(putresult1.res.status, 200); const { rules } = await store.getBucketLifecycle(bucket); assert.strictEqual(rules[0].noncurrentVersionExpiration.noncurrentDays, '1'); }); it('should putBucketLifecycle with expiredObjectDeleteMarker', async () => { - const putresult1 = await store.putBucketLifecycle(bucket, [{ - id: 'expiration1', - prefix: 'logs/', - status: 'Enabled', - expiration: { - expiredObjectDeleteMarker: 'true' - }, - NoncurrentVersionExpiration: { - noncurrentDays: 1 + const putresult1 = await store.putBucketLifecycle(bucket, [ + { + id: 'expiration1', + prefix: 'logs/', + status: 'Enabled', + expiration: { + expiredObjectDeleteMarker: 'true' + }, + NoncurrentVersionExpiration: { + noncurrentDays: 1 + } } - }]); + ]); assert.equal(putresult1.res.status, 200); const { rules } = await store.getBucketLifecycle(bucket); assert.strictEqual(rules[0].expiration.expiredObjectDeleteMarker, 'true'); @@ -256,17 +270,60 @@ describe('test/multiversion.test.js', () => { store.delete(objectKey); const copyName = `${prefix}multipart-copy-target.js`; try { - const result = await store.multipartUploadCopy(copyName, { - sourceKey: objectKey, - sourceBucketName: bucket - }, { - versionId - }); + const result = await store.multipartUploadCopy( + copyName, + { + sourceKey: objectKey, + sourceBucketName: bucket + }, + { + versionId + } + ); assert.strictEqual(result.res.status, 200); } catch (error) { assert(false); } }); + + it('should request throw abort event', async () => { + const file = await utils.createTempFile(`multipart-upload-file-abort-${Date.now()}`, 102410); + const objectKey = `${prefix}multipart-copy-source-abort.js`; + const { res: sourceRes } = await store.multipartUpload(objectKey, file); + const versionId = sourceRes.headers['x-oss-version-id']; + store.delete(objectKey); + const copyName = `${prefix}multipart-copy-target-abort.js`; + const requestId = 'KDJSJJSHDEEEEEEWWW'; + mm(store, 'uploadPartCopy', () => { + store._stop(); + const netErr = new Error('Not Found'); + netErr.status = 404; + netErr.requestId = requestId; + throw netErr; + }); + + let netErrs; + try { + await store.multipartUploadCopy( + copyName, + { + sourceKey: objectKey, + sourceBucketName: bucket + }, + { + versionId + } + ); + } catch (error) { + netErrs = error; + } + store.resetCancelFlag(); + mm.restore(); + + assert.strictEqual(netErrs.status, 0); + assert.strictEqual(netErrs.name, 'abort'); + assert.strictEqual(netErrs.requestId, requestId); + }); }); describe('deleteMulti()', () => { @@ -275,7 +332,7 @@ describe('test/multiversion.test.js', () => { before(async () => { await store.putBucketVersioning(bucket, enabled); let result; - const _createHistoryObject = async (i) => { + const _createHistoryObject = async i => { name = name.replace('file', `file${i}`); result = await store.put(name, __filename); await store.delete(name); @@ -284,7 +341,11 @@ describe('test/multiversion.test.js', () => { versionId: result.res.headers['x-oss-version-id'] }); }; - await Promise.all(Array(3).fill(1).map((_, i) => _createHistoryObject(i))); + await Promise.all( + Array(3) + .fill(1) + .map((_, i) => _createHistoryObject(i)) + ); }); it('should deleteMulti', async () => { @@ -565,7 +626,7 @@ describe('test/multiversion.test.js', () => { describe('getBucketInfo()', () => { it('should return bucket Versioning', async () => { try { - await store.putBucketVersioning(bucket, enabled,); + await store.putBucketVersioning(bucket, enabled); const result = await store.getBucketInfo(bucket); assert.equal(result.res.status, 200); assert.equal(result.bucket.Versioning, enabled); @@ -693,7 +754,13 @@ describe('test/multiversion.test.js', () => { result = await store.deleteMulti(names); const markerVersionId = result.deleted.map(v => v.DeleteMarkerVersionId); assert.strictEqual(result.res.status, 200); - assert.strictEqual(result.deleted.map(v => v.Key).sort().toString(), names.sort().toString()); + assert.strictEqual( + result.deleted + .map(v => v.Key) + .sort() + .toString(), + names.sort().toString() + ); assert.strictEqual(result.deleted.filter(v => v.DeleteMarker).length, result.deleted.length); // 指定版本 批量删除历史版本文件,永久删除 @@ -703,7 +770,13 @@ describe('test/multiversion.test.js', () => { })); result = await store.deleteMulti(delNameObjArr); assert.strictEqual(result.res.status, 200); - assert.strictEqual(result.deleted.map(v => v.Key).sort().toString(), names.sort().toString()); + assert.strictEqual( + result.deleted + .map(v => v.Key) + .sort() + .toString(), + names.sort().toString() + ); // 指定版本 批量删除标记 const delNameMarkerArr = names.map((_, index) => ({ @@ -712,7 +785,13 @@ describe('test/multiversion.test.js', () => { })); result = await store.deleteMulti(delNameMarkerArr); assert.strictEqual(result.res.status, 200); - assert.strictEqual(result.deleted.map(v => v.Key).sort().toString(), names.sort().toString()); + assert.strictEqual( + result.deleted + .map(v => v.Key) + .sort() + .toString(), + names.sort().toString() + ); assert.strictEqual(result.deleted.filter(v => v.DeleteMarker).length, result.deleted.length); } catch (error) { assert(false, error.message); @@ -726,10 +805,7 @@ describe('test/multiversion.test.js', () => { let versionId; before(async () => { await store.putBucketVersioning(bucket, enabled); - fileName = await utils.createTempFile( - 'multipart-upload-file-copy', - 2 * 1024 * 1024 - ); + fileName = await utils.createTempFile('multipart-upload-file-copy', 2 * 1024 * 1024); sourceName = `${prefix}multipart/upload-file-with-copy`; const res = await store.multipartUpload(sourceName, fileName); // versionId @@ -744,13 +820,9 @@ describe('test/multiversion.test.js', () => { sourceKey: sourceName, sourceBucketName: bucket }; - const objectMeta = await store._getObjectMeta( - sourceData.sourceBucketName, - sourceData.sourceKey, - { - versionId - } - ); + const objectMeta = await store._getObjectMeta(sourceData.sourceBucketName, sourceData.sourceKey, { + versionId + }); const fileSize = objectMeta.res.headers['content-length']; const result = await store.initMultipartUpload(copyName); @@ -758,31 +830,24 @@ describe('test/multiversion.test.js', () => { const partSize = 100 * 1024; // 100kb const dones = []; - const uploadFn = async (i) => { + const uploadFn = async i => { const start = partSize * (i - 1); const end = Math.min(start + partSize, fileSize); const range = `${start}-${end - 1}`; - const part = await store.uploadPartCopy( - copyName, - result.uploadId, - i, - range, - sourceData, - { versionId } - ); + const part = await store.uploadPartCopy(copyName, result.uploadId, i, range, sourceData, { versionId }); dones.push({ number: i, etag: part.res.headers.etag }); }; - await Promise.all(Array(10).fill(1).map((v, i) => uploadFn(i + 1))); - - const complete = await store.completeMultipartUpload( - copyName, - result.uploadId, - dones + await Promise.all( + Array(10) + .fill(1) + .map((v, i) => uploadFn(i + 1)) ); + const complete = await store.completeMultipartUpload(copyName, result.uploadId, dones); + assert.equal(complete.res.status, 200); }); }); diff --git a/test/node/object.test.js b/test/node/object.test.js index 31a2a4fc7..da54a5c77 100644 --- a/test/node/object.test.js +++ b/test/node/object.test.js @@ -1313,7 +1313,6 @@ describe('test/object.test.js', () => { await store.getStream(`${name}not-exists`); throw new Error('should not run this'); } catch (err) { - console.log('error is', err); assert.equal(err.name, 'NoSuchKeyError'); assert(Object.keys(store.agent.freeSockets).length === 0); await utils.sleep(ms(metaSyncTime)); diff --git a/test/node/rtmp.test.js b/test/node/rtmp.test.js index b45dc290f..1d5ae66dc 100644 --- a/test/node/rtmp.test.js +++ b/test/node/rtmp.test.js @@ -70,11 +70,14 @@ describe('test/rtmp.test.js', () => { result = await store.deleteChannel(tempCid); assert.equal(result.res.status, 204); - await utils.throws(async () => { - await store.getChannel(tempCid); - }, (err) => { - assert.equal(err.status, 404); - }); + await utils.throws( + async () => { + await store.getChannel(tempCid); + }, + err => { + assert.equal(err.status, 404); + } + ); }); }); @@ -113,14 +116,22 @@ describe('test/rtmp.test.js', () => { before(async () => { channelNum = 10; channelPrefix = 'channel-list-'; - await Promise.all(Array(channelNum).fill(1).map((_, i) => { - conf.Description = i; - return store.putChannel(channelPrefix + i, conf); - })); + await Promise.all( + Array(channelNum) + .fill(1) + .map((_, i) => { + conf.Description = i; + return store.putChannel(channelPrefix + i, conf); + }) + ); }); after(async () => { - await Promise.all(Array(channelNum).fill(1).map((_, i) => store.deleteChannel(channelPrefix + i))); + await Promise.all( + Array(channelNum) + .fill(1) + .map((_, i) => store.deleteChannel(channelPrefix + i)) + ); }); it('list channels using prefix/marker/max-keys', async () => { @@ -177,13 +188,12 @@ describe('test/rtmp.test.js', () => { createVodConf.Description = 'this is live channel 4'; const result = await store.putChannel(createVodCid, createVodConf); assert.equal(result.res.status, 200); - const url = store.getRtmpUrl(createVodCid, { + store.getRtmpUrl(createVodCid, { params: { playlistName: 'vod.m3u8' }, expires: 3600 }); - console.log(url); }); after(async () => { @@ -203,7 +213,7 @@ describe('test/rtmp.test.js', () => { assert.equal(result.res.status, 200); } catch (err) { - console.error(err); + assert.fail(err); } }); });