Skip to content

Commit

Permalink
Add a means of specifying tags for s3.upload that applies to both single part and multipart uploads (#1425)
Browse files Browse the repository at this point in the history

* Add a means of specifying tags for s3.upload that applies to both single part and multipart uploads

* Add a managed upload example with tags

* Fix test sporadic test failure for s3.createPresignedPost

* Throw an error when invalid tags are passed in rather than validating when generating header
  • Loading branch information
jeskew committed Mar 23, 2017
1 parent 1a46ee6 commit e6671c4
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/feature-S3-d7608648.json
@@ -0,0 +1,5 @@
{
"type": "feature",
"category": "S3",
  "description": "Adds a means of specifying tags to apply to objects of any size uploaded with AWS.S3.ManagedUpload"
}
24 changes: 14 additions & 10 deletions lib/s3/managed_upload.d.ts
Expand Up @@ -19,7 +19,7 @@ export class ManagedUpload {
send(callback?: (err: AWSError, data: ManagedUpload.SendData) => void): void;
/**
 * Adds a listener that is triggered when the uploader has uploaded more data.
*
*
* @param {string} event - httpUploadProgress: triggered when the uploader has uploaded more data.
* @param {function} listener - Callback to run when the uploader has uploaded more data.
*/
Expand All @@ -44,45 +44,49 @@ export namespace ManagedUpload {
/**
* URL of the uploaded object.
*/
Location: string
Location: string;
/**
* ETag of the uploaded object.
*/
ETag: string
ETag: string;
/**
* Bucket to which the object was uploaded.
*/
Bucket: string
Bucket: string;
/**
* Key to which the object was uploaded.
*/
Key: string
Key: string;
}
export interface ManagedUploadOptions {
/**
* A map of parameters to pass to the upload requests.
* The "Body" parameter is required to be specified either on the service or in the params option.
*/
params?: S3.Types.PutObjectRequest
params?: S3.Types.PutObjectRequest;
/**
* The size of the concurrent queue manager to upload parts in parallel. Set to 1 for synchronous uploading of parts. Note that the uploader will buffer at most queueSize * partSize bytes into memory at any given time.
* default: 4
*/
queueSize?: number
queueSize?: number;
/**
* Default: 5 mb
* The size in bytes for each individual part to be uploaded. Adjust the part size to ensure the number of parts does not exceed maxTotalParts. See minPartSize for the minimum allowed part size.
*/
partSize?: number
partSize?: number;
/**
* Default: false
* Whether to abort the multipart upload if an error occurs. Set to true if you want to handle failures manually.
*/
leavePartsOnError?: boolean
leavePartsOnError?: boolean;
/**
* An optional S3 service object to use for requests.
* This object might have bound parameters used by the uploader.
*/
service?: S3
service?: S3;
/**
* The tags to apply to the object.
*/
tags?: Array<S3.Tag>;
}
}
47 changes: 44 additions & 3 deletions lib/s3/managed_upload.js
Expand Up @@ -60,6 +60,8 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
* failures manually.
* @option options service [AWS.S3] an optional S3 service object to use for
* requests. This object might have bound parameters used by the uploader.
* @option options tags [Array<map>] The tags to apply to the uploaded object.
 *   Each tag should have `Key` and `Value` keys.
* @example Creating a default uploader for a stream object
* var upload = new AWS.S3.ManagedUpload({
* params: {Bucket: 'bucket', Key: 'key', Body: stream}
Expand All @@ -69,6 +71,11 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
* partSize: 10 * 1024 * 1024, queueSize: 1,
* params: {Bucket: 'bucket', Key: 'key', Body: stream}
* });
* @example Creating an uploader with tags
* var upload = new AWS.S3.ManagedUpload({
* params: {Bucket: 'bucket', Key: 'key', Body: stream},
* tags: [{Key: 'tag1', Value: 'value1'}, {Key: 'tag2', Value: 'value2'}]
* });
* @see send
*/
constructor: function ManagedUpload(options) {
Expand Down Expand Up @@ -96,6 +103,13 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
if (options.queueSize) this.queueSize = options.queueSize;
if (options.partSize) this.partSize = options.partSize;
if (options.leavePartsOnError) this.leavePartsOnError = true;
if (options.tags) {
if (!Array.isArray(options.tags)) {
throw new Error('Tags must be specified as an array; ' +
typeof options.tags + ' provided.');
}
this.tags = options.tags;
}

if (this.partSize < this.minPartSize) {
throw new Error('partSize must be greater than ' +
Expand Down Expand Up @@ -448,7 +462,11 @@ AWS.S3.ManagedUpload = AWS.util.inherit({

var partNumber = ++self.totalPartNumbers;
if (self.isDoneChunking && partNumber === 1) {
var req = self.service.putObject({Body: chunk});
var params = {Body: chunk};
if (this.tags) {
params.Tagging = this.getTaggingHeader();
}
var req = self.service.putObject(params);
req._managedUpload = self;
req.on('httpUploadProgress', self.progress).send(self.finishSinglePart);
return null;
Expand Down Expand Up @@ -487,6 +505,19 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
}
},

/**
* @api private
*/
getTaggingHeader: function getTaggingHeader() {
var kvPairStrings = [];
for (var i = 0; i < this.tags.length; i++) {
kvPairStrings.push(AWS.util.uriEscape(this.tags[i].Key) + '=' +
AWS.util.uriEscape(this.tags[i].Value));
}

return kvPairStrings.join('&');
},

/**
* @api private
*/
Expand Down Expand Up @@ -583,8 +614,18 @@ AWS.S3.ManagedUpload = AWS.util.inherit({
var self = this;
var completeParams = { MultipartUpload: { Parts: self.completeInfo.slice(1) } };
self.service.completeMultipartUpload(completeParams, function(err, data) {
if (err) return self.cleanup(err);
else self.callback(err, data);
if (err) {
return self.cleanup(err);
}

if (Array.isArray(self.tags)) {
self.service.putObjectTagging(
{Tagging: {TagSet: self.tags}},
self.callback
);
} else {
self.callback(err, data);
}
});
},

Expand Down
78 changes: 78 additions & 0 deletions test/s3/managed_upload.spec.coffee
Expand Up @@ -530,3 +530,81 @@ describe 'AWS.S3.ManagedUpload', ->
return upload.promise().then(thenFunction).catch(catchFunction).then ->
expect(data).not.to.exist
expect(err.message).to.equal('ERROR')

# Tests for the `tags` option on AWS.S3.ManagedUpload: for single-part
# uploads the tags travel as an `x-amz-tagging` header on PutObject; for
# multipart uploads a separate PutObjectTagging request follows
# completeMultipartUpload; non-array values are rejected at construction.
describe 'tagging', ->
  it 'should embed tags in PutObject request for single part uploads', (done) ->
    reqs = helpers.mockResponses [
      data: ETag: 'ETAG'
    ]

    upload = new AWS.S3.ManagedUpload(
      service: s3
      params: {Body: smallbody}
      tags: [
        {Key: 'tag1', Value: 'value1'}
        {Key: 'tag2', Value: 'value2'}
        # non-ASCII tag exercises URI-escaping of both keys and values
        {Key: 'étiquette', Value: 'valeur à être encodé'}
      ]
    )

    send {}, ->
      expect(err).not.to.exist
      # header is uriEscape(Key) + '=' + uriEscape(Value) pairs joined by '&'
      expect(reqs[0].httpRequest.headers['x-amz-tagging']).to.equal('tag1=value1&tag2=value2&%C3%A9tiquette=valeur%20%C3%A0%20%C3%AAtre%20encod%C3%A9')
      done()

  it 'should send a PutObjectTagging request following a successful multipart upload with tags', (done) ->
    reqs = helpers.mockResponses [
      { data: UploadId: 'uploadId' }
      { data: ETag: 'ETAG1' }
      { data: ETag: 'ETAG2' }
      { data: ETag: 'ETAG3' }
      { data: ETag: 'ETAG4' }
      { data: ETag: 'FINAL_ETAG', Location: 'FINAL_LOCATION' }
      {}
    ]

    upload = new AWS.S3.ManagedUpload(
      service: s3
      params: {Body: bigbody}
      tags: [
        {Key: 'tag1', Value: 'value1'}
        {Key: 'tag2', Value: 'value2'}
        {Key: 'étiquette', Value: 'valeur à être encodé'}
      ]
    )

    send {}, ->
      # tagging is issued as an extra request after completeMultipartUpload
      expect(helpers.operationsForRequests(reqs)).to.eql [
        's3.createMultipartUpload'
        's3.uploadPart'
        's3.uploadPart'
        's3.uploadPart'
        's3.uploadPart'
        's3.completeMultipartUpload'
        's3.putObjectTagging'
      ]
      expect(err).not.to.exist
      # tags are passed through unencoded as a TagSet in the request params
      expect(reqs[6].params.Tagging).to.deep.equal({
        TagSet: [
          {Key: 'tag1', Value: 'value1'}
          {Key: 'tag2', Value: 'value2'}
          {Key: 'étiquette', Value: 'valeur à être encodé'}
        ]
      })
      done()

  it 'should throw when tags are not provided as an array', (done) ->
    reqs = helpers.mockResponses [
      data: ETag: 'ETAG'
    ]

    # the constructor validates the tags option eagerly; a string must throw
    try
      upload = new AWS.S3.ManagedUpload(
        service: s3
        params: {Body: smallbody}
        tags: 'tag1=value1&tag2=value2&%C3%A9tiquette=valeur%20%C3%A0%20%C3%AAtre%20encod%C3%A9'
      )
      done(new Error('AWS.S3.ManagedUpload should have thrown when passed a string for tags'))
    catch e
      done()

32 changes: 13 additions & 19 deletions test/services/s3.spec.coffee
Expand Up @@ -97,7 +97,7 @@ describe 'AWS.S3', ->
return 'v4'
else if (signer == AWS.Signers.V2)
return 'v2'

describe 'when using presigned requests', ->
req = null

Expand Down Expand Up @@ -152,7 +152,7 @@ describe 'AWS.S3', ->
it 'user does not specify a signatureVersion and region supports v2', (done) ->
s3 = new AWS.S3({region: 'us-east-1'})
expect(getVersion(s3.getSignerClass())).to.equal('s3')
done()
done()

describe 'will return a v4 signer when', ->

Expand Down Expand Up @@ -247,21 +247,21 @@ describe 'AWS.S3', ->
describe 'with useAccelerateEndpoint and dualstack set to true', ->
beforeEach ->
s3 = new AWS.S3(useAccelerateEndpoint: true, useDualstack: true)

it 'changes the hostname to use s3-accelerate for dns-comaptible buckets', ->
req = build('getObject', {Bucket: 'foo', Key: 'bar'})
expect(req.endpoint.hostname).to.equal('foo.s3-accelerate.dualstack.amazonaws.com')

it 'overrides s3BucketEndpoint configuration when s3BucketEndpoint is set', ->
s3 = new AWS.S3(useAccelerateEndpoint: true, useDualstack: true, s3BucketEndpoint: true, endpoint: 'foo.region.amazonaws.com')
req = build('getObject', {Bucket: 'foo', Key: 'baz'})
expect(req.endpoint.hostname).to.equal('foo.s3-accelerate.dualstack.amazonaws.com')

describe 'does not use s3-accelerate.dualstack or s3-accelerate', ->
it 'on dns-incompatible buckets', ->
req = build('getObject', {Bucket: 'foo.baz', Key: 'bar'})
expect(req.endpoint.hostname).to.not.contain('s3-accelerate')

it 'on excluded operations', ->
req = build('listBuckets')
expect(req.endpoint.hostname).to.not.contain('s3-accelerate')
Expand Down Expand Up @@ -902,7 +902,7 @@ describe 'AWS.S3', ->
s3.bucketRegionCache.name = 'eu-west-1'
fn()
req = callNetworkingErrorListener()
expect(spy.calls.length).to.equal(1)
expect(spy.calls.length).to.equal(1)
expect(regionReq.httpRequest.region).to.equal('us-east-1')
expect(regionReq.httpRequest.endpoint.hostname).to.equal('name.s3.amazonaws.com')
expect(req.httpRequest.region).to.equal('eu-west-1')
Expand Down Expand Up @@ -1253,7 +1253,7 @@ describe 'AWS.S3', ->
s3 = new AWS.S3()
params = Bucket: 'name'
s3.bucketRegionCache.name = 'rg-fake-1'
helpers.mockHttpResponse 204, {}, ''
helpers.mockHttpResponse 204, {}, ''
s3.deleteBucket params, ->
expect(s3.bucketRegionCache.name).to.not.exist

Expand Down Expand Up @@ -1323,7 +1323,7 @@ describe 'AWS.S3', ->
req = s3.putObject(Bucket: 'example', Key: 'key', Body: new Stream.Stream, ContentLength: 10)
req.send (err) ->
expect(err).not.to.exist
done()
done()

it 'opens separate stream if a file object is provided (signed payload)', (done) ->
hash = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
Expand Down Expand Up @@ -1542,26 +1542,20 @@ describe 'AWS.S3', ->
done()

it 'should default to expiration in one hour', (done) ->
helpers.spyOn(AWS.util.date, 'getDate').andReturn(new Date(946684800 * 1000))
s3 = new AWS.S3()
s3.createPresignedPost {Bucket: 'bucket'}, (err, data) ->
decoded = JSON.parse(AWS.util.base64.decode(data.fields.Policy))
expiration = new Date(decoded.expiration)
validForMs = expiration.valueOf() - (new Date()).valueOf()
# allow one second of leeway
expect(validForMs).to.be.above((60 * 60 - 1) * 1000)
expect(validForMs).to.be.below((60 * 60 + 1) * 1000)
expect(decoded.expiration).to.equal('2000-01-01T01:00:00Z')
done()

it 'should allow users to provide a custom expiration', (done) ->
helpers.spyOn(AWS.util.date, 'getDate').andReturn(new Date(946684800 * 1000))
customTtl = 900
s3 = new AWS.S3()
s3.createPresignedPost {Bucket: 'bucket', Expires: customTtl}, (err, data) ->
decoded = JSON.parse(AWS.util.base64.decode(data.fields.Policy))
expiration = new Date(decoded.expiration)
validForMs = expiration.valueOf() - (new Date()).valueOf()
# allow one second of leeway
expect(validForMs).to.be.above((customTtl - 1) * 1000)
expect(validForMs).to.be.below((customTtl + 1) * 1000)
expect(decoded.expiration).to.equal('2000-01-01T00:15:00Z')
done()

it 'should include signature metadata as conditions', (done) ->
Expand Down
24 changes: 19 additions & 5 deletions ts/s3.ts
Expand Up @@ -126,11 +126,25 @@ s3.putObject({
Body: fs.createReadStream('/fake/path')
});

const upload = s3.upload({
Bucket: 'BUCKET',
Key: 'KEY',
Body: new Buffer('some data')
});
const upload = s3.upload(
{
Bucket: 'BUCKET',
Key: 'KEY',
Body: new Buffer('some data')
},
{
tags: [
{
Key: 'key',
Value: 'value',
},
{
Key: 'otherKey',
Value: 'otherValue',
},
],
}
);

// test managed upload promise support
upload.promise()
Expand Down

0 comments on commit e6671c4

Please sign in to comment.