001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.storage; 020 021import org.apache.reef.exception.evaluator.ServiceException; 022import org.apache.reef.exception.evaluator.StorageException; 023import org.apache.reef.io.Accumulable; 024import org.apache.reef.io.Accumulator; 025 026import java.io.ByteArrayOutputStream; 027import java.io.DataOutputStream; 028import java.io.IOException; 029import java.io.OutputStream; 030 031public class FramingOutputStream extends OutputStream implements Accumulable<byte[]> { 032 033 private final ByteArrayOutputStream baos; 034 private final DataOutputStream o; 035 private long offset; 036 private boolean closed; 037 038 public FramingOutputStream(OutputStream o) { 039 if (!(o instanceof DataOutputStream)) { 040 this.o = new DataOutputStream(o); 041 } else { 042 this.o = (DataOutputStream) o; 043 } 044 this.baos = new ByteArrayOutputStream(); 045 } 046 047 public void nextFrame() throws StorageException { 048 try { 049 o.writeInt(baos.size()); 050 offset += 4; 051 baos.writeTo(o); 052 baos.reset(); 053 } catch (IOException e) { 054 throw new StorageException(e); 055 } 056 } 057 058 public long getCurrentOffset() { 059 return offset; 060 } 061 062 @Override 063 public void write(int b) throws IOException { 064 baos.write(b); 065 offset++; 066 ; 067 } 068 069 @Override 070 public void write(byte[] b) throws IOException { 071 baos.write(b); 072 offset += b.length; 073 } 074 075 @Override 076 public void write(byte[] b, int offset, int length) throws IOException { 077 baos.write(b, offset, length); 078 offset += length; 079 } 080 081 @Override 082 public void flush() { 083 // no-op. 084 } 085 086 @Override 087 public void close() throws IOException { 088 if (!closed) { 089 try { 090 if (this.offset > 0) nextFrame(); 091 } catch (StorageException e) { 092 throw (IOException) e.getCause(); 093 } 094 o.writeInt(-1); 095 o.close(); 096 closed = true; 097 } 098 } 099 100 @Override 101 public Accumulator<byte[]> accumulator() throws StorageException { 102 @SuppressWarnings("resource") 103 final FramingOutputStream fos = this; 104 return new Accumulator<byte[]>() { 105 106 @Override 107 public void add(byte[] datum) throws ServiceException { 108 try { 109 o.writeInt(datum.length); 110 offset += 4; 111 o.write(datum); 112 offset += datum.length; 113 } catch (IOException e) { 114 throw new ServiceException(e); 115 } 116 117 } 118 119 @Override 120 public void close() throws ServiceException { 121 try { 122 o.writeInt(-1); 123 offset += 4; 124 o.close(); 125 fos.closed = true; 126 } catch (IOException e) { 127 throw new ServiceException(e); 128 } 129 } 130 131 }; 132 } 133 134}