[Azure Storage] Appending content to a blob with the AppendBlobClient object
Problem description
Solution
A look through the Java Storage SDK shows that AppendBlobClient can be used for this. A BlobClient exposes it through getAppendBlobClient:

    /**
     * Creates a new {@link AppendBlobClient} associated with this blob.
     *
     * @return A {@link AppendBlobClient} associated with this blob.
     */
    public AppendBlobClient getAppendBlobClient() {
        return new SpecializedBlobClientBuilder()
            .blobClient(this)
            .buildAppendBlobClient();
    }

The AppendBlobClient class offers several methods for appending, including appendBlock and appendBlockWithResponse. Their definitions in the SDK source are:

    /**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlock#InputStream-long}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @return The information of the append blob operation.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public AppendBlobItem appendBlock(InputStream data, long length) {
        return appendBlockWithResponse(data, length, null, null, null, Context.NONE).getValue();
    }

    /**
     * Commits a new block of data to the end of the existing append blob.
     * <p>
     * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
     * {@code Flux} must produce the same data each time it is subscribed to.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse#InputStream-long-byte-AppendBlobRequestConditions-Duration-Context}
     *
     * @param data The data to write to the blob. The data must be markable. This is in order to support retries. If
     * the data is not markable, consider using {@link #getBlobOutputStream()} and writing to the returned OutputStream.
     * Alternatively, consider wrapping your data source in a {@link java.io.BufferedInputStream} to add mark support.
     * @param length The exact length of the data. It is important that this value match precisely the length of the
     * data emitted by the {@code Flux}.
     * @param contentMd5 An MD5 hash of the block content. This hash is used to verify the integrity of the block during
     * transport. When this header is specified, the storage service compares the hash of the content that has arrived
     * with this header value. Note that this MD5 hash is not stored with the blob. If the two hashes do not match, the
     * operation will fail.
     * @param appendBlobRequestConditions {@link AppendBlobRequestConditions}
     * @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
     * @param context Additional context that is passed through the Http pipeline during the service call.
     * @return A {@link Response} whose {@link Response#getValue() value} contains the append blob operation.
     * @throws UnexpectedLengthException when the length of data does not match the input {@code length}.
     * @throws NullPointerException if the input data is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Response<AppendBlobItem> appendBlockWithResponse(InputStream data, long length, byte[] contentMd5,
        AppendBlobRequestConditions appendBlobRequestConditions, Duration timeout, Context context) {
        Objects.requireNonNull(data, "'data' cannot be null.");
        Flux<ByteBuffer> fbb = Utility.convertStreamToByteBuffer(data, length, MAX_APPEND_BLOCK_BYTES, true);

        Mono<Response<AppendBlobItem>> response = appendBlobAsyncClient.appendBlockWithResponse(
            fbb.subscribeOn(Schedulers.elastic()), length, contentMd5, appendBlobRequestConditions, context);
        return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
    }

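The Javadoc above asks for three things to agree: a markable stream, the exact byte length, and (optionally) an MD5 digest of the same bytes. A minimal sketch of preparing them together, using only the JDK, might look like the following. `AppendPayload` is a hypothetical helper name introduced here for illustration; it is not part of the Azure SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper (not an SDK class): bundles the values an append call needs. */
final class AppendPayload {
    final byte[] bytes;  // UTF-8 encoded content
    final long length;   // exact byte count, suitable for the 'length' parameter
    final byte[] md5;    // digest of the same bytes, suitable for 'contentMd5'

    private AppendPayload(byte[] bytes, byte[] md5) {
        this.bytes = bytes;
        this.length = bytes.length;
        this.md5 = md5;
    }

    /** Encodes the text once so that stream, length and digest all describe the same bytes. */
    static AppendPayload of(String text) {
        byte[] encoded = text.getBytes(StandardCharsets.UTF_8);
        try {
            return new AppendPayload(encoded, MessageDigest.getInstance("MD5").digest(encoded));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the digests every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    /** ByteArrayInputStream supports mark/reset, which is what the SDK needs for retries. */
    InputStream newStream() {
        return new ByteArrayInputStream(bytes);
    }

    public static void main(String[] args) {
        AppendPayload payload = AppendPayload.of("h\u00e9llo\n");
        System.out.println(payload.length + " bytes, markable=" + payload.newStream().markSupported());
    }
}
```

Note that the byte length can differ from `String.length()` as soon as the text contains non-ASCII characters, which is why the sketch measures the encoded array rather than the string.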
Code implementation
Step 1: Add the Azure Storage Blob dependency to the Java project's pom.xml

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-storage-blob</artifactId>
        <version>12.13.0</version>
    </dependency>

Step 2: Import the required Storage classes

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.security.InvalidKeyException;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.time.LocalTime;

    import com.azure.core.http.rest.Response;
    import com.azure.storage.blob.BlobContainerClient;
    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;
    import com.azure.storage.blob.models.AppendBlobItem;
    import com.azure.storage.blob.models.AppendBlobRequestConditions;
    import com.azure.storage.blob.specialized.AppendBlobClient;

Step 3: Create the AppendBlobClient object from a BlobServiceClient and a connection string, then append the content

    String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*******;EndpointSuffix=core.chinacloudapi.cn";
    String containerName = "appendblob";
    String fileName = "test.txt";

    // Create a BlobServiceClient object which will be used to create a container
    System.out.println("\nCreate a BlobServiceClient Object to Connect Storage Account");
    BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
            .connectionString(storageConnectionString)
            .buildClient();

    BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName);
    if (!containerClient.exists()) {
        containerClient.create();
    }

    // Get a reference to the blob as an append blob
    AppendBlobClient appendBlobClient = containerClient.getBlobClient(fileName).getAppendBlobClient();

    // true lets create() replace an existing blob; here it only runs when the blob is missing
    boolean overwrite = true;
    if (!appendBlobClient.exists()) {
        System.out.printf("Created AppendBlob at %s%n",
                appendBlobClient.create(overwrite).getLastModified());
    }

    String data = "Test to append new content into exists blob! by blogs lu bian liang zhan deng @"
            + LocalTime.now().toString() + "\n";

    // Encode once so the stream, the MD5 digest and the length all describe the same bytes
    byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
    InputStream inputStream = new ByteArrayInputStream(bytes);
    byte[] md5 = MessageDigest.getInstance("MD5").digest(bytes);

    AppendBlobRequestConditions requestConditions = new AppendBlobRequestConditions();
    // Context context = new Context("key", "value");
    long length = bytes.length;

    Response<AppendBlobItem> rsp = appendBlobClient.appendBlockWithResponse(inputStream, length, md5,
            requestConditions, null, null);

    if (rsp.getStatusCode() == 201) {
        System.out.println("append content successful........");
    }

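The call above sends one small block. The SDK source quoted earlier passes `MAX_APPEND_BLOCK_BYTES` when converting the stream, so a single append call is bounded in size, and larger content would need to be sent as several blocks. The following is a minimal JDK-only sketch of that splitting step. `AppendChunker` is a hypothetical helper name, and the 4 MiB constant is an assumption about the limit, not a value taken from this article; check what your SDK version reports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not an SDK class): splits content into blocks no larger than a given limit. */
final class AppendChunker {
    // Assumption: 4 MiB per append call. Verify against the SDK and service version in use.
    static final int ASSUMED_MAX_BLOCK_BYTES = 4 * 1024 * 1024;

    /** Returns consecutive slices of data, each at most maxBlockBytes long, in original order. */
    static List<byte[]> split(byte[] data, int maxBlockBytes) {
        if (maxBlockBytes <= 0) {
            throw new IllegalArgumentException("maxBlockBytes must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int offset = 0;
        while (offset < data.length) {
            // The last slice may be shorter than the limit
            int size = Math.min(maxBlockBytes, data.length - offset);
            chunks.add(Arrays.copyOfRange(data, offset, offset + size));
            offset += size;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = split(new byte[10], 4);
        System.out.println(chunks.size() + " chunks");
    }
}
```

Each slice could then be passed to `appendBlock(new ByteArrayInputStream(chunk), chunk.length)` in order. Because every call commits its block to the end of the blob, the order of the calls determines the order of the content.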
Run results
If the target blob is not an append blob, however, the call fails with status code 409 and the message "The blob type is invalid for this operation":

    Exception in thread "main" com.azure.storage.blob.models.BlobStorageException: Status code 409, "
    <?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidBlobType</Code>
    <Message>The blob type is invalid for this operation.
    RequestId:501ee0b9-301e-0003-4f7b-829ca6000000
    Time:2023-05-09T13:37:17.7509942Z</Message></Error>"
        at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
        at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:484)
        at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:343)
        at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$5(RestProxy.java:382)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:337)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:354)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:293)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:416)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:470)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1589)
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
            at reactor.core.publisher.Mono.block(Mono.java:1703)
            at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:128)
            at com.azure.storage.blob.specialized.AppendBlobClient.appendBlockWithResponse(AppendBlobClient.java:259)
            at test.App.AppendBlobContent(App.java:68)
            at test.App.main(App.java:31)

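The error body in this trace is a small XML document whose `<Code>` element carries the machine-readable reason (`InvalidBlobType`). In SDK code, `BlobStorageException` already exposes the status and error code through `getStatusCode()` and `getErrorCode()`. The sketch below covers a different case: the raw body is all you have, for example in a captured log. It uses only the JDK, and `StorageErrorBody` is a hypothetical helper name, not an SDK class.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

/** Hypothetical helper (not an SDK class): reads the <Code> element of a storage error body. */
final class StorageErrorBody {

    /** Returns the text of the first <Code> element, or null when the body has none. */
    static String errorCode(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Error bodies come from the network, so enable the parser's secure-processing limits
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList codes = doc.getElementsByTagName("Code");
            return codes.getLength() == 0 ? null : codes.item(0).getTextContent().trim();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a parseable XML error body", e);
        }
    }

    public static void main(String[] args) {
        String body = "<?xml version=\"1.0\" encoding=\"utf-8\"?><Error><Code>InvalidBlobType</Code>"
                + "<Message>The blob type is invalid for this operation.</Message></Error>";
        if ("InvalidBlobType".equals(errorCode(body))) {
            System.out.println("Target blob is not an append blob; append is not possible on it.");
        }
    }
}
```

A block blob cannot be turned into an append blob in place. When this code shows up, the usual options are to append to a different blob name or to recreate the blob as an append blob, which discards the existing content.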
References
appendBlockWithResponse: https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.specialized.appendblobclient?view=azure-java-stable#com-azure-storage-blob-specialized-appendblobclient-appendblockwithresponse(java-io-inputstream-long-byte()-com-azure-storage-blob-models-appendblobrequestconditions-java-time-duration-com-azure-core-util-context)
Introduction to Blob (object) storage: https://docs.azure.cn/zh-cn/storage/blobs/storage-blobs-introduction