001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.StorageException; 023import org.apache.reef.io.Accumulable; 024import org.apache.reef.io.Accumulator; 025 026import java.io.ByteArrayOutputStream; 027import java.io.DataOutputStream; 028import java.io.IOException; 029import java.io.OutputStream; 030 031public class FramingOutputStream extends OutputStream implements Accumulable<byte[]> { 032 033 private final ByteArrayOutputStream baos; 034 private final DataOutputStream o; 035 private long offset; 036 private boolean closed; 037 038 public FramingOutputStream(final OutputStream o) { 039 if (!(o instanceof DataOutputStream)) { 040 this.o = new DataOutputStream(o); 041 } else { 042 this.o = (DataOutputStream) o; 043 } 044 this.baos = new ByteArrayOutputStream(); 045 } 046 047 public void nextFrame() throws IOException { 048 o.writeInt(baos.size()); 049 offset += 4; 050 baos.writeTo(o); 051 baos.reset(); 052 } 053 054 public long getCurrentOffset() { 055 return offset; 056 } 057 058 @Override 059 public void write(final int b) throws IOException { 060 baos.write(b); 061 offset++; 062 } 063 064 @Override 065 public void write(final byte[] b) throws IOException { 066 baos.write(b); 067 this.offset += b.length; 068 } 069 070 @Override 071 public void write(final byte[] b, final int offsetToWrite, final int length) throws IOException { 072 baos.write(b, offsetToWrite, length); 073 this.offset += length; 074 } 075 076 @Override 077 public void flush() { 078 // no-op. 079 } 080 081 @Override 082 public void close() throws IOException { 083 if (!closed) { 084 if (this.offset > 0) { 085 nextFrame(); 086 } 087 o.writeInt(-1); 088 o.close(); 089 closed = true; 090 } 091 } 092 093 @Override 094 public Accumulator<byte[]> accumulator() throws StorageException { 095 @SuppressWarnings("resource") 096 final FramingOutputStream fos = this; 097 return new Accumulator<byte[]>() { 098 099 @Override 100 public void add(final byte[] datum) throws ServiceException { 101 try { 102 o.writeInt(datum.length); 103 offset += 4; 104 o.write(datum); 105 offset += datum.length; 106 } catch (final IOException e) { 107 throw new ServiceException(e); 108 } 109 110 } 111 112 @Override 113 public void close() throws ServiceException { 114 try { 115 o.writeInt(-1); 116 offset += 4; 117 o.close(); 118 fos.closed = true; 119 } catch (final IOException e) { 120 throw new ServiceException(e); 121 } 122 } 123 124 }; 125 } 126 127}